IO Multiplexing

如果对c语言中socket编程不熟悉的同学可以参考这里:

前言

  1. 多线程io模型的问题 在讲述io多路复用之前,就需要提到经典的多线程编程模型.一般来说,服务器编程中,我们会对每接收到一个请求就创建一个子线程来执行该请求.大致代码如下(并不是真实的代码):
   while(1) {
       int tid;
       int client_fd = accept(sockfd,(struct sockaddr_in*)&addr,sizeof(struct sockaddr_in));
       //开辟一个线程来,调用process来处理请求,参数为client_fd;
       pthread_create(&tid,NULL,process,client_fd);
   }

多线程模型的实现十分简单,但是当请求数量十分多的时候.线程多了上下文切换的开销,或者一些内核空间和用户空间之间的缓冲区复制会对性能有较大的影响.这也就是所谓c10k问题.

  1. 如何去响应两个都会导致程序阻塞的api? 假如在一个服务端,我们要实现既能响应来自键盘的输入,也能响应客户端的请求.然而,fgets()函数会等待键盘输入而一直阻塞,如果先执行tcp链接的建立,那么recv()也会使得程序陷入阻塞,不能响应来自键盘的输入.对于这个问题的解决方案,代码在下面会讲到.

为了解决这些问题,io多路复用的出现了.最早的io多路复用api是select(),当然其他poll(),epoll(),pselect(),kqueue()等等都是差不多的.他们的作用都是:使得单个线程阻塞地来监听多个描述符,如果这些描述符之中有我们感兴趣的事件,就结束阻塞,进而去处理这些事件.

引用man select()中的一段话:

select() allows a program to monitor multiple file descriptors, waiting until one or more of the file descriptors become "ready" for some class of I/O operation (e.g., input possible). A file descriptor is considered ready if it is possible to perform a corresponding I/O operation (e.g., read(2), or a sufficiently small write(2)) without blocking.

Select()

接下来先介绍select()的api基本结构,具体可以参考man-select

int select(int nfds, fd_set *restrict readfds,
           fd_set *restrict writefds, fd_set *restrict exceptfds,
           struct timeval *restrict timeout);

select()函数的主要参数是:描述符的集合,比如说我们想从哪些描述符从读取数据等

  1. readfds 要监听的可读的描述符.当readfs内有可读的数据的时候,select()会返回,并且会将readfds中所有的描述符清空,只保留那些有可读数据的描述符.,
  2. writefds 要监听的可写的描述符,当wirtefds内有可写的描述符的时候,select()会返回,并且会将writefds中所有的描述清空,只保留可以写入数据的描述符.
  3. exceptfds 要监听的错误的描述符,当exceptfds内有引发错误的描述时候,select()会返回,并且会将所有的描述符清空,只保留那些发生了异常的描述符.
  4. nfds 它的值为,我们所需要的描述符的最大值 +1,将来在判断那些描述符可读的时候,就根据nfds来遍历这些描述符.比如说nfds =900,就会去遍历0-900之间的所有描述符.判断哪些可读(不过这不需要我们实现,内核会做的)
  5. timeout select()将会阻塞的时间,它是一个timeval的结构,可以指定是毫秒还是秒.具体就看man select吧.一般来说都是让select()无限阻塞.它对应的结构如下:
   struct timeval {
       time_t      tv_sec;         /* seconds */
       suseconds_t tv_usec;        /* microseconds */
   };

描述符集合是如何实现的:

fd_set实际上是一个位图,如果某bit为1就表示我们要监听该描述符,为0表示不监听.fd_set的长度是1024,也就是select()最多可以监听1024个描述符.所以它不适合现代的程序来用.

man select也提到了这个问题:

WARNING: select() can monitor only file descriptors numbers that are less than FD_SETSIZE (1024)—an unreasonably low limit for many modern applications—and this limitation will not change. All modern applications should instead use poll(2) or epoll(7), which do not suffer this limitation.

