TOC

I/O多路复用

    首先,我们需要知道的是,不管是select、poll还是epoll,他们是一个东西,他们都是实现I/O多路复用的手段,只不过他们底层的数据结构、消息通知等特性都有不同的实现,所以他们统称为I/O多路复用技术,只是底层实现的手段稍有差异;
    对于目前来讲,大多数操作系统都支持了select和poll,但是由于select、poll这两种方式在对计算机技术的应用中存在众多问题,于是就有了epoll这种模型,epoll是对select的再次升级,它不仅对底层的文件描述符的数量进行了突破,还对底层多路I/O存储的数据结构进行了优化以及内存优化等高级特性;
    对于epoll模型来讲,它是在Linux内核的2.5+之后才开始支持的,也就是说,它是一项比较新式的技术,并不是特别的通用,比如windows就不支持原生的epoll模型,但是它的替代品是iocp;
    在Python中,对I/O多路复用的系统调用底层也进行了一次封装,即select库,它里面实现了select、poll这种类型的系统调用,并且部分实现了epoll,是部分实现,但是由于这个库太过原始,用起来非常的麻烦,如果想要更好的使用它,那需要知道select、poll、epoll底层的各种细节,所以Python从3.4开始提供了一个selectors库,这是一个对I/O多路复用的高级封装库,如下就是这个selectors的源码层次结构;
selectors层次结构
    BaseSelector
    +-- SelectSelector  # 实现select
    +-- PollSelector    # 实现poll
    +-- EpollSelector   # 实现epoll
    +-- DevpollSelector # 实现devpoll
    +-- KqueueSelector  # 实现kqueue
    上面也说过select和poll才是所有操作系统通用的模型,但是这两种模型需要遍历I/O,才能知道到底哪个或者哪几个I/O的数据已经准备好了,所以它带来的问题就是性能较低,但是如果我们选择epoll的话,就有适配问题了,因为对于epoll来讲,不同平台均有不相同的实现,BSD、MAC下面支持kqueue,Solaris下面支持/dev/poll,而windows则是IOCP,因为selectors里面并没有提供IOCP的支持,也就是说在windows下是没法使用epoll的,只能退化为原生的select,这一点,通过源码就能看到;

select/poll

    Select之所以效率低就是因为它内部维护了一个数组的线性结构,select将所有需要监控的I/O的文件描述符对象,都保存在这个数组里面,一旦监控的对象多了,线性结构最大的问题是遍历的时候效率过低,带来的性能问题极大,同时select对底层的文件描述符的大小是有限制的,虽然可以修改,但是依然是有一定的界限;
    随着技术的发展,高并发的需求,I/O多路复用技术慢慢渗透到各行各业,随着技术的发展,需求量的不断增加,后来描述符1024的限制就改进了,衍生出了poll,poll是用链表,但是链表一样,它最大的问题还是遍历的性能低下,哪一个I/O事件完成了(数据准备好了),调用者不得而知,必须得遍历所有的I/O文件描述符对象,所以他们的效率都很差;

epoll

    那么到了后来,epoll的技术模型就面世了,epoll采用了事件通知的机制,也就是我们说的回调,一个意思,内部采用什么数据结构,我们就没必要关心了,因为它采用的是回调的通知机制,当I/O完成了(数据准备好了),就立即调用回调函数,将具体哪一个I/O事件完成了(数据准备好了)通知给调用者,性能得到了大大的提高;
    并且它也解决了select/poll存在的一个非常重大的一个问题,即内存管理,对于select/poll模型来讲,它们必须从将数据从内核缓冲区复制到用户缓冲区,这个复制虽然很快,依然会消耗大量时间,在高并发情况下,I/O的时间是一个非常重要的技术参数之一;
    而epoll是通过共享内存实现的,将内核空间对应的内存地址的引用交给用户空间的进程,调用者就不用拷贝来拷贝去了,节省了大量的时间,性能得到了非常大的提高;

selectors

    selectors库,通过源码可以看得出来,它主要是实现了原生select、poll、epoll以及mac平台的kqueue和unix的devpoll,selectors库也做出了一个优先级排序epoll|kqueue|devpoll > poll > select,这说明优先使用epoll和类epoll的模型;
