Linux poll详解

poll 函数用于检测一组文件描述符(File Descriptor, fd)上的可读可写和出错事件,其函数签名如下:

#include <poll.h>

int poll(struct pollfd* fds, nfds_t nfds, int timeout);

参数解释:

  • fds:指向一个结构体数组的首个元素的指针,每个数组元素都是一个 struct pollfd 结构,用于指定检测某个给定的 fd 的条件;
  • nfds:参数 fds 结构体数组的长度,nfds_t 本质上是 unsigned long int,其定义如下:
typedef unsigned long int nfds_t;
  • timeout:表示 poll 函数的超时时间,单位为毫秒。

struct pollfd 结构体定义如下:

struct pollfd {
    int   fd;         /* 待检测事件的 fd       */
    short events;     /* 关心的事件组合        */
    short revents;    /* 检测后的得到的事件类型  */
};

struct pollfd的 events 字段是由开发者来设置,告诉内核我们关注什么事件,而 revents 字段是 poll 函数返回时内核设置的,用以说明该 fd 发生了什么事件。

events 和 revents 一般有如下取值:

事件宏事件描述是否可以作为输入(events)是否可以作为输出(revents)
POLLIN数据可读(包括普通数据&优先数据)
POLLOUT数据可写(普通数据&优先数据)
POLLRDNORM等同于 POLLIN
POLLRDBAND优先级带数据可读(一般用于 Linux 系统)
POLLPRI高优先级数据可读,例如 TCP 带外数据
POLLWRNORM等同于 POLLOUT
POLLWRBAND优先级带数据可写
POLLRDHUPTCP连接被对端关闭,或者关闭了写操作,由 GNU 引入
POPPHUP挂起
POLLERR错误
POLLNVAL文件描述符没有打开

poll 检测一组 fd 上的可读可写和出错事件的概念与前文介绍 select 的事件含义一样,这里就不再赘述。poll 与 select 相比具有如下优点:

  • poll 不要求开发者计算最大文件描述符加 1 的大小;
  • 相比于 selectpoll 在处理大数目的文件描述符的时候速度更快;
  • poll 没有最大连接数的限制,原因是它是基于链表来存储的;
  • 在调用 poll 函数时,只需要对参数进行一次设置就好了。

我们来看一个具体的例子:

/**
 * 演示 poll 函数的用法,poll_server.cpp
 * zhangxf 2019.03.16
 */
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#include <iostream>
#include <string.h>
#include <vector>
#include <errno.h>

//无效fd标记
#define INVALID_FD  -1

int main(int argc, char *argv[])
{
    //创建一个侦听socket
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == INVALID_FD)
    {
        std::cout << "create listen socket error." << std::endl;
        return -1;
    }

    //将侦听socket设置为非阻塞的
    int oldSocketFlag = fcntl(listenfd, F_GETFL, 0);
    int newSocketFlag = oldSocketFlag | O_NONBLOCK;
    if (fcntl(listenfd, F_SETFL, newSocketFlag) == -1)
    {
        close(listenfd);
        std::cout << "set listenfd to nonblock error." << std::endl;
        return -1;
    }

    //复用地址和端口号
    int on = 1;
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on));
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, (char *) &on, sizeof(on));

    //初始化服务器地址
    struct sockaddr_in bindaddr;
    bindaddr.sin_family = AF_INET;
    bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    bindaddr.sin_port = htons(3000);
    if (bind(listenfd, (struct sockaddr *) &bindaddr, sizeof(bindaddr)) == -1)
    {
        std::cout << "bind listen socket error." << std::endl;
        close(listenfd);
        return -1;
    }

    //启动侦听
    if (listen(listenfd, SOMAXCONN) == -1)
    {
        std::cout << "listen error." << std::endl;
        close(listenfd);
        return -1;
    }

    std::vector<pollfd> fds;
    pollfd listen_fd_info;
    listen_fd_info.fd = listenfd;
    listen_fd_info.events = POLLIN;
    listen_fd_info.revents = 0;
    fds.push_back(listen_fd_info);

    //是否存在无效的fd标志
    bool exist_invalid_fd;
    int n;
    while (true)
    {
        exist_invalid_fd = false;
        n = poll(&fds[0], fds.size(), 1000);
        if (n < 0)
        {
            //被信号中断
            if (errno == EINTR)
                continue;

            //出错,退出
            break;
        }
        else if (n == 0)
        {
            //超时,继续
            continue;
        }

        for (size_t i = 0; i < fds.size(); ++i)
        {
            // 事件可读
            if (fds[i].revents & POLLIN)
            {
                if (fds[i].fd == listenfd)
                {
                    //侦听socket,接受新连接
                    struct sockaddr_in clientaddr;
                    socklen_t clientaddrlen = sizeof(clientaddr);
                    //接受客户端连接, 并加入到fds集合中
                    int clientfd = accept(listenfd, (struct sockaddr *) &clientaddr, &clientaddrlen);
                    if (clientfd != -1)
                    {
                        //将客户端socket设置为非阻塞的
                        int oldSocketFlag = fcntl(clientfd, F_GETFL, 0);
                        int newSocketFlag = oldSocketFlag | O_NONBLOCK;
                        if (fcntl(clientfd, F_SETFL, newSocketFlag) == -1)
                        {
                            close(clientfd);
                            std::cout << "set clientfd to nonblock error." << std::endl;
                        }
                        else
                        {
                            struct pollfd client_fd_info;
                            client_fd_info.fd = clientfd;
                            client_fd_info.events = POLLIN;
                            client_fd_info.revents = 0;
                            fds.push_back(client_fd_info);
                            std::cout << "new client accepted, clientfd: " << clientfd << std::endl;
                        }
                    }
                }
                else
                {
                    //普通clientfd,收取数据
                    char buf[64] = {0};
                    int m = recv(fds[i].fd, buf, 64, 0);
                    if (m <= 0)
                    {
                        if (errno != EINTR && errno != EWOULDBLOCK)
                        {
                            //出错或对端关闭了连接,关闭对应的clientfd,并设置无效标志位
                            for (std::vector<pollfd>::iterator iter = fds.begin(); iter != fds.end(); ++iter)
                            {
                                if (iter->fd == fds[i].fd)
                                {
                                    std::cout << "client disconnected, clientfd: " << fds[i].fd << std::endl;
                                    close(fds[i].fd);
                                    iter->fd = INVALID_FD;
                                    exist_invalid_fd = true;
                                    break;
                                }
                            }
                        }
                    }
                    else
                    {
                        std::cout << "recv from client: " << buf << ", clientfd: " << fds[i].fd << std::endl;
                    }
                }
            } else if (fds[i].revents & POLLERR)
            {
                //TODO: 暂且不处理
            }

        }// end  outer-for-loop

        if (exist_invalid_fd)
        {
            //统一清理无效的fd
            for (std::vector<pollfd>::iterator iter = fds.begin(); iter != fds.end();)
            {
                if (iter->fd == INVALID_FD)
                    iter = fds.erase(iter);
                else
                    ++iter;
            }
        }
    }// end  while-loop


    //关闭所有socket
    for (std::vector<pollfd>::iterator iter = fds.begin(); iter != fds.end(); ++iter)
        close(iter->fd);

    return 0;
}