select()的使用

我们用两个描述符来监听两个tcp链接,当他们任意一个有可读的数据时候.就去读取并且原样返回,实现一个简单的echoserver.

在正式介绍select()的操作之前还要介绍,几个宏定义

  1. FD_ZERO 将描述符集合整个清空,一般在初始化的时候可以使用.
  2. FD_SET 前面说过了fd_set就是一个位图,所以FD_SET自然是将该描述符对应的bit置为1
  3. FD_CLR 在fd_set中将某位描述符对应的bit设置为0
  4. FD_ISSET 判断该描述符是否被设置,前面说了select()返回后会清楚其他描述符,只保留可以操作的描述符.

select()监听stdin和recv()

前面提到的问题,程序会因为fgets()recv()而被阻塞.为了解决这个问题,我们将以下两件事件都让select()来监听:

  1. 客户端有数据发送过来可读
  2. stdin有可读的数据

代码如下:

#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <strings.h>
#include <zconf.h>

#define FD_NUM 2
#define BUF_SIZE 1024

int main() {
    char buf[BUF_SIZE];
    struct sockaddr_in server_addr,client_addr;
    fd_set rset,ready_set;

    int sockfd = socket(AF_INET,SOCK_STREAM,0);
    bzero(&server_addr,sizeof(struct sockaddr_in));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8080);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    bind(sockfd,(struct sockaddr_in*)&server_addr,sizeof(struct sockaddr_in));

    listen(sockfd,5);
    int client_len = sizeof(struct sockaddr_in);
    int client_fd = accept(sockfd,(struct sockaddr_in*)&client_addr,&client_len);
    FD_ZERO(&rset); //将fd_set中的数据都清空
    //接下来将我们感兴趣的事件加入到fd_set中,分别是从stdin读取和客户端发送了可读的数据
    FD_SET(STDOUT_FILENO,&rset); 
    FD_SET(client_fd,&rset);
    while(1) {
        ready_set = rset;
        //select会阻塞,直到所关注的事件发生才会结束阻塞
        select(client_fd + 1,&ready_set,NULL,NULL,NULL);
        //如果stdin有数据可以读取
        if (FD_ISSET(STDOUT_FILENO,&ready_set)) {
            char buf[15];
            fgets(buf,15,stdin);
            printf("input:%s",buf);
        }
        //如果客户端有数据可以读取
        if (FD_ISSET(client_fd,&ready_set)) {
            int count = recv(client_fd,buf,BUF_SIZE,0);
            send(client_fd,buf,count,0);
        }
    }
    return 0;
}

对于上述代码值得解释的就是:select()在返回后会清除其他不相关的描述符,只保留当前可以操作的描述符.所以在下一次循环开始前,我们要对ready_set重新赋值,再传给select使用.

网络编程中的select()

可以让服务端程序也采用select()来监听多个客户端的描述符.等待这些客户端之间的进程发送数据过来.

#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <strings.h>

#define FD_NUM 2
#define BUF_SIZE 1024

int main() {
    char buf[BUF_SIZE];
    struct sockaddr_in server_addr,client_addr;
    int fds[FD_NUM];
    fd_set set;
    int max_fd = 0;

    int sockfd = socket(AF_INET,SOCK_STREAM,0);
    bzero(&server_addr,sizeof(struct sockaddr_in));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8080);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    bind(sockfd,(struct sockaddr_in*)&server_addr,sizeof(struct sockaddr_in));

    listen(sockfd,5);
    for(int i = 0; i < FD_NUM; i++) {
        bzero(&client_addr,sizeof(client_addr));
        int addrlen = sizeof(client_addr);
        fds[i]= accept(sockfd,(struct sockaddr_in *)&client_addr,&addrlen);
        if (fds[i] > max_fd)
            max_fd = fds[i];
    }
    while (1) {
        FD_ZERO(&set);
        for(int i = 0; i < FD_NUM; i++)
            FD_SET(fds[i],&set);
        select(max_fd+1,&set,NULL,NULL,NULL);
        for(int i = 0; i < FD_NUM; i++) {
            if (FD_ISSET(fds[i],&set)) {
                int count = recv(fds[i],buf,BUF_SIZE,0);
                send(fds[i],buf,count,0);
            }
        }
    }
    return 0;
}