register(fileobj, events, data=None):注册类文件对象(fileobj),监控类文件对象的I/O状态,返回与fileobj关联的SelectorKey对象;
unregister(fileobj):注销一个类文件对象,取消对它的状态监控,它返回与类文件对象关联的SelectorKey对象;
select():默认永久阻塞,可以设置超时时间,等待注册的fileobje的读或写事件准备好,即开始监控,返回值是一个(key, events)的元组,其中key是一个SelectorKey类的实例, 而events就是event,也就是读或写;
get_map():返回fileobj对象和其关联的SelectorKey的映射,这个映射是一个字典字典的key为fd,value为SelectorKey,其中SelectorKey就是注册的这个类文件对象的SelectorKey对象,fd就是这个类文件对象的文件描述符;
modify(fileobj, events, data):用于修改一个注册过的文件对象,比如从监听可读变为监听可写,它其实就是register() 后再跟unregister(),但是使用modify()更高效;
get_key(fileobj):返回已注册的类文件对象的SelectorKey;
    selectors库,下面提供一个DefaultSelector的类,它会自动选择适合当前平台最有效、性能最高的select模型,但是由于selectors库并没有实现windows下的IOCP,所以对于windows平台只能退化为SelectSelector,即原生select,如如下源代码;
# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector
注册事件
    对于I/O多路复用来讲,我们需要监控一个I/O,就必须将需要监控的I/O对象交给select,由select去对这些I/O进行状态监控,那么常见的I/O就两种,磁盘I/O和网络I/O;
    selectors库中任何一种I/O多路复用模型,都有一个register的I/O注册方法,这个register方法就是调用select系统调用注册一个类文件对象的监控,注册完成之后,内核就会调用相应的I/O复用模型去监视它的I/O事件,事件分两种,读事件和写事件,此外它还接受一个data参数,这个data参数,它是我们在注册一个类文件对象时,附带的数据,这个数据可以是任何合法的数据类型,所以我们可以将回调函数也塞进去,当I/O状态变化时去调用这个回调函数,实现I/O状态的通知;
class SelectSelector(_BaseSelectorImpl):
    """Select-based selector."""
    def register(self, fileobj, events, data=None): pass
# fileobj:需要监听的类文件对象;
# events:监听的事件,是读事还是写事件;
# data:注册一个类文件对象需要绑定的data,主要用于回调;
    注册函数,需要提供一个类文件对象,和监听事件,可以看到是events说明可以监听多个事件,但是最多也就两种,即读事件和写事件,如下介绍;
EVENT_READ:可读,内核空间的数据已经准备完成,可以开始读了;
EVENT_WIRTE:可写,内核空间的数据已经准备完成,可以开始写了;
# 源码如下
EVENT_READ = (1 << 0):表示读写状态,读为1;
EVENT_WRITE = (1 << 1):表示读写状态,写为2;
    当I/O模型向select系统调用发起注册请求之后,会返回一个SelectorKey对象,这个SelectorKey对象就保存着,我们调用registry方法时传递的所有参数,它有四个属性,第一个为fileobj,当初从registry传入的file对象,第二个是fd,fd就是fileobj获得的文件描述符,第三个是events,当初从registry传入的events对象,第四个为data,当初从registry传入的data参数;
SelectorKey
    通过源码来看,SelectorKey是一个命名元祖,即namedtuple,这个命名元祖的名称为SelectorKey,它有四个元素fileobj、fd、events和data,说白了,它就是我们将一个类文件对象调用select的registry方法提供的参数,全部存储到这个名为SelectorKey的命名元祖里面,并且将这个类文件对象的文件描述符也存储了进去,就是对每个已注册的类文件对象做了个记录;
SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data'])
基本使用
    对于selectors库来讲,它是使用方法还是非常简单的,也就那么几个方法可以调用,如下,就是一个使用selectors实现的多路I/O监控去监控socket文件描述符的I/O事件;
    因为需要监控I/O对象,所以首先创建了一个socket具有网络I/O特性对象,然后使其监听在一个指定的端口之上,最后使用我们的selectors来监控这个socket文件描述符的I/O事件,因为一般作为服务端来讲,都是等待客户端来访问自己,那么对于自己这一端来讲,就是一种读的网络I/O事件;
    那么我们就利用selectors去监控这个socket的I/O状态,一旦客户端访问自己,并且请求已经从物理网卡达到操作系统内核的缓冲区时,就会被select捕获到;
    经过如下代码测试,可以看到,一旦用户链接进来就断开,因为用户链接进来,对于自己就是一种读操作,select在后台一直都监控着这个socket,一旦这个socket对象产生了读的I/O事,并数据已经到达了内核的缓冲区就不在阻塞,所以,整个程序就会直接断开;
server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) # 开启端口复用,避免端口已占用
server.bind(("127.0.0.1",8080))
server.listen()
server.setblocking(False) # selectors要求监控对象为非阻塞(经测试,阻塞也可以)

