epoll的EPOLLEXCLUSIVE模式如何与电平触发相互作用? [英] How does epoll's EPOLLEXCLUSIVE mode interact with level-triggering?

查看:51
本文介绍了epoll的EPOLLEXCLUSIVE模式如何与电平触发相互作用?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

假设发生以下一系列事件:

  • 我们设置了监听套接字
  • 线程A使用 EPOLLIN |块等待侦听套接字变得可读.EPOLLEXCLUSIVE
  • 线程B还使用 EPOLLIN |阻止等待侦听套接字变得可读.EPOLLEXCLUSIVE
  • 传入连接到达侦听套接字,使该套接字可读,内核选择唤醒线程A.
  • 但是,在线程真正唤醒并调用 accept 之前,第二个传入连接到达侦听套接字.

在这里,套接字已经可以读取,因此第二个连接不会改变它.这是级别触发的epoll,因此根据正常规则,第二个连接可以视为无操作,并且不需要唤醒第二个线程....当然,不是唤醒第二个线程会破坏 EPOLLEXCLUSIVE 的全部目的吗?但是我对API设计师做正确的事情的信任并不像从前那样强大,而且我在文档中找不到任何东西可以排除这种情况.

问题

a)是否有可能出现上述情况,即两个连接到达但只有线程被唤醒?还是可以保证侦听套接字上每个不同的传入连接都会唤醒另一个线程?

b)是否有一个通用规则来预测 EPOLLEXCLUSIVE 和级别触发的epoll如何相互作用?

b)关于 EPOLLIN |EPOLLEXCLUSIVE EPOLLOUT |用于字节流fds的EPOLLEXCLUSIVE ,例如连接的TCP套接字还是管道?例如.如果在管道已经可读的情况下到达更多数据会发生什么?

解决方案

已编辑(原始答案在用于测试的代码之后)

为了确保一切都清楚,我将介绍与边缘触发事件( EPOLLET )以及级别触发事件相关的 EPOLLEXCLUSIVE 这些如何影响预期行为.

如您所知:

  • 边缘触发:设置 EPOLLET 后,只有事件更改了 fd 的状态时,事件才会被触发-只有第一个事件被触发,在完全处理该事件之前,不会触发任何新事件.

    此设计明确旨在防止由于正在处理的事件(例如,当 EPOLLIN 被删除时新数据到达时)而导致 epoll_wait 返回已经引发,但尚未调用 read 或未读取所有数据).

    边缘触发事件规则很简单将所有相同类型(即 EPOLLIN )事件合并,直到处理了所有可用数据.

    对于侦听套接字,在使用 accept接受所有现有的 listen 积压"套接字之前,不会再次触发 EPOLLIN 事件.

    对于字节流,直到从流中读取了所有可用字节(清空缓冲区)后,才会触发新事件.

  • 级别触发:另一方面,级别触发事件的行为更接近于旧版 select (或 poll )的运行方式,允许 epoll 与较早的代码一起使用.

    事件合并规则更复杂:相同类型的事件只有在没有人等待事件的情况下才会合并(没有人在等待 epoll_wait返回),,或者如果在 epoll_wait 返回之前发生了多个事件 ...否则,任何事件都会导致 epoll_wait 返回.

    在侦听套接字的情况下,每次客户端连接时都会触发 EPOLLIN 事件...除非没有人在等待 epoll_wait 返回,否则在这种情况下,对 epoll_wait 的下一次调用将立即返回,并且在此期间发生的所有 EPOLLIN 事件都将被合并为一个事件.

    对于字节流,每次有新数据进入时都会触发新事件……当然,除非没有人在等待 epoll_wait 返回,在这种情况下下次使用util epoll_wait 返回的所有到达的数据将立即返回下一次调用(即使它以不同的块/事件到达).

  • 排他性回报: EPOLLEXCLUSIVE 标志用于防止听到雷声"行为,因此只有一个 epoll_wait 调用方每次 fd 唤醒事件都被唤醒.

    正如我之前指出的,对于边缘触发状态, fd 唤醒事件是在 fd 状态下的更改.所以所有 EPOLLIN 事件都将被引发,直到所有数据被读取(监听套接字的积压被清空).

    另一方面,对于电平触发事件,每个EPOLLIN 都会调用一个唤醒事件.如果没有人在等待,这些事件将被合并.

按照您的问题中的示例进行操作:

  • 对于级别触发事件:每次客户端连接时,一个线程将从 epoll_wait 返回...但是,如果要连接另外两个客户端当两个线程都忙于接受前两个客户端时,这些 EPOLLIN 事件将合并为一个事件,并且对 epoll_wait 的下一次调用将随该合并事件一起立即返回.

    在问题给出的示例上下文中,由于 epoll_wait 返回,线程B有望唤醒".

    在这种情况下,两个线程都将竞争"走向 accept .

    但是,这不会破坏 EPOLLEXCLUSIVE 指令或意图.

    EPOLLEXCLUSIVE 指令旨在防止听到雷鸣"现象.在这种情况下,两个线程正在争相接受两个连接.每个线程可以(大概)安全地调用 accept ,而不会出错.如果使用了三个线程,则第三个线程将保持睡眠状态.

    如果未使用 EPOLLEXCLUSIVE ,则只要有可用的连接,所有的 epoll_wait 线程都将被唤醒,这意味着第一个连接到达后,两个线程都会争先恐后地接受单个连接(导致其中一个线程可能出现错误).

  • 用于边缘触发事件:预计只有一个线程会收到唤醒"调用.该线程有望接受所有等待的连接(清空监听积压").在清空待办事项之前,不会为该套接字引发任何 EPOLLIN 事件.