select()的缺点

  1. 监听的描述符有限 select监听的描述符在数字上只能小于1024.也就是只能监听描述符从0-1024号.这在现代的程序看来是十分小的.它的底层使用位图来实现的,0号描述符对应第0bit,1号描述符对应第1个bit.
  2. 每次都会修改原来的fd_set select()在捕捉到事件发生后,会修改传入所参数中的fd_set.所以如果我们要继续监听这些描述符,要对fd_set重新赋值.
  3. 在确认哪些描述符是可操作的时候,我们要遍历所有的描述符,然后用FD_ISSET()来判断,具有较高的事件复杂度.
  4. 如果还有可写的描述符集合,那么当select()返回的时候.并不知道是因为可读还是可写使得select()返回,相当麻烦.

select()的优点是具有更好的可以移植性.基本上的unix-like系统都有select().

poll()

poll的基本作用和select()相似,但是它突破了select只能监听1024个描述符的限制.它使用一组描述符数组代替select()中的描述符集合,而申请的数组是任意长度的.

下面先来介绍它的api的基本结构:

int poll(struct pollfd *fds, nfds_t nfds, int timeout);
  1. struct pollfd *fds 它是一个由pollfd组成的数组.接下来继续介绍struct pollfd的结构,它的结构如下:
   struct pollfd {
       int   fd;         /* file descriptor */
       short events;     /* requested events */
       short revents;    /* returned events */
   };

fd是我们要关注的描述符.如stdin,stdout等等.events使我们期望在改描述符上会接收到的事件,在实际操作中它是一个位图,使用或运算(|)来将期待的事件汇集在一起.revents是当事件发生后,由内核来填充的值,它表示实际发生的事件.

下面是poll支持的所有事件:

   #define POLLIN        0x001       /* There is data to read.  */
   #define POLLPRI        0x002       /* There is urgent data to read.  */
   #define POLLOUT        0x004       /* Writing now will not block.  */

   #if defined __USE_XOPEN || defined __USE_XOPEN2K8
   /* These values are defined in XPG4.2.  */
   # define POLLRDNORM    0x040       /* Normal data may be read.  */
   # define POLLRDBAND    0x080       /* Priority data may be read.  */
   # define POLLWRNORM    0x100       /* Writing now will not block.  */
   # define POLLWRBAND    0x200       /* Priority data may be written.  */

假如说,我们要让fd等待读和写的这两事件,那么就是用语句events = POLLIN | POLLWRNORM来实现.当事件发生后,revents == POLLIN来判断所发生的事件是何种类型

  1. nfds 描述符数组的长度.我想在内核代码实现中,要使用该变量来逐个遍历找到正确的描述符,然后设置revents.该变量就可以作为循环的判断条件
  2. timeout poll会阻塞的事件,单位是毫秒,和select有些许不同.

poll()的应用

同样的,我们用poll来监听stdin和一个网络链接.代码如下:

#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <strings.h>
#include <zconf.h>
#include <poll.h>

#define FD_NUM 2
#define BUF_SIZE 1024