编译上述程序生成 poll_server 并运行,然后使用 nc 命令模拟三个客户端并给 poll_server 发送消息,效果如下:

图片[1]-Linux poll详解-不念博客

由于 nc 命令是以 \n 作为结束标志的,所以 poll_server 收到客户端消息时显示时分两行的。

通过上面的示例代码,我们也能看出 poll 函数存在的一些缺点:

  • 在调用 poll 函数时,不管有没有有意义,大量的 fd 的数组被整体在用户态和内核地址空间之间复制;
  • 与 select 函数一样,poll 函数返回后,需要遍历 fd 集合来获取就绪的 fd,这样会使性能下降;
  • 同时连接的大量客户端在一时刻可能只有很少的就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

与 select 函数实现非阻塞的 connect 原理一样,我们可以使用 poll 去实现,即通过 poll 检测 clientfd 在一定时间内是否可写,示例代码如下:

/**
 * Linux 下使用poll实现异步的connect,linux_nonblocking_connect_poll.cpp
 * zhangyl 2019.03.16
 */
#include <sys/types.h> 
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <poll.h>
#include <iostream>
#include <string.h>
#include <stdio.h>
#include <fcntl.h>
#include <errno.h>

#define SERVER_ADDRESS "127.0.0.1"
#define SERVER_PORT     3000
#define SEND_DATA       "helloworld"

int main(int argc, char* argv[])
{
    //1.创建一个socket
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (clientfd == -1)
    {
        std::cout << "create client socket error." << std::endl;
        return -1;
    }
 
 //将 clientfd 设置成非阻塞模式 
 int oldSocketFlag = fcntl(clientfd, F_GETFL, 0);
 int newSocketFlag = oldSocketFlag | O_NONBLOCK;
 if (fcntl(clientfd, F_SETFL,  newSocketFlag) == -1)
 {
  close(clientfd);
  std::cout << "set socket to nonblock error." << std::endl;
  return -1;
 }

    //2.连接服务器
    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_addr.s_addr = inet_addr(SERVER_ADDRESS);
    serveraddr.sin_port = htons(SERVER_PORT);
 for (;;)
 {
  int ret = connect(clientfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr));
  if (ret == 0)
  {
   std::cout << "connect to server successfully." << std::endl;
   close(clientfd);
   return 0;
  } 
  else if (ret == -1) 
  {
   if (errno == EINTR)
   {
    //connect 动作被信号中断,重试connect
    std::cout << "connecting interruptted by signal, try again." << std::endl;
    continue;
   } else if (errno == EINPROGRESS)
   {
    //连接正在尝试中
    break;
   } else {
    //真的出错了,
    close(clientfd);
    return -1;
   }
  }
 }
 
 pollfd event;
 event.fd = clientfd;
 event.events = POLLOUT;
 int timeout = 3000;
 if (poll(&event, 1, timeout) != 1)
 {
  close(clientfd);
  std::cout << "[poll] connect to server error." << std::endl;
  return -1;
 }
 
 if (!(event.revents & POLLOUT))
 {
  close(clientfd);
  std::cout << "[POLLOUT] connect to server error." << std::endl;
  return -1;
 }
 
 int err;
    socklen_t len = static_cast<socklen_t>(sizeof err);
    if (::getsockopt(clientfd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
        return -1;
        
    if (err == 0)
        std::cout << "connect to server successfully." << std::endl;
    else
     std::cout << "connect to server error." << std::endl;
    
 //5. 关闭socket
 close(clientfd);

    return 0;
}

运行效果与前面的 select 实现非阻塞的 connect 一样,这里就不再给出运行效果图了。

© 版权声明
THE END