这同样适用于可读的套接字和管道.唤醒的线程应处理所有可读数据.这样可以防止等待线程尝试同时读取数据并遇到文件锁定争用情况.

我建议(这就是我要做的)将侦听套接字设置为非阻塞模式,并循环调用 accept ,直到 EAGAIN (或 EWOULDBLOCK )错误,表明积压为空.无法避免合并事件的风险.从套接字读取同样如此.

使用代码进行测试:

我编写了一个简单的测试,其中包含一些 sleep 命令和阻塞套接字.客户端套接字只有在两个线程都开始等待 epoll 后才启动.

客户端线程的启动被延迟,因此客户端1和客户端2间隔了第二秒.

一旦服务器线程被唤醒,它将在调用 accept 之前休眠一秒钟(允许第二个客户端执行此操作).也许服务器应该多睡一会儿,但似乎足够接近来管理调度程序,而不必求助于条件变量.

这是我的测试代码的结果(可能是一团糟,我不是测试设计的最佳人选)...

在支持 EPOLLEXCLUSIVE 的Ubuntu 16.10上,测试结果表明,响应于客户端,监听线程被一个接一个地唤醒.在问题的示例中,线程B被唤醒.

 测试地址:< null>:8000服务器线程2唤醒并发生1个事件服务器线程2将休眠一秒钟,以使事情发生.客户端编号1已连接服务器线程1唤醒并发生1个事件服务器线程1将休眠一秒钟,以使事情发生.客户端编号 2 已连接服务器线程2接受了连接并打了招呼.客户端1:Hello World-来自服务器线程2.服务器线程1接受了连接并打了招呼.客户端 2:Hello World - 来自服务器线程 1. 

要与Ubuntu 16.04(不支持 EPOLLEXCLUSIVE )进行比较,则两个线程都将在第一次连接时被唤醒.由于我使用阻塞套接字,因此第二个线程挂在 accept 上,直到客户端#2连接为止.

  main.c:178:2:警告:#warning EPOLLEXCLUSIVE未声明,测试无效[-Wcpp]#warning EPOLLEXCLUSIVE未声明,测试没有用^测试地址:< null>:8000服务器线程1唤醒并发生1个事件服务器线程1将休眠一秒钟,以使事情发生.服务器线程2唤醒并发生1个事件服务器线程2将休眠一秒钟,以使事情发生.客户端编号1已连接服务器线程1接受了连接并打了招呼.客户端1:Hello World-来自服务器线程1.客户端编号2已连接服务器线程2接受了连接并打了招呼.客户端2:Hello World-来自服务器线程2. 

为进一步比较,级别触发的 kqueue 的结果显示,对于第一个连接,两个线程均被唤醒.由于我使用阻塞套接字,第二个线程挂在 accept 上,直到客户端 #2 连接.

 测试地址:< null>:8000客户端编号1已连接服务器线程2唤醒并发生1个事件服务器线程1唤醒并发生1个事件服务器线程2将休眠一秒钟,以使事情发生.服务器线程1将休眠一秒钟,以使事情发生.服务器线程2接受了连接并打了招呼.客户端1:Hello World-来自服务器线程2.客户端编号2已连接服务器线程1接受了连接并打了招呼.客户端2:Hello World-来自服务器线程1. 