select=selectors.DefaultSelector() # 创建一个select对象
key=select.register(server,events=selectors.EVENT_READ) # 将需要状态监控的类文件对象调用select系统调用,注册到监控器里面去,它返回一个SelectorKey对象
events=select.select() # 开始状态监控,select()方法默认阻塞,直至所监控的所有I/O对象,有一个或者一部分产生了I/O满足events事件
print(events) # 返回已满足事件的I/O对象,一个或者多个
select.close()
实现回调机制
    可以看到上述的代码,已经实现了基本的I/O监控,但是我们还有一个问题,就是当I/O事件完成第一阶段之后,什么都没做,这监控的意义就不存在了;
    在上面的SelectorKey中就说到,在调用selectors的registry时,我们可以给定一个data的参数,它的值可以是任何合法类型,那么我们就可以将我们的回调函数通过data给传递进去,实现回调机制,当I/O的数据达到内核缓冲区之后,就调用data这个传递进来的函数,进行回调,通知调用者将数据从内核缓冲区取走;
    如下,在如上代码加入回调功能,当客户端发起链接请求的请求数据达到内核缓冲区之后,就调用回调函数,在回调函数里面调用socket的accpet方法,获得客户端通信socket和客户的信息;
server=socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) # 开启端口复用,避免端口已占用
server.bind(("127.0.0.1",8080))
server.listen()
server.setblocking(False) # selectors要求监控对象为非阻塞(经测试,阻塞也可以)

def accept(sock): # 回调函数
    new_sock,client_info=sock.accept()
    print(client_info)

select=selectors.DefaultSelector()
select.register(server,events=selectors.EVENT_READ,data=accept) # data为当server读事件准备好了之后,需要用到的函数,该函数主要用于回调

ready_list=select.select() # 开始状态监控,select()方法默认阻塞,直至所监控的所有I/O对象,有一个或者一部分产生了I/O满足events事件,返回值为产生I/O的类文件对象列表
for key,event in ready_list: # 迭代所有数据准备好I/O对象
    callback=key.data # key.data为registry时传递的函数,其实这就是一个回调函数
    callback(key.fileobj) # key.fileobj就是我们传入监控的socket对象,将key.fileobj传入我们的回调函数,进行回调
select.close()
# ('127.0.0.1', 53342)
实现EchoServer
    selectors库的select()方法,默认是永久阻塞的,当然我们也可以设置超时时间,它返回的条件是,select监控的所有的I/O对象,有最少一个数据准备好了,那么对于这些数据没有准备好的I/O对象,依旧会在后台进行实时状态监控,那么又因为,即使I/O对象的数据准备好,并且数据已经被取走了,I/O对象并不会从select的监控器里面移除,除非我们使用close()或者unregistry方法将这个I/O对象从select监控器里面移除,,一个I/O的监控,不是一次性的,一般都是需要不间断的监控的,就类似聊天软件,双方不是聊一次就不再通信了,这就是I/O多路复用,所以我们需要对其循环监控,因此加入到while循环中,不间断监控;
    那么同时,我们也可以看到,我们的回调是通过key.data来实现的,这个key点data就是我们当初在registry这个socket的类文件对象时,传递的回调函数,所以当数据准备好之后,我们就可以利用这个函数来进行回调,需要注意的是,每个fileobj的回调函数可能都不一致,也可能一致,如下就是一个借助selectors库实现的基于I/O多路复用的EchoServer;
server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) # 开启端口复用,避免端口已占用
server.bind(("127.0.0.1",8080))
server.listen()
server.setblocking(False) # selectors要求监控对象为非阻塞(经测试,阻塞也可以)

# 消息收发回调函数
def recv(new_socket):
    data=new_socket.recv(1024)
    if data: # 判断客户端是否已异常断开
        new_socket.send(data)
    else:
        select.unregister(new_socket) # 如果客户端断开,记得将这个new_socket从队列中移除,因为客户端断开服务端会收到一个空串,这个消息如果不收,select就会进入死循环
# 客户端链接回调函数
def accept(sock):
    new_sock,client_info=sock.accept()
    select.register(new_sock,selectors.EVENT_READ,recv) # 将newsocket也加入select队列,使得select能对newsocket得读写事件进行监听

select=selectors.DefaultSelector()
select.register(server,events=selectors.EVENT_READ,data=accept)

while True: # 开启循环,循环监听
    events=select.select() # 开启监听,默认阻塞,直至监控的I/O满足事件,监控第一阶段,返回数据已准备好的所有fileobj
    for key,event in events:  # 迭代所有数据准备好得SelectorKey对象
        key.data(key.fileobj) # 开始进行回调,key.data就是我们registry时传入得回调函数,key.fileobj就是我们传入监控的socket对象
  • 重点:当一个fileobj产生了I/O事件,并且第一阶段已完成,它的数据在内核缓冲区如果未被读走,那么它将一直都会被select()返回,从而出现死循环,如socket的客户端断开链接会发送一个空串,如果这个程序采用了selectors实现多路复用,那么在客户端断开时,需要捕捉这个空串,一旦出现这个空串就unregistry,否则就会出现死循环的问题;
