TOC

SocketServer

    Socket编程是每一个程序员都应该掌握的网络基础编程,通过它,我们看到了网络通讯的底层到底是如何实现的,那么也是由于Socket太过于底层,那么带来的问题就是使用起来极为麻烦,写一个简单的聊天功能,需要写一堆代码,开发起来很没效率,而且它内部还有很多细节需要注意,非常的繁琐,虽然编程有套路,但是想要写出健壮的代码,还是比较困难的;
    于是就有很多语言都对Socket底层进行了封装,Python也是如此,Python最为流行的Socket封装库就是SocketServer,一看这名字就是用来做Socket服务端的,因为客户端太简单了,在前面TCP、UDP的原生例子,也能看出来,Client考虑的问题很少,而Server考虑的问题则很多;
    也是如此诸多问题,所以Python提供的SocketServer模块就是让开发者快速实现基于Socket的Server的实现,那么现在对于企业级开发也都是基于SocketServer开始的,甚至很多库要么仿照SocketServer的思想来封装,要么就基于SocketServer来实现;

SocketServer同步类

    SocketServer提供了四个"同步阻塞类",SocketServer主类为BaseServer,它是socketserver模块下所有"同步阻塞类"的基类,它约定了所有子类的基本的数据和方法,其子类TCPServer主要实现基于TCP的Socket的实现,然后TCPServer下的一个子类UnixStreamServer,就是基于套接字的TCP的socket,然后BaseServer的子类UDPServer也主要是实现基于Socket的UDP的Socket的实现,然后UDPServer下的子类UnixDatagramServer,它和UnixStreamServer一样,它主要实现基于套接字的UDP的socket;

    虽然socketserver提供了UnixDatagramServer和UnixStreamServer类型的socket,但是一般情况不会使用的,因为它无法跨平台使用,这种套接字类型的无法在windows平台上使用,这种套接字模式的socket离开unix环境就没法使用了,所以一般情况下我们只使用TCPServer和UDPServer,并且UDPServer还是TCPServer的一个子类,它将TCPServer不一样的地方覆盖掉了,所以它就使用了继承TCPServer的方式来实现的UDP的socket服务端;

Mixin异步类

    TCPServer和UDPServer它门都是同步阻塞的Server,就一个主线程运行,来一个链接接入一个连接,接入连接的那一刻其他连接只能阻塞,因为它们就是一个同步阻塞Server,这和我们之前写的TCPServer第一版是一个逻辑,只有一个主线程在运行,只能同时处理一个客户端;
    那么也是因为如此,SocketServer给我们提供了几个Mixin类,ForkingMixin和ThreadingMixin类,ForkingMixin是创建多进程的一个类,但是由于ForkingMixin无法在主流windows平台使用,windows不支持Fork,所以我们也基本不会使用它;
    所以一般情况下都会使用ThreadingMixin类来实现异步处理,它主要用来实现多线程异步操作,它使用Mixin实现了一种组合继承的模式,这个组合放在一个列表里面,将Mixin类尽量往前放,但是因为Mixin类写在继承列表的最前面,所以通过mro的算法,保证了写在继承列表前面的类里面的方法优先使用,那么SocketServer所供的两个Mixin类,又衍生出了如下四个异步类;