我的测试代码是(抱歉缺少注释和凌乱的代码,我不是为了将来的维护而写):

  #ifndef _GNU_SOURCE#定义_GNU_SOURCE#万一#define ADD_EPOLL_OPTION 0//定义为EPOLLET或0#include< arpa/inet.h>#include< errno.h>#include #include< limits.h>#include< netdb.h>#include< pthread.h>#include< stdint.h>#include< stdio.h>#include< stdlib.h>#include< string.h>#include #include< sys/resource.h>#include< sys/socket.h>#include< sys/time.h>#include< sys/types.h>#include< time.h>#include< unistd.h>#if!defined(__ linux__)&&!defined(__ CYGWIN__)#include< sys/event.h>#定义reactor_epoll 0#别的#define reactor_epoll 1#include< sys/epoll.h>#include< sys/timerfd.h>#万一int sock_listen(const char * address,const char * port);无效* listen_threard(无效* arg);void * client_thread(void * arg);int server_fd;char const * address = NULL;char const * port ="8000";int main(int argc,char const * argv []){如果(argc == 2){port = argv [1];} else if(argc == 3){port = argv [2];地址= argv [1];}fprintf(stderr,测试地址:%s:%s \ n",地址?地址:< null>",端口);server_fd = sock_listen(地址,端口);/* 代码 */pthread_t线程[4];对于(size_t i = 0; i< 2; i ++){如果(pthread_create(threads + i,NULL,listen_threard,(void *)i))perror(无法启动服务器线程"),退出(-1);}对于(size_t i = 2; i< 4; i ++){睡眠(1);如果(pthread_create(threads + i,NULL,client_thread,(void *)i))perror(无法启动客户端线程"),退出(-1);}//仅加入服务器线程.对于(size_t i = 0; i< 2; i ++){pthread_join(threads [i],NULL);}关闭(server_fd);睡眠(1);返回0;}/**将套接字设置为非阻塞状态.*/inline int sock_set_non_block(int fd)//感谢Bjorn Reese{/*如果他们有O_NONBLOCK,请使用Posix方法进行操作*/#如果已定义(O_NONBLOCK)/*修订程序:O_NONBLOCK已定义,但在SunOS 4.1.x和AIX 3.2.5上已损坏.*/整数标志;如果(-1 ==(标志= fcntl(fd,F_GETFL,0)))标志= 0;//printf(标志初始值为%d \ n",标志);返回fcntl(fd,F_SETFL,标志| O_NONBLOCK);#别的/*否则,请使用旧方法*/静态int标志= 1;返回ioctl(fd,FIOBIO,& flags);#万一}/*打开一个监听套接字*/int sock_listen(const char * address,const char * port){int srvfd;//设置地址struct addrinfo提示;struct addrinfo * servinfo;//将指向结果memset(& hints,0,sizeof提示);//确保结构为空hints.ai_family = AF_UNSPEC;//不在乎IPv4或IPv6hints.ai_socktype = SOCK_STREAM;//TCP流套接字hints.ai_flags = AI_PASSIVE;//为我填写我的IPif(getaddrinfo(address,port,& hints,& servinfo)){perror("addr err");返回-1;}//获取文件描述符srvfd =套接字(servinfo-> ai_family,servinfo-> ai_socktype,servinfo-> ai_protocol);如果(srvfd< = 0){perror("socket err");自由地址信息(服务信息);返回-1;}////保持服务器套接字阻塞以进行测试.////确保套接字未阻塞//如果(sock_set_non_block(srvfd)< 0){//perror(无法将套接字设置为非阻塞!");//freeaddrinfo(servinfo);//close(srvfd);//返回-1;//}//避免使用地址"{int optval = 1;setsockopt(srvfd,SOL_SOCKET,SO_REUSEADDR,& optval,sizeof(optval));}//将地址绑定到套接字{int bound = 0;for(struct addrinfo * p = servinfo; p!= NULL; p = p-> ai_next){如果(!bind(srvfd,p-> ai_addr,p-> ai_addrlen))界= 1;}如果(!bound){//perror("绑定错误");freeaddrinfo(servinfo);关闭(srvfd);返回-1;}}freeaddrinfo(servinfo);//听如果(listen(srvfd,SOMAXCONN)< 0){perror(无法开始收听");关闭(srvfd);返回-1;}返回srvfd;}/*将开始侦听,睡眠5秒钟,然后接受所有积压,然后* 结束 */无效* listen_threard(无效* arg){int epoll_fd;ssize_t event_count;#if Reactor_epoll#ifndef EPOLLEXCLUSIVE#warning EPOLLEXCLUSIVE未声明,测试没有用#define EPOLLEXCLUSIVE 0#万一//创建epoll等待fdepoll_fd = epoll_create1(0);如果(epoll_fd< 0)perror(无法创建epoll fd"),退出(1);//将服务器 fd 添加到 epoll 监视列表{struct epoll_event chevent = {0};chevent.data.ptr =(无效*)((uintptr_t)server_fd);chevent.events =EPOLLOUT |伊波林|EPOLLERR |EPOLLEXCLUSIVE |ADD_EPOLL_OPTION;epoll_ctl(epoll_fd,EPOLL_CTL_ADD,server_fd,&chevent);}//等待epollstruct epoll_event事件[10];event_count = epoll_wait(epoll_fd,events,10,5000);#别的//在BSD上测试,使用kqueueepoll_fd = kqueue();如果(epoll_fd< 0)perror(无法创建kqueue fd"),退出(1);//将服务器fd添加到kqueue监视列表{struct kevent chevent [2];EV_SET(chevent,server_fd,EVFILT_READ,EV_ADD | EV_ENABLE,0、0,(void *)(((uintptr_t)server_fd));EV_SET(chevent + 1,server_fd,EVFILT_WRITE,EV_ADD | EV_ENABLE,0,0,(void *)((uintptr_t)server_fd));kevent(epoll_fd,chevent,2,NULL,0,NULL);}//等待kqueue静态结构时间规格react_timeout = {.tv_sec = 5,.tv_nsec = 0};struct kevent事件[10];event_count = kevent(epoll_fd, NULL, 0, events, 10, &reactor_timeout);#万一关闭(epoll_fd);如果(event_count< = 0){fprintf(stderr,服务器线程%lu唤醒无事件/错误\ n",(size_t)arg + 1);perror("errno");返回NULL;}fprintf(stderr,服务器线程%lu被%lu事件唤醒\ n",(size_t)arg + 1,event_count);fprintf(stderr,服务器线程%lu将休眠一秒钟,以使事情发生.\ n",(size_t)arg + 1);睡眠(1);int connfd;struct sockaddr_storage client_addr;socklen_t client_addrlen = sizeof client_addr;/*接受所有连接.我们是非阻塞的,-1 ==没有更多的连接*/如果((connfd = accept(server_fd,(struct sockaddr *)& client_addr,& client_addrlen))> = 0){fprintf(stderr,服务器线程%lu接受了连接并打个招呼.\ n",(size_t)arg + 1);if(write(connfd,arg?"Hello World-来自服务器线程2.":"Hello World-来自服务器线程1.",35) <35)perror(服务器写入失败");关闭(connfd);} 别的 {fprintf(stderr,服务器线程%lu未能接受连接",(size_t)arg + 1);perror(:");}返回NULL;}无效* client_thread(无效* arg){int fd;//设置地址struct addrinfo提示;struct addrinfo * addrinfo;//将指向结果memset(& hints,0,sizeof提示);//确保结构为空hints.ai_family = AF_UNSPEC;//不关心 IPv4 或 IPv6hints.ai_socktype = SOCK_STREAM;//TCP流套接字hints.ai_flags = AI_PASSIVE;//为我填写我的IPif(getaddrinfo(address,port,& hints,& addrinfo)){perror(客户端无法启动地址");返回NULL;}//获取文件描述符fd =套接字(addrinfo-> ai_family,addrinfo-> ai_socktype,addrinfo-> ai_protocol);如果(fd&== 0){perror(客户端无法创建套接字");freeaddrinfo(addrinfo);返回NULL;}//////将套接字阻塞以进行测试.////确保套接字未阻塞//如果(sock_set_non_block(fd)< 0){//freeaddrinfo(addrinfo);//close(fd);//返回-1;//}如果(connect(fd,addrinfo-> ai_addr,addrinfo-> ai_addrlen)< 0&&errno!= EINPROGRESS){fprintf(stderr,客户编号%lu FAILED \ n",(size_t)arg-1);perror(客户端连接失败");关闭(FD);freeaddrinfo(addrinfo);返回NULL;}freeaddrinfo(addrinfo);fprintf(stderr,客户端编号%lu已连接\ n",(size_t)arg-1);字符缓冲区[128];if(read(fd,buffer,35)< 35){perror(客户端:读取错误");关闭(FD);} 别的 {缓冲区[35] = 0;fprintf(stderr,客户端%lu:%s \ n",(size_t)arg-1,缓冲区);关闭(FD);}返回NULL;} 