基于I/O多路复用实现群聊
    如下就是基于I/O多路复用的群聊程序,使用select监控多路socket I/O,这些I/O包括socket用来接受用户请求的socket,同时也包括服务端与客户端实现通信的socket,当这些Socket I/O第一阶段数据准备好了,就立即进行回调到指定的回调函数,进行下一步操作,此处需要注意一点的就是,当客户端断开时,切记不要忘记将这个客户端的通信socket从select队列中移除,以避免造成死循环的可能;
    此外,以往实现群聊,我们需要将所有的已经链接的客户端存储到一个容器里面,那么对于selectors库来讲,所有的socket对象都会放在select的监视器中,所以对于I/O多路复用的群聊来讲,我们可以直接在这个监视器中拿到所有的socket对象;
class ChatServer():
    def __init__(self,bind_address="127.0.0.1",bind_port=8083):
        self.bind_address=bind_address,bind_port
        self.server=socket.socket(family=socket.AF_INET,type=socket.SOCK_STREAM)
        self.select=selectors.DefaultSelector()
        self.event=threading.Event() # 创建一个Flag
    def start(self):
        self.server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
        self.server.bind(self.bind_address)
        self.server.listen()
        self.server.setblocking(False) # 设置socket为非阻塞
        self.select.register(self.server,selectors.EVENT_READ,{"callback":self.accept,"recv":False}) # 将accept函数与当前的fileobj进行绑定
        while not self.event.isSet(): # 借助Event来实现死循环
            events=self.select.select() # 使用I/O多路复用监控多路I/O的读事件,默认阻塞,返回一个列表,每个元素为一个二元组,元组内的第一个元素SelectorKey,第二个元素为events
            for key,evnet in events:
                key.data.get("callback")(key.fileobj) # 借助registry时提供的data函数,实现回调
    def accept(self,sock): # 客户端链接回调函数
        new_socket,client_info=sock.accept()
        self.select.register(new_socket,selectors.EVENT_READ,{"callback":self.recv,"recv":True}) # 将accept函数与当前的fileobj进行绑定
    def recv(self,new_socket): # 信息交互回调函数
        data=new_socket.recv(1024)
        if data or data.decode=="q":
            self.group_send(data)
        else: # 垃圾清理
            new_socket.close()
            self.select.unregister(new_socket)
    def group_send(self,data): # 群聊的实现
        for fd,key in self.select.get_map().items(): # 从select的监视器里面拿到所有的与客户端通信的socket
            if key.data.get("recv"): # 判断,如果当前socket为accept的socket就不进行数据发送
                key.fileobj.send(data)
    def stop(self):
        self.event.set()
        for fd,key in self.select.get_map().items():
            key.fileobj.close()
        self.select.close()
        self.server.close()
c1=ChatServer()
c1.start()

总结

    回顾在最初使用socket库来编写TCP的群聊程序,为了解决accept阻塞的问题,我们加入了线程,又为了解决每一个与客户端通信的netsocket又加入了线程,每个客户端链接的通信信道创建一个线程,使用这种方式最大的问题是在于,这种交互式聊天程序是I/O密集型程序,I/O密集型使用多线程是合适的,因为它等待I/O时会阻塞,GIL这把锁就释放了,但是我们需要知道的是,当客户端非常多的时候,线程也会随之增多,这会带来非常大的性能消耗;
    作为进程这个概念来讲,它里面有100个线程没问题,1000个线程问题也不大,那么如果有10000个线程呢,那带来的结果就是性能非常低下,线程切换的问题,这个线程切换,带来的消耗是非常巨大的,那么同时每个线程都会开辟自己的栈空间,这样也会浪费巨大的内存空间,消耗内存太大了;
    我们需要知道的是,这些线程大部分都会进入阻塞态 ,因为他们要等待I/O,假设现在有1000个客户端,可能在某一个时刻只有那么几十个人在交互数据,其他的几百的线程全都处于阻塞态;
    所以在规模不大的情况下,这种阻塞式配合多线程未必不是一种比较好的选择,因为大多数情况下,线程都是进入到I/O阻塞,那么上述的多线程配合多路复用I/O它类似一种单线程配合一个监控器来实现的,这个监控器来监控所有的客户端,一旦客户端发送数据来到,当监视器看到数据已经到达自己的内核空间时,直接使用回调函数来进行处理,单线程就可以搞定,单线程首先线程少了,线程所占用的内存也就少了,单线程也解决线程切换问题,以及锁竞争问题;
    所以这种模型就简单很多,甚至我们使用到类似epoll这种技术技术时,还能突破文件描述符的限制,我们可以接入更多的链接,在这种情况下,I/O的并发量就提高了;

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注