int main() {
    char buf[BUF_SIZE];
    struct sockaddr_in server_addr,client_addr;
    struct pollfd fds[FD_NUM]; //申请一个描述符数组

    int sockfd = socket(AF_INET,SOCK_STREAM,0);
    bzero(&server_addr,sizeof(struct sockaddr_in));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8080);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    bind(sockfd,(struct sockaddr_in*)&server_addr,sizeof(struct sockaddr_in));

    listen(sockfd,5);
    int client_len = sizeof(struct sockaddr_in);
    int client_fd = accept(sockfd,(struct sockaddr_in*)&client_addr,&client_len);
    //初始化描述符数组的内容,分别监听stdin和客户端的链接
    fds[0].fd = STDIN_FILENO;
    fds[0].events = POLLIN; //我们只监听有数据到达的事件
    fds[1].fd = client_fd;
    fds[1].events = POLLIN;
    while(1) {
        poll(fds,FD_NUM,NULL); //阻塞等待事件的发生
        //如果事件发生了,判断各个pollfd中的revents的类型再去采取对应的操作
        if (fds[0].revents == POLLIN) {
            fds[0].revents == 0;  //将描述符中的revents清空,不然就会引起误解
            char buf[15];
            fgets(buf,15,stdin);
            printf("input:%s",buf);
        }
        if (fds[1].revents == POLLIN) {
            fds[1].revents == 0; 
            int count = recv(client_fd,buf,BUF_SIZE,0);
            send(client_fd,buf,count,0);
        }
    }
    return 0;
}

poll()和select()的对比

  1. 在功能上,select()和poll()相类似.但是poll可以接受任意个数的描述符,将他们都放到数组里.
  2. 在事件发生后,poll()不会像selct()那样修改原来的数据结构,而是将所发生的事件填充到描述符pollfd中的revents中,通过这个成员变量来判断所发生的事件.

虽然poll突破了描述符个数的缺陷,但是当poll()结束阻塞的时候,我们还是需要去遍历整个文件描述符数组.来判断哪些事件发生了.

为了解决这个问题呢,epoll()就诞生了,它比poll和select()更加高效.

epoll()

epoll()也是io多路复用的api之一,它比poll和select都有着更高的效率.不同于之前的poll()和select(),epoll()的主要由三个函数来操作,如下:

epoll_create(int size);
epoll_ctl(int epfd,int op,int fd,int fd,struct epoll_event *evnet);
epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout);
  1. epoll_create(int size) 是创建一个epoll对象,该函数返回的是一个指向epoll对象的描述符.size是一个无效的参数,没有什么用.根据man epoll中提到,只要让size 设置为大于0的数即可.
  2. epoll_ctl(int epfd,int op,int fd,int fd,struct epoll_event *evnet) 是将我们所感兴趣的事件注册到epoll的interest list中(好像是一个双向链表,但是没有深入到源码中查看). 所以第一个参数就是epoll对象的描述符. int op是指定当前的操作是什么,是将我们的事件加入到列表中还是移除还是修改,它的取值如下:
   #define EPOLL_CTL_ADD 1    /* Add a file descriptor to the interface.  */
   #define EPOLL_CTL_DEL 2    /* Remove a file descriptor from the interface.  */
   #define EPOLL_CTL_MOD 3    /* Change file descriptor epoll_event structure.  */

int fd是指定我们当前需要监听的描述符.和struct pollfd中的fd相类似

struct epoll_event *evnet中设置了我们要在该描述上监听何种操作,是否有数据可读或者可写等.它也是一个位图,与poll()中相类似.

  1. epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout) 是阻塞地等待我们所感性的事件发生.和select(),poll()相类似. int epfd:epoll对象的描述符 struct epoll_event *events:所发生的事件的数组 int maxevents:可以返回的最多的事件数 int timeout:阻塞的事件,单位秒 返回值:epoll_wait()的返回值是所发生的事件个数

上述api的表述如果不理解没关系,看接下来的代码帮助理解.下面的代码所实现的功能还是和之前一样,不再过多解释,代码如下:

#include <stdio.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <strings.h>
#include <zconf.h>
#include <sys/epoll.h>

#define FD_NUM 2
#define BUF_SIZE 1024