PS

作为最后的建议,我将考虑每个进程只有一个线程和一个epoll fd .这样,听到的雷声"是没有问题的,并且可以忽略 EPOLLEXCLUSIVE (这仍然是很新的并且没有得到广泛的支持)……唯一仍然暴露出的听到的雷声"是对于有限数量的共享套接字,竞争条件可能适合于负载平衡.


原始答案

我不确定我是否理解混乱,所以我将遍历 EPOLLET EPOLLEXCLUSIVE 以显示它们的组合预期行为.

如您所知:

  • 设置了 EPOLLET (边缘触发)后,事件会在 fd 状态更改而不是 fd 事件.

    此设计明确旨在防止由于正在处理的事件(例如,当 EPOLLIN 被删除时新数据到达时)而导致 epoll_wait 返回已经引发,但尚未调用 read 或未读取所有数据).

    对于侦听套接字,在使用 accept接受所有现有的 listen 积压"套接字之前,不会再次触发 EPOLLIN 事件.

  • EPOLLEXCLUSIVE 标志用于防止听到雷声"行为,因此对于每个 fd 唤醒事件.

    正如我之前指出的,对于边缘触发状态, fd 唤醒事件是在 fd 状态下的更改.因此,所有 EPOLLIN 事件都会引发,直到读取了所有数据(清空监听套接字的待办事项).

合并这些行为时,并按照问题中的示例进行操作,则预期只有一个线程会收到唤醒"调用.该线程有望接受所有等待的连接(清空 listen "backlog"),或者不再为该套接字引发 EPOLLIN 事件.

这同样适用于可读的套接字和管道.唤醒的线程应处理所有可读数据.这样可以防止等待线程尝试同时读取数据并遇到文件锁定争用情况.

如果您打算为每个 epoll_wait 唤醒事件仅调用一次 accept ,我建议您考虑避免边缘触发事件.无论使用 EPOLLEXCLUSIVE ,都存在不清空现有积压"的风险,这样就不会引发新的唤醒事件.

或者,我建议(这是我的工作)将侦听套接字设置为非阻塞模式,并循环调用 accept 直到出现 EAGAIN (或 EWOULDBLOCK )错误出现,表明积压为空.


级别触发的事件

正如纳撒尼尔(Nathaniel)在评论中指出的那样,我似乎完全误解了这个问题……我想我习惯 EPOLLET 是被误解的元素.

那么,正常的,级别触发的事件(不是 EPOLLET )会发生什么?

好吧……预期的行为是边缘触发事件的精确镜像(相反).

对于侦听套接字,只要有新的连接可用,无论是否在上一个事件之后调用了 accept ,都应返回 epoll_wait .

如果没有人正在等待 epoll_wait ...,则事件仅被合并".在这种情况下, epoll_wait 的下一次调用将立即返回.

在问题给出的示例上下文中,由于 epoll_wait 返回,线程B有望唤醒".

在这种情况下,两个线程都将竞争"走向 accept .