class ForkingUDPServer(ForkingMixIn, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass
class ThreadingUDPServer(ThreadingMixIn, UDPServer): pass
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

编程接口

    SocketServer最基础的类为BaseServer,BaseServer决定了,由它衍生的TCPServer都有的一些属性, BaseServer需要两个参数,第一个参数为server_addrees,server_addrees是个二元组,就是地址和端口;
    第二个是RequestHandlerClass,它主要决定,当一个TCP或者UDP请求来了之后如何处理和如何响应,并且RequestHandlerClass还必须的BaseRequestHandler的子类,在BaseServer中的代码可以看到;
socketserver.BaseServer(server_address, RequestHandlerClass)
    参数二BaseRequestHandler是一个请求处理的类,它已经把基本的框架给构建好了,我们只需要编写一个请求处理的类,并且继承它、重写BaseRequestHandler定义的属性即可,这就是框架,框架就是一种处理流程,只需要将里面不符合我们期望的逻辑更换掉就行了,定义了数据规范,定义了接口规范;
常用属性和方法
server_address:服务器正在监听的地址和端口,在不同协议格式不一样。Internet协议上是一个元组(“127.0.0.1”,80);
socket:服务器正在监听的套接字对象;
request_queue_size:请求队列的大小。如果处理单个请求需要很长时间,那么在服务器繁忙时到达的任何请求都会被放入队列中,直到request_queue_size请求为止。一旦队列满了,来自客户机的进一步请求将得到一个“连接被拒绝”错误。默认值通常是5,但是可以被子类覆盖;
address_family:服务器套接字所属的协议族。常见的例子是套接字,AF_INET socket.AF_UNIX;
socket_type:服务器使用的套接字类型;套接字。SOCK_STREAM套接字。SOCK_DGRAM是两个常见的值;
timeout:超时持续时间,以秒为单位度量,如果不需要超时,则为None。如果handle_request()在超时期间没有收到传入的请求,则调用handle_timeout()方法。
handle_request():处理单个请求,同步执行,这个函数按顺序调用以下方法:get_request()、verify_request()和process_request()。如果处理程序类的用户提供的handle()方法引发异常,将调用服务器的handle_error()方法。如果在超时秒内没有收到任何请求,那么将调用handle_timeout()并返回handle_request()。
server_forever(poll_interval=0.5):异步执行,处理请求,每隔poll_interval秒轮询一次,忽略timeout属性,还会调用service_actions(),在ForkingMixIn的子类中定义,可以用来清理僵尸进程;
shutdown():告诉serve_forever循环停止,并等待他结束;
server_close():关闭服务器;
finish_request(request,client_address):通过实例化RequestHandlerClass并调用它的handle()方法来处理请求;
server_bind():由服务器的构造函数调用,以将套接字绑定到所需的地址,可能会被覆盖;
verify_request(request,client_address):必须返回一个布尔值;如果值为True,请求将被处理,如果值为False,请求将被拒绝。可以重写此函数来实现服务器的访问控制。默认实现总是返回True;

创建服务器流程

    那么针对socketserver创建一个TCP或者UDP服务端的流程,主要分为如下四大步;
1、从BaseRequestHandler类中派生出子类,并覆盖其handler()方法来创建请求处理程序类,此方法将处理传入请求;
2、实例化一个服务端类,传入监听地址和端口以及请求处理类;
3、调用服务器实例的handle_request()方法提供一次服务或者serve_forever()方法永久提供服务,等待客户端链接;
4、调用server_close()关闭套接字;
BaseRequestHandler
    BaseRequestHandler就是一个请求处理的基础类,每一个处理请求的类都需要继承自BaseRequestHandler,那么通过源码我们可以看到,它只定义了请求处理类的基础框架,定义了三个参数,定一了三个方法,三个方法全部都是pass,因为它是作为一个请求处理类的基类的,所以它只负责基础架构的定义,定义的数据的流转流程,其他的什么都没做,这就是它的作用;
class BaseRequestHandler:
    def __init__(self, request, client_address, server):
        self.request = request # 与客户端新链接后产生的new_socket
        self.client_address = client_address  # 客户端地址
        self.server = server # socketserver的socket对象
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()
    def setup(self):  # 每一个链接初始化要做的事
        pass
    def handle(self):  # 每一个链接请求的处理
        pass
    def finish(self):  # 每一个链接的清理工作
        pass
    那么针对这三个方法各有含义,self.setup()方法,是在处理数据之前要做什么事就重写self.setup()去编写相应的逻辑,self.handle()是当请求接入之后,需要对数据做什么处理,就在self.handle()中去完成,而self.finish()则是当数据处理完之后,需要做什么清理工作;
TCPServer
    SocketServer的TCPServer类继承自BaseServer,它是一个单线程的同步阻塞类,需要注意的是同步阻塞,可以将下面的代码作为TCP服务端,然后启动多个TCP客户端,我们会发现,当第一个客户端连接到Server之后,是可以进行数据的双向交互的;
    但是在不中断第一个客户端链接的情况下,再启动一个TCP客户端,我们会发现,虽然我们链接到了服务端,但是我们发送的数据,服务端是无法向我们转发回来的,这主要原因就是TCPServer为一个单线程同步阻塞的TCP Socket服务端,它没法同时接受多个用户的访问;
class MyHandler(socketserver.BaseRequestHandler): # 需要继承BaseRequestHandler
    def handle(self):
        super().handle() # 调用一下父类的方法
        while True:
            data=self.request.recv(1024)
            self.request.send(data)
            print(data.decode())

address=("127.0.0.1",8083)
TcpServer = socketserver.TCPServer(address,MyHandler)
TcpServer.serve_forever() # 永久提供服务,一直等待客户端的链接
    上述也说到了,同时只允许一个客户端能够进行数据的双向交互,那么其他进来的链接只能等待第一个链接进来的用户终止后,才能轮到他们中的其中一个,所以在第一个链接没有结束之前,他们是只能阻塞的,所以新进来的链接其实进入了一个队列,即blocklog队列,blocklog其实就是干这个事儿的,那么如果这个blocklog满了,其他的链接就直接拒绝了,而这个blocklog的大小为5,也就是最大接收6个链接,第7个链接就直接无法链接了,如下,重写TCPServer类,实现最多只能接收2个链接,即blocklog+1;
class MyHandler(socketserver.BaseRequestHandler): # 需要继承BaseRequestHandler
    def handle(self):
        super().handle() # 调用一下父类的方法
        while True:
            data=self.request.recv(1024)
            self.request.send(data)
            print(data.decode())

class MyServer(socketserver.TCPServer):
    def server_activate(self):
        self.socket.listen(1)

address=("127.0.0.1",8083)
TcpServer = MyServer(address,MyHandler)
TcpServer.serve_forever() # 永久提供服务,一直等待客户端的链接
ThreadingTCPServer
    那么也是由于TCPServer内部的"同步阻塞"的原因,所以socketserver提供了ThreadingTCPServer,它是一个Mixin类,它继承自ThreadingMixIn和TCPServer,ThreadingMixIn重写了process_request,从而实现了异步处理,每一个客户端连接到当前的socket,就开辟一个线程,去处理new_socket和客户端之间的交互;
    它的使用也很简单,和TCPServer是一摸一样的,因为TCPServer里面也有process_request,而ThreadingMixIn只是将TCPServer里面的process_request加入了多线程的技术,从而实现多线程异步处理;
class MyHandler(socketserver.BaseRequestHandler): # 需要继承BaseRequestHandler
    def handle(self):
        super().handle() # 调用一下父类的方法
        while True:
            data=self.request.recv(1024)
            self.request.send(data)
            print(data.decode())

address=("127.0.0.1",8083)
TcpServer = socketserver.ThreadingTCPServer(address,MyHandler) # 使用异步的TCPServer
TcpServer.serve_forever() # 永久提供服务,一直等待客户端的链接
    如上,就是一个基于ThreadingTCPServer的异步TCPServer服务端,我们此时可以通过如上代码测试一下,多个客户端同时接入,我们会发现,它们互不干扰,各自处理;

EchoServer的实现

    EchoServer顾名思义,来什么消息回什么消息,其实在前面的原生版的TCPServer和UDPServer都实现过,以前我们使用原生socket编写EchoServer,还得利用多线程解决accept阻塞主线程的问题,也得解决recv的问题;
    现在,通过SocketServer实现之后,发现这些解决accpet或者recv阻塞主线程问题全部都可以使用ThreadingTCPServer来负责,我们只需要对每一个请求来负责,负责作为Server与客户端之间的通信,也就是说我们只需要管self.handler()方法即可;
    所以我们可以看到如下代码,使用ThreadingTCPServer改进了EchoServer之后,原本需要几十行的代码,使用ThreadingTCPServer仅仅需要不到10行代码就可以完成了,非常之便捷;
class MyHandler(socketserver.BaseRequestHandler): # 需要继承BaseRequestHandler
    def handle(self):
        super().handle() # 调用一下父类的方法
        while True:
            data=self.request.recv(1024)
            self.request.send(data)

address=("127.0.0.1",8083)
TcpServer =  socketserver.ThreadingTCPServer(address,MyHandler)

群聊工具开发

    在前面的TCP或者UDP原生群聊的实现,我们都会将客户端的信息给记录下来,那么针对于socketserver也是一样的,都需要记录客户端的信息,那么问题来了,这个客户端的信息存在哪里呢,这是一个问题,有很多人存在了self.steup()方法里面,这是不行的,因为每一个连接接入都会执行一遍self.steup(),也就是说,如果我们在self.steup()里面写一个self.clients=[],那么每个连接刚接入都self.clients都是空的,所以,这是不可行的,所以此时,我们也必须将其做成类属性,在Server启动的那一刻,就开始创建self.clients;
    另外,还要考虑一个self.clients清除的,问题,针对self.clients的清除,其实我们完全可以放在self.finish()方法里面,不管客户端正常退出还是不正常退出,只要只要服务器断开,就都会进入到self.finish()里面,因为self.finish()在finally里面,所以我们只需要在self.finish()里面清除即可;
    所以,最后,其实我们只需要创建一个群聊方法即可,如下;
class MyHandler(socketserver.BaseRequestHandler): # 需要继承BaseRequestHandler
    clients={}
    def setup(self):
        self.clients[self.client_address]=self.request
    def handle(self):
        super().handle() # 调用一下父类的方法
        while True:
            data=self.request.recv(1024)
            self.request.send(data)
            self.group_send(data)
    def group_send(self,message): # 消息群发方法
        for client in self.clients.values():
            if client == self.request:
                print(message.decode())
            else:
                client.send(message)

    def finish(self):
        self.clients.pop(self.client_address)

address=("127.0.0.1",8083)
TcpServer = socketserver.ThreadingTCPServer(address,MyHandler)
TcpServer.serve_forever() # 永久提供服务,一直等待客户端的链接
    可以看到,我们只需要不到30行代码,完成了当初需要上百行代码的功能,这就是框架,我们作为开发者只需要关注如何处理用户发来的请求数据,然后处理完成之后如何给它响应即可,实际上后来也是这样,未来的B/S编程也是这样的,到最后,关注的仅仅只是请求来了如何处理如何响应的问题,剩下的框架已经全部给我们解决了;

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注