int main() {
    char buf[BUF_SIZE];
    struct sockaddr_in server_addr,client_addr;

    int sockfd = socket(AF_INET,SOCK_STREAM,0);
    bzero(&server_addr,sizeof(struct sockaddr_in));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8080);
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    bind(sockfd,(struct sockaddr_in*)&server_addr,sizeof(struct sockaddr_in));
    listen(sockfd,5);
    //创建一个epoll对象,参数10是没有什么意义的
    int epfd = epoll_create(10);
    //创建一个epoll_event数组,将来epoll_wait返回的时候,将
    //可以操作的事件放到这个数字中
    struct epoll_event events[FD_NUM];
    //用于epoll_ctl
    struct epoll_event ev;

    //接下来就是调用epoll_ctl往epoll中注册我们要关注的事件
    bzero(&ev,sizeof(struct epoll_event));
    ev.data.fd = STDIN_FILENO; //标准输入
    ev.events = EPOLLIN; //标注输入存在可读数据
    //EPOLL_CTL_ADD表示往epoll中追加事件
    epoll_ctl(epfd,EPOLL_CTL_ADD,ev.data.fd,&ev);

    bzero(&ev,sizeof(struct epoll_event));
    int client_len = sizeof(struct sockaddr_in);
    int client_fd = accept(sockfd,(struct sockaddr_in*)&client_addr,&client_len);
    ev.data.fd = client_fd;
    ev.events = EPOLLIN;
    epoll_ctl(epfd,EPOLL_CTL_ADD,ev.data.fd,&ev);

    while(1) {
        //epoll_wait()返回的是所有可以操作的事件的个数
        //并且将这些事件放到events数组中,接下来我们只要去遍历就行
        int nfds = epoll_wait(epfd,events,2,-1);
        for(int i = 0; i < nfds; i++) {
            if (events[i].data.fd == STDIN_FILENO) {
                char echo_buf[15];
                fgets(echo_buf,15,stdin);
                printf("input:%s",echo_buf);
            }
            if (events[i].data.fd == client_fd) {
                int count = recv(client_fd,buf,BUF_SIZE,0);
                send(client_fd,buf,count,0);
            }
        }
    }
    return 0;
}

epoll()的优点(不完整)

我没有深入到poll和epoll的源码中分析他们各自的数据结构如何.从比较浅的层面来说,在poll当中,一旦有任意一个描述符可操作就会返回,然后,我们就要遍历所有的描述符数组来判断哪些描述是可以操作的.效率十分低下,在select()的情况中更加糟糕,因为可能需要遍历多个描述符集合.

而epoll来说,它的返回值是可操作的描述符的个数,并且都放在events数组中,就不用去遍历那些无效的描述符.只遍历有用的描述符.

在他们的各自的数据结构而言,参考了别人的博客,提到poll会涉及到数据从用户空间到内核空间的复制,而poll每执行一次就会重新复制.而epoll中使用红黑树来管理这些描述符,对于插入和删除都是十分高效的.

此外,在前面的poll和select()中,一旦selct()或者poll()调用后,我们很难再往里面添加我们感兴趣的事件.而epoll却十分方便,下面引用一段man epoll中的代码来辅以理解.代码如下:

for (;;) {
    nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    if (nfds == -1) {
        perror("epoll_wait");
        exit(EXIT_FAILURE);
    }

    for (n = 0; n < nfds; ++n) {
        if (events[n].data.fd == listen_sock) {
            conn_sock = accept(listen_sock, (struct sockaddr *) &addr, &addrlen);
            if (conn_sock == -1) {
                perror("accept");
                exit(EXIT_FAILURE);
            }
            setnonblocking(conn_sock);
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = conn_sock;
            if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,&ev) == -1) {
                perror("epoll_ctl: conn_sock");
                exit(EXIT_FAILURE);
            }
        } else {
            do_use_fd(events[n].data.fd);
        }
    }
}
暂无评论

发送评论 编辑评论

|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