但是,这不会破坏 EPOLLEXCLUSIVE 指令或意图.

EPOLLEXCLUSIVE 指令旨在防止出现雷鸣般的"现象.在这种情况下,两个线程正在争相接受两个连接.每个线程可以(大概)安全地调用 accept ,而不会出错.如果使用了三个线程,则第三个线程将保持睡眠状态.

如果未使用 EPOLLEXCLUSIVE ,则只要有可用的连接,所有的 epoll_wait 线程都将被唤醒,这意味着第一个连接到达后,这两个线程都将竞相接受一个连接(导致其中一个可能出现错误).

Suppose the following series of events occurs:

  • We set up a listening socket
  • Thread A blocks waiting for the listening socket to become readable, using EPOLLIN | EPOLLEXCLUSIVE
  • Thread B also blocks waiting for the listening socket to become readable, also using EPOLLIN | EPOLLEXCLUSIVE
  • An incoming connection arrives at the listening socket, making the socket readable, and the kernel elects to wake up thread A.
  • But, before the thread actually wakes up and calls accept, a second incoming connection arrives at the listening socket.

Here, the socket is already readable, so the second connection doesn't change that. This is level-triggered epoll, so according to the normal rules, the second connection can be treated as a no-op, and the second thread doesn't need to be awoken. ...Of course, not waking up the second thread would kind of defeat the whole purpose of EPOLLEXCLUSIVE? But my trust in API designers doing the right thing is not as strong as it once was, and I can't find anything in the documentation to rule this out.

Questions

a) Is the above scenario possible, where two connections arrive but only thread is woken? Or is it guaranteed that every distinct incoming connection on a listening socket will wake another thread?

b) Is there a general rule to predict how EPOLLEXCLUSIVE and level-triggered epoll interact?

b) What about EPOLLIN | EPOLLEXCLUSIVE and EPOLLOUT | EPOLLEXCLUSIVE for byte-stream fds, like a connected TCP socket or a pipe? E.g. what happens if more data arrives while a pipe is already readable?

解决方案

Edited (original answer is after the code used for testing)

To make sure things are clear, I'll go over EPOLLEXCLUSIVE as it relates to edge triggered events (EPOLLET) as well as level-triggered events, to show how these effect expected behavior.

As you well know:

  • Edge Triggered: Once you set EPOLLET, events are triggered only if they change the state of the fd - meaning that only the first event is triggered and no new events will get triggered until that event is fully handled.

    This design is explicitly meant to prevent epoll_wait from returning due to an event that is in the process of being handled (i.e., when new data arrives while the EPOLLIN was already raised but read hadn't been called or not all of the data was read).

    The edge-triggered event rule is simple all same-type (i.e. EPOLLIN) events are merged until all available data was processed.

    In the case of a listening socket, the EPOLLIN event won't be triggered again until all existing listen "backlog" sockets have been accepted using accept.

    In the case of a byte stream, new events won't be triggered until all the the available bytes have been read from the stream (the buffer was emptied).

  • Level Triggered: On the other hand, level triggered events will behave closer to how legacy select (or poll) operates, allowing epoll to be used with older code.

    The event-merger rule is more complex: events of the same type are only merged if no one is waiting for an event (no one is waiting for epoll_wait to return), or if multiple events happen before epoll_wait can return... otherwise any event causes epoll_wait to return.

    In the case of a listening socket, the EPOLLIN event will be triggered every time a client connects... unless no one is waiting for epoll_wait to return, in which case the next call for epoll_wait will return immediately and all the EPOLLIN events that occurred during that time will have been merged into a single event.

    In the case of a byte stream, new events will be triggered every time new data comes in... unless, of course, no one is waiting for epoll_wait to return, in which case the next call will return immediately for all the data that arrive util epoll_wait returned (even if it arrived in different chunks / events).

  • Exclusive return: The EPOLLEXCLUSIVE flag is used to prevent the "thundering heard" behavior, so only a single epoll_wait caller is woken up for each fd wake-up event.

    As I pointed out before, for edge-triggered states, an fd wake-up event is a change in the fd state. So all EPOLLIN events will be raised until all data was read (the listening socket's backlog was emptied).

    On the other hand, for level triggered events, each EPOLLIN will invoke a wake up event. If no one is waiting, these events will be merged.

Following the example in your question:

  • For level triggered events: every time a client connects, a single thread will return from epoll_wait... BUT, if two more clients were to connect while both threads were busy accepting the first two clients, these EPOLLIN events would merge into a single event and the next call to epoll_wait will return immediately with that merged event.

    In the context of the example given in the question, thread B is expected to "wake up" due to epoll_wait returning.

    In this case, both threads will "race" towards accept.

    However, this doesn't defeat the EPOLLEXCLUSIVE directive or intent.

    The EPOLLEXCLUSIVE directive is meant to prevent the "thundering heard" phenomenon. In this case, two threads are racing to accept two connections. Each thread can (presumably) call accept safely, with no errors. If three threads were used, the third would keep on sleeping.

    If the EPOLLEXCLUSIVE weren't used, all the epoll_wait threads would have been woken up whenever a connection was available, meaning that as soon as the first connection arrived, both threads would have been racing to accept a single connection (resulting in a possible error for one of them).

  • For edge triggered events: only one thread is expected to receive the "wake up" call. That thread is expected to accept all waiting connections (empty the listen "backlog"). No more EPOLLIN events will be raised for that socket until the backlog is emptied.

The same applies to readable sockets and pipes. The thread that was woken up is expected to deal with all the readable data. This prevents to waiting threads from attempting to read the data concurrently and experiencing file lock race conditions.

I would recommend (and this is what I do) to set the listening socket to non-blocking mode and calling accept in a loop until an EAGAIN (or EWOULDBLOCK) error is raised, indicating that the backlog is empty. There is no way to avoid the risk of events being merged. The same is true for reading from a socket.

Testing this with code:

I wrote a simple test, with some sleep commands and blocking sockets. Client sockets are initiated only after both threads start waiting for epoll.

Client thread initiation is delayed, so client 1 and client 2 start a second apart.

Once a server thread is woken up, it will sleep for a second (allowing the second client to do it's thing) before calling accept. Maybe the servers should sleep a little more, but it seems close enough to manage the scheduler without resorting to conditional variables.

Here are the results of my test code (which might be a mess, I'm not the best person for test design)...

On Ubuntu 16.10, which supports EPOLLEXCLUSIVE, the test results show that the listening threads are woken up one after the other, in response to the clients. In the example in the question, thread B is woken up.

Test address: <null>:8000
Server thread 2 woke up with 1 events
Server thread 2 will sleep for a second, to let things happen.
client number 1 connected
Server thread 1 woke up with 1 events
Server thread 1 will sleep for a second, to let things happen.
client number 2 connected
Server thread 2 accepted a connection and saying hello.
client 1: Hello World - from server thread 2.
Server thread 1 accepted a connection and saying hello.
client 2: Hello World - from server thread 1.

To compare with Ubuntu 16.04 (without EPOLLEXCLUSIVE support), than both threads are woken up for the first connection. Since I use blocking sockets, the second thread hangs on accept until client # 2 connects.

main.c:178:2: warning: #warning EPOLLEXCLUSIVE undeclared, test is futile [-Wcpp]
 #warning EPOLLEXCLUSIVE undeclared, test is futile
  ^
Test address: <null>:8000
Server thread 1 woke up with 1 events
Server thread 1 will sleep for a second, to let things happen.
Server thread 2 woke up with 1 events
Server thread 2 will sleep for a second, to let things happen.
client number 1 connected
Server thread 1 accepted a connection and saying hello.
client 1: Hello World - from server thread 1.
client number 2 connected
Server thread 2 accepted a connection and saying hello.
client 2: Hello World - from server thread 2.

For one more comparison, the results for level triggered kqueue show that both threads are awoken for the first connection. Since I use blocking sockets, the second thread hangs on accept until client # 2 connects.

Test address: <null>:8000
client number 1 connected
Server thread 2 woke up with 1 events
Server thread 1 woke up with 1 events
Server thread 2 will sleep for a second, to let things happen.
Server thread 1 will sleep for a second, to let things happen.
Server thread 2 accepted a connection and saying hello.
client 1: Hello World - from server thread 2.
client number 2 connected
Server thread 1 accepted a connection and saying hello.
client 2: Hello World - from server thread 1.

My test code was (sorry for the lack of comments and the messy code, I wasn't writing for future maintenance):

#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif

#define ADD_EPOLL_OPTION 0 // define as EPOLLET or 0

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <netdb.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#if !defined(__linux__) && !defined(__CYGWIN__)
#include <sys/event.h>
#define reactor_epoll 0
#else
#define reactor_epoll 1
#include <sys/epoll.h>
#include <sys/timerfd.h>
#endif

int sock_listen(const char *address, const char *port);
void *listen_threard(void *arg);
void *client_thread(void *arg);
int server_fd;
char const *address = NULL;
char const *port = "8000";

int main(int argc, char const *argv[]) {
  if (argc == 2) {
    port = argv[1];
  } else if (argc == 3) {
    port = argv[2];
    address = argv[1];
  }
  fprintf(stderr, "Test address: %s:%s\n", address ? address : "<null>", port);
  server_fd = sock_listen(address, port);
  /* code */
  pthread_t threads[4];

  for (size_t i = 0; i < 2; i++) {
    if (pthread_create(threads + i, NULL, listen_threard, (void *)i))
      perror("couldn't initiate server thread"), exit(-1);
  }
  for (size_t i = 2; i < 4; i++) {
    sleep(1);
    if (pthread_create(threads + i, NULL, client_thread, (void *)i))
      perror("couldn't initiate client thread"), exit(-1);
  }
  // join only server threads.
  for (size_t i = 0; i < 2; i++) {
    pthread_join(threads[i], NULL);
  }
  close(server_fd);
  sleep(1);
  return 0;
}

/**
Sets a socket to non blocking state.
*/
inline int sock_set_non_block(int fd) // Thanks to Bjorn Reese
{
/* If they have O_NONBLOCK, use the Posix way to do it */
#if defined(O_NONBLOCK)
  /* Fixme: O_NONBLOCK is defined but broken on SunOS 4.1.x and AIX 3.2.5. */
  int flags;
  if (-1 == (flags = fcntl(fd, F_GETFL, 0)))
    flags = 0;
  // printf("flags initial value was %d\n", flags);
  return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
#else
  /* Otherwise, use the old way of doing it */
  static int flags = 1;
  return ioctl(fd, FIOBIO, &flags);
#endif
}

/* open a listenning socket */
int sock_listen(const char *address, const char *port) {
  int srvfd;
  // setup the address
  struct addrinfo hints;
  struct addrinfo *servinfo;       // will point to the results
  memset(&hints, 0, sizeof hints); // make sure the struct is empty
  hints.ai_family = AF_UNSPEC;     // don't care IPv4 or IPv6
  hints.ai_socktype = SOCK_STREAM; // TCP stream sockets
  hints.ai_flags = AI_PASSIVE;     // fill in my IP for me
  if (getaddrinfo(address, port, &hints, &servinfo)) {
    perror("addr err");
    return -1;
  }
  // get the file descriptor
  srvfd =
      socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
  if (srvfd <= 0) {
    perror("socket err");
    freeaddrinfo(servinfo);
    return -1;
  }
  // // keep the server socket blocking for the test.
  // // make sure the socket is non-blocking
  // if (sock_set_non_block(srvfd) < 0) {
  //   perror("couldn't set socket as non blocking! ");
  //   freeaddrinfo(servinfo);
  //   close(srvfd);
  //   return -1;
  // }

  // avoid the "address taken"
  {
    int optval = 1;
    setsockopt(srvfd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
  }
  // bind the address to the socket
  {
    int bound = 0;
    for (struct addrinfo *p = servinfo; p != NULL; p = p->ai_next) {
      if (!bind(srvfd, p->ai_addr, p->ai_addrlen))
        bound = 1;
    }

    if (!bound) {
      // perror("bind err");
      freeaddrinfo(servinfo);
      close(srvfd);
      return -1;
    }
  }
  freeaddrinfo(servinfo);
  // listen in
  if (listen(srvfd, SOMAXCONN) < 0) {
    perror("couldn't start listening");
    close(srvfd);
    return -1;
  }
  return srvfd;
}
/* will start listenning, sleep for 5 seconds, then accept all the backlog and
 * finish */
void *listen_threard(void *arg) {

  int epoll_fd;
  ssize_t event_count;
#if reactor_epoll

#ifndef EPOLLEXCLUSIVE
#warning EPOLLEXCLUSIVE undeclared, test is futile
#define EPOLLEXCLUSIVE 0
#endif
  // create the epoll wait fd
  epoll_fd = epoll_create1(0);
  if (epoll_fd < 0)
    perror("couldn't create epoll fd"), exit(1);
  // add the server fd to the epoll watchlist
  {
    struct epoll_event chevent = {0};
    chevent.data.ptr = (void *)((uintptr_t)server_fd);
    chevent.events =
        EPOLLOUT | EPOLLIN | EPOLLERR | EPOLLEXCLUSIVE | ADD_EPOLL_OPTION;
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &chevent);
  }
  // wait with epoll
  struct epoll_event events[10];
  event_count = epoll_wait(epoll_fd, events, 10, 5000);
#else
  // testing on BSD, use kqueue
  epoll_fd = kqueue();
  if (epoll_fd < 0)
    perror("couldn't create kqueue fd"), exit(1);
  // add the server fd to the kqueue watchlist
  {
    struct kevent chevent[2];
    EV_SET(chevent, server_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0,
           (void *)((uintptr_t)server_fd));
    EV_SET(chevent + 1, server_fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0,
           (void *)((uintptr_t)server_fd));
    kevent(epoll_fd, chevent, 2, NULL, 0, NULL);
  }
  // wait with kqueue
  static struct timespec reactor_timeout = {.tv_sec = 5, .tv_nsec = 0};
  struct kevent events[10];
  event_count = kevent(epoll_fd, NULL, 0, events, 10, &reactor_timeout);
#endif

  close(epoll_fd);

  if (event_count <= 0) {
    fprintf(stderr, "Server thread %lu wakeup no events / error\n",
            (size_t)arg + 1);
    perror("errno ");
    return NULL;
  }

  fprintf(stderr, "Server thread %lu woke up with %lu events\n",
          (size_t)arg + 1, event_count);
  fprintf(stderr,
          "Server thread %lu will sleep for a second, to let things happen.\n",
          (size_t)arg + 1);
  sleep(1);
  int connfd;
  struct sockaddr_storage client_addr;
  socklen_t client_addrlen = sizeof client_addr;
  /* accept up all connections. we're non-blocking, -1 == no more connections */
  if ((connfd = accept(server_fd, (struct sockaddr *)&client_addr,
                       &client_addrlen)) >= 0) {
    fprintf(stderr,
            "Server thread %lu accepted a connection and saying hello.\n",
            (size_t)arg + 1);
    if (write(connfd, arg ? "Hello World - from server thread 2."
                          : "Hello World - from server thread 1.",
              35) < 35)
      perror("server write failed");
    close(connfd);
  } else {
    fprintf(stderr, "Server thread %lu failed to accept a connection",
            (size_t)arg + 1);
    perror(": ");
  }
  return NULL;
}

void *client_thread(void *arg) {

  int fd;
  // setup the address
  struct addrinfo hints;
  struct addrinfo *addrinfo;       // will point to the results
  memset(&hints, 0, sizeof hints); // make sure the struct is empty
  hints.ai_family = AF_UNSPEC;     // don't care IPv4 or IPv6
  hints.ai_socktype = SOCK_STREAM; // TCP stream sockets
  hints.ai_flags = AI_PASSIVE;     // fill in my IP for me
  if (getaddrinfo(address, port, &hints, &addrinfo)) {
    perror("client couldn't initiate address");
    return NULL;
  }
  // get the file descriptor
  fd =
      socket(addrinfo->ai_family, addrinfo->ai_socktype, addrinfo->ai_protocol);
  if (fd <= 0) {
    perror("client couldn't create socket");
    freeaddrinfo(addrinfo);
    return NULL;
  }
  // // // Leave the socket blocking for the test.
  // // make sure the socket is non-blocking
  // if (sock_set_non_block(fd) < 0) {
  //   freeaddrinfo(addrinfo);
  //   close(fd);
  //   return -1;
  // }

  if (connect(fd, addrinfo->ai_addr, addrinfo->ai_addrlen) < 0 &&
      errno != EINPROGRESS) {
    fprintf(stderr, "client number %lu FAILED\n", (size_t)arg - 1);
    perror("client connect failure");
    close(fd);
    freeaddrinfo(addrinfo);
    return NULL;
  }
  freeaddrinfo(addrinfo);
  fprintf(stderr, "client number %lu connected\n", (size_t)arg - 1);
  char buffer[128];
  if (read(fd, buffer, 35) < 35) {
    perror("client: read error");
    close(fd);
  } else {
    buffer[35] = 0;
    fprintf(stderr, "client %lu: %s\n", (size_t)arg - 1, buffer);
    close(fd);
  }
  return NULL;
}

P.S.

As a final recommendation, I would consider having no more than a single thread and a single epoll fd per process. This way the "thundering heard" is a non-issue and EPOLLEXCLUSIVE (which is still very new and isn't widely supported) can be disregarded... the only "thundering heard" this still exposes is for the limited amount of shared sockets, where the race condition might be good for load balancing.


Original Answer

I'm not sure I understand the confusion, so I'll go over EPOLLET and EPOLLEXCLUSIVE to show their combined expected behavior.

As you well know:

  • Once you set EPOLLET (edge triggered), events are triggered on fd state changes rather than fd events.

    This design is explicitly meant to prevent epoll_wait from returning due to an event that is in the process of being handled (i.e., when new data arrives while the EPOLLIN was already raised but read hadn't been called or not all of the data was read).

    In the case of a listening socket, the EPOLLIN event won't be triggered again until all existing listen "backlog" sockets have been accepted using accept.

  • The EPOLLEXCLUSIVE flag is used to prevent the "thundering heard" behavior, so only a single epoll_wait caller is woken up for each fd wake-up event.

    As I pointed out before, for edge-triggered states, an fd wake-up event is a change in the fd state. So all EPOLLIN events will be raised until all data was read (the listening socket's backlog was emptied).

When merging these behaviors, and following the example in your question, only one thread is expected to receive the "wake up" call. That thread is expected to accept all waiting connections (empty the listen "backlog") or no more EPOLLIN events will be raised for that socket.

The same applies to readable sockets and pipes. The thread that was woken up is expected to deal with all the readable data. This prevents to waiting threads from attempting to read the data concurrently and experiencing file lock race conditions.

I would recommend that you consider avoiding the edge triggered events if you mean to call accept only once for each epoll_wait wake-up event. Regardless of using EPOLLEXCLUSIVE, you run the risk of not emptying the existing "backlog", so that no new wake-up events will be raised.

Alternatively, I would recommend (and this is what I do) to set the listening socket to non-blocking mode and calling accept in a loop until and an EAGAIN (or EWOULDBLOCK) error is raised, indicating that the backlog is empty.


EDIT 1: Level Triggered Events

It seems, as Nathaniel pointed out in the comment, that I totally misunderstood the question... I guess I'm used to EPOLLET being the misunderstood element.

So, what happens with normal, level-triggered, events (NOT EPOLLET)?

Well... the expected behavior is the exact mirror image (opposite) of edge triggered events.

For listenning sockets, the epoll_wait is expected return whenever a new connected is available, whether accept was called after a previous event or not.

Events are only "merged" if no-one is waiting with epoll_wait... in which case the next call for epoll_wait will return immediately.

In the context of the example given in the question, thread B is expected to "wake up" due to epoll_wait returning.

In this case, both threads will "race" towards accept.

However, this doesn't defeat the EPOLLEXCLUSIVE directive or intent.

The EPOLLEXCLUSIVE directive is meant to prevent the "thundering heard" phenomenon. In this case, two threads are racing to accept two connections. Each thread can (presumably) call accept safely, with no errors. If three threads were used, the third would keep on sleeping.

If the EPOLLEXCLUSIVE weren't used, all the epoll_wait threads would have been woken up whenever a connection was available, meaning that as soon as the first connection arrived, both threads would have been racing to accept a single connection (resulting in a possible error for one of them).

这篇关于epoll的EPOLLEXCLUSIVE模式如何与电平触发相互作用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