TOC

网络编程

    网络编程是一个非常大的范围,实现网络编程的框架也非常非常的多,但底层用的技术基本上都是Socket编程,Socket编程是非常古老的技术,但是一经发布之后,几乎所有的网络编程都是基于它实现的,它是我们现代网络编程技术中基础的基础,所有的网络编程都必须从Socket开始;

Socket介绍

    Socket在英文中的意思实际上就是插座的意思,但这个插座需要使用东西链接起来,一头在一个插座上,另一头插在另一个插座上,这样就形成了一个端到端的Socket通讯通道,最早是美国加州伯克利大学这边发布了一套Socket的编程接口,一经发布,很受欢迎,随之在Unix/Linux界都进行了采纳,它就变成了,现在网络编程最底层的接口标准;

Socket套接字

    Python也不例外,它对原始的Socket也进行了封装,变成了Python语言中的Socket库,在Python中的socket.py标准库,它也是一个非常底层的接口,它和网络层次没有一一对应的关系,在前面也说过,进程间通讯暂时不用考虑,直接通过网络通讯就可以,即使在本机也是通过网络,使两个进程间通讯,当然,程序在多机间的通讯,也是通过网络,所以说,进程间通讯,往往都是通过网络实现;

协议族

    AF表示Address Family,即IP地址,现在上网都需要路由器,路由器里面有个DHCP就是用来分配地址的,目前最流行的地址是IPV4,但是IPV4的资源枯竭,于是就衍生出了新一代的IPV6,IPV6和我们未来要接入的万物互联是对接起来的,也就是说,基本上我们能看到的设备都能够分配一个IPV6的地址,所以它是为万物互联准备的,但是目前没到这一步,还是IPV4的地址;
    所以在Socket编程时,我们需要准备使用什么样的协议,是IPV4协议还是IPV6协议,还是本地址安全Socket套接字文件协议,但是这个Socket套接字文件协议在Windows上是没有的,只有在Unix/Linux上有,所以说目前来讲,最通用的就是IPV4网络协议来传输数据,如下;
名称
含义
示例
AF_INET
IPV4
192.168.1.1
AF_INET6
IPV6
1050:0000:0000:0000:0005:0600:300c:326b
AF_UNIX
Unix套接字
Unix Domain Socket

Socket类型

    对于目前来讲,最重要的两个协议是TCP协议和UDP协议,这两个协议是目前开发能占到百分之八九十,当然也有一些第三方协议,这些第三方协议要么就是TCP或者UDP的改版,或者升级要么就是基于它又生成了一种协议,这些协议都不多,但是往往都可以在这些第三方协议上追溯到TCP协议或者UDP协议;
    TCP协议是面向链接的,也叫做有链接协议,必须先建立链接,然后再进行通信,换句话说就是端到端之间在链接之前必须先建立一个稳定的传输通道;
    虽然UDP协议也是端到端的通信,但是它这个链接是不预先建立的,它不是面向链接的,它不预先建立专用的传输通道,那么没有这个专用的传输通道,它就有可能出问题;
    TCP协议其实有点像,端到端之间拉了一个专用通道,专线的意思,它可以保证在这个专用的传输通道是稳定的,但是UDP没有这套东西,消息发出去了就发出去了,无法保证这个数据能够稳定的到达对端,它只负责将消息发出去就行了,可能会出现数据丢失的问题;
名称
含义
SOCK_STREAM
面向流的套接字,涉及流就说明我们需要序列化,它是一个字节一个字节的发的的,默认值TCP协议
SOCK_DGRAM
无链接的数据报文套接字,UDP协议

TCP编程

    TCP编程需要用到Socket,现代网络编程都是用的最底层的端到端的Socket编程,它有两端,分别为服务端,和客户端,服务端提供服务,客户端使用服务端来提供的服务;
    类似现在的QQ、微信这种软件,实际上它们都是基于网络的,我们也可以认为他们都是客户端,我们是在使用别人提供的服务,它都会连接到远程的服务器上去,而服务器是提供服务的,所以我们用得所有的软件基本上都是端到端的;
    而我们看不到的那一端,我们称之为Server端,我们作为用户使用的这一端,我们称之为Client,所以这种编程模式就是著名的C/S编程模式,其实我们使用的B/S它本质上也是一个C/S,目前来讲,如果我们从本质上来看,全世界的编程模式就一种,C/S编程模式;

TCP服务端编程

    那么我们要进程服务端开发时,我们首先得创建一个Socket对象,然后绑定TCP协议要求,即绑定IP地址和端口,然后开始监听,将指定的IP的指定端口监听上,最后只需要等待客户端的链接即可,这是基本的流程;
    服务端往往都是被动的,它需要等待客户端的链接,需要等,这个链接来了之后,它对通过Server端的Accept()方法,于客户端建立链接(允许客户端的链接),一建立链接之后,服务端会这个链接再创建一个新的Socket对象和客户端地址的二元组地址(IPV4是以一个IP+PORT的二元组),然后客户端就可以使用这个新的Socket开始通信了;
    也就是说,一次通信是需要建立两个Socket的,第一个Socket是用来链接的,另一个Socket是用来通信的,对于Server端来讲,是有成千上万的用户使用的,所以说,第一个Socket就是用来等待客户端链接的,它不会用来通信,仅仅用来等待客户端的链接;
    最后,我们就可以进行正常的通信了,发送数据、接受数据,那么由此,我们也可以等到一张图,如下;

    可以看到上图,两个客户端,第一个客户端与服务端发起链接,服务端一旦允许链接(accpet),服务端就会创建一个新的Socket负责与客户端的后期交互,那么第二个客户端来了之后,也是一个流程。这是TCP的Socket编程比较特殊的地方;
    此外,Socket是一种资源,它会占用文件描述符,它和我们文件对象是一样的,他们这些对象都称之为like文件对象,又因为文件描述符的打开数量是有上限的,它是一种有限的资源,所以打开了之后,必须关闭close,否则就会形成文件数量打开过多,无法继续创建新的Socket;
# 客户端属性
connect(address):连接到address处的套接字,address格式为元组(hostname,port),如果连接出错抛出socket.error异常;
connect_ex(address):同上,只不过connect_ex有返回值,连接成功返回 0 ,连接失败时候返回编码,例如:10061;
# 服务端属性
bind(address):将套接字绑定到地址,address地址的格式取决于地址族,在AF_INET下为元组以(host,port)的形式表示;
listen(backlog):开始监听客户端连接。backlog表示的是服务器拒绝(超过/proc/sys/net/core/somaxconn限制的数量)连接之前,操作系统可以挂起的最大连接数量,backlog也可以看作是排队的数量;
accept():接受连接并返回(conn,address),其中conn是新的套接字对象,可以用来接收和发送数据。address是连接客户端的地址;
sendfile(file,offset=0,count=None):表示是否需要在用户空间和内核空间来回拷贝,因为代码进入到内核态之后,需要先从磁盘上读到内核的内存空间,再从内核空间搬到应用程序的内存空间才能使用,因为内核空间是普通指令不能操作的,所以数据就从磁盘上,搬到内核的内存空间,然后又从内核内存空间复制到用户的内存空间,然后程序又要发送出去,然后用户内存空间的数据,又得复制到内核空间,然后通过缓冲区发送出去,这里面就来回好几次拷贝过程,如果设置sendfile,就可以直接将从磁盘上读取到的数据,直接发送出去,减少复制的过程,大大提高效率,但是windows不支持;
fileno():套接字的文件描述符;
recv(bufsize[,flag]):接受套接字的数据,默认阻塞。数据以编码后的字符串形式返回,bufsize指定最多可以接收多大的数量;
recvfrom(bufsize[.flag]):与recv()类似,但返回值是(data,address)。其中data是包含接收数据的字符串,address是发送数据的套接字地址;
send(string[,flag]):将string中的数据发送到连接的套接字。返回值是要发送的字节数量,该数量可能小于string的字节大小,可能未将指定内容全部发送;
sendall(string[,flag]):将string中的数据发送到连接的套接字,但在返回之前会尝试发送所有数据。成功返回None,失败则抛出异常,内部通过递归调用send,将所有内容发送出去;
sendto(string[,flag],address):将数据发送到套接字,address是形式为(ipaddr,port)的元组,指定远程地址。返回值是发送的字节数。该函数主要用于UDP协议;
close():关闭套接字;
# socket工作模式设定
getpeername():返回连接套接字的远程地址。返回值通常是元组(ipaddr,port);
getsockname():返回套接字自己的地址。通常是一个元组(ipaddr,port);
setblocking(flag):如果flag为0,则将套接字设置为非阻塞模式,否则为阻塞模式(默认值),非阻塞模式下,如果调用recv()没有发现任何数据,或者send()调用,无法立即发送数据,那么将引起socket.error异常;
settimeout(flag):设置套接字操作的超时期,timeout是一个浮点数,单位是秒,值为None表示没有超时时间,一般超时时间应该在刚创建套接字时设置;
setsocketopt(level,optname,value):设置套接字选项的值,比如缓冲区大小,等,不同操作系统都有不同版本;
基本使用
    针对TCP服务来讲,服务端需要监听在一个地址之上,等待接受客户端的请求,那么作为客户端来讲,它如果需要和服务端交互,也需要绑定在某个主机的某个端口之上,一般来讲,客户端的这个端口都是随机的,但是需要注意,这个端口地址,一般都是比较大的值,一般来讲,都是在1024和65535之间;
    当客户端找到一个随机端口之后,就有资格向服务端发起链接了,那么当服务端收到链接之后,会进行一个三次握手,那么在三次握手完成之后,accept函数就会立即返回,返回一个二元组,也就是说如果链接被允许,那么链接通道就会建立成功,链接通道建立成功之后,就会得到一个新的socket通道和客户端的信息;
import socket
server=socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) # 创建Socket对象
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # socket重用
server.bind(("127.0.0.1",8083))
server.listen()
new_socket,client_info=server.accept() # 返回一个新的socket对象和客户端信息
new_socket.send("我是你大爷".encode("utf8")) # 发送数据
result=new_socket.recv(1024).decode("gbk") # 接受数据
new_socket.close() # 关闭链接
print(result) # 你大爷
server.close()

群聊工具开发一
    可以看到,Socket作为互联网最基础的通信协议是非常强大的,我们可以通过Socket来实现简单端对端通信,并且实现信息交互,那么下面就来测试一下开发一款类似QQ的群聊软件,此次使用面向对象的方式来进行整个软件项目的开发,借助面向对象的方式为代码赋能;
import socket

class Server():
    def __init__(self,bind_ip="127.0.0.1",bind_port=8083):
        self.address=(bind_ip,bind_port)
        self.server=socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    def start(self):
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # socket重用,解决地址被占用问题
        self.server.bind(self.address) # 监听在指定的IP和端口之上
        self.server.listen(5) # 最大并发连接数为5
        new_socket,client_info=self.server.accept() # 阻塞,直至用户链接
        while True: # 循环收发消息
            message=new_socket.recv(1024).decode("gbk").rstrip() # 阻塞,等待信息
            if message:
                print(message)
            else: # 如果消息为空,就关闭通信链接
                new_socket.close()
                break
    def __exit__(self, exc_type, exc_val, exc_tb): # 退出时关闭socket server主链接
        self.server.close()
s1=Server()
s1.start()
群聊工具开发二
    可以看到上述的代码,虽然实现了一对一的循环聊天,但是这代码也存在一个问题,就是每次代码在执行到new_socket.recv(1024)的时候,都会阻塞主线程,在一般情况下,主线程一般都不会负责具体的事情的,具体消息接受与发送都是子线程来实现的,所以,我们加入多线程的概念对此进行优化;
import socket,threading

class Server():
    def __init__(self,bind_ip="127.0.0.1",bind_port=8083):
        self.address=(bind_ip,bind_port)
        self.server=socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    def create_conn(self,socket_channel):
        while True: # 循环收发消息
            message=socket_channel.recv(1024).decode("gbk").rstrip() # 阻塞,等待信息
            if message:
                print(message)
            else:
                socket_channel.close()
                break
    def start(self):
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # socket重用
        self.server.bind(self.address)
        self.server.listen()
        new_socket,client_info=self.server.accept()
        threading.Thread(target=self.create_conn,args=(new_socket,)).start()
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.server.close()
s1=Server()
s1.start()
    可以看到,我们在原有的基础之上新加入了多线程来实现消息的循环发送,这样不仅解决了阻塞主线程的问题,同时给高并发也带来了很好的用户体验,一个用户接入,就开启一个新的线程,让它们在后台去运行;
群聊工具开发三
    上述代码,虽然实现了多线程循环收发消息,但是,仔细分析代码,我们可以看到,其实上述代码,作为Server端,无法接受多个用户的链接,也就是说这个工具只能有一个客户端,其主要原因就是每次执行到start方法,之后开启了一个线程去处理这个线程的事物之后整个start方法的生命周期就结束了,所以,为了解决这个问题,我们需要让self.server.accept()方法也循环起来,那么同时为了提升更高的并发效率,我们将等待用户链接的accept函数,单独放到一个进程里面,使得它能够有一个CPU核心的处理能去处理用户的链接,使其有更高的处理效率;
import socket,threading,multiprocessing

class Server():
    def __init__(self,bind_ip="127.0.0.1",bind_port=8083):
        self.address=(bind_ip,bind_port)
        self.server=socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    def socket_accept(self):
        while True: # 不断接受用户的链接
            new_socket,client_info=self.server.accept()
            threading.Thread(target=self.create_conn,args=(new_socket,client_info)).start()
    def create_conn(self,socket_channel,client):
        while True: # 循环收发消息
            message=socket_channel.recv(1024).decode("gbk").rstrip() # 阻塞,等待信息
            if message:
                print("%s:%s:%s"%(*client,message))
            else:
                socket_channel.close()
                break
    def start(self):
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # socket重用
        self.server.bind(self.address)
        self.server.listen()
        return multiprocessing.Process(target=self.socket_accept).start()
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.server.close()
s1=Server()
s1.start()
    其实网络编程是一项I/O密集型的任务,因为它有大量的等待I/O,网络I/O和磁盘I/O,再加上在整个Socket通信过程中,经常会阻塞,如accept和recv,所以,他们特别适合使用多线程,使用了多线程之后,如果出现阻塞的情况,那么CPU就不调度就行了;
群聊工具开发四
    上述代码虽然解决了一对一单聊的各种问题,但是还是没能实现我们最终的目的,即群聊,那么群聊的基本实现思路实际上很简单,因为每个TCP链接的建立都会创建两个socket通道,第一个socket通道为建立链接的通道,当创建出来了第二个链接之后就关闭,第二个socket通道为数据交互通道,它主要是用于通信的一个socket,所以我们可以将这个socket给存储到一个数据结构里面,一旦有人发送群聊消息,那么就给这个数据结构里面所有的已建立链接的socket发送消息;
import socket, threading, multiprocessing

class Server():
    def __init__(self, bind_ip="127.0.0.1", bind_port=8083):
        self.address = (bind_ip, bind_port)
        self.server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.clients = [] # 该列表主要用于实现群聊,存储已建立链接的所有客户端
    def socket_accept(self):
        while True:  # 不断接受用户的链接
            new_socket, client_info = self.server.accept()
            self.clients.append(new_socket)
            threading.Thread(target=self.create_conn, args=(new_socket, client_info)).start()

    def create_conn(self, socket_channel, client):
        while True:  # 循环收发消息
            message = socket_channel.recv(1024).decode("gbk").rstrip()  # 阻塞,等待信息
            self.group_recv(message) # 一旦有人发送消息,就调用群聊方法
            if message: # 判断是否异常断开,异常断开message为空
                print("%s:%s:%s" % (*client, message))
            else:
                socket_channel.close()
                self.clients.remove(socket_channel) # 如果socket通道已关闭,需要及时将它移除
                break

    def group_recv(self, message):
        for sock in self.clients: # 给所有的已建立链接的客户端发送群聊信息
            sock.send(message.encode())

    def start(self):
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # socket重用
        self.server.bind(self.address)
        self.server.listen()
        return multiprocessing.Process(target=self.socket_accept).start()

    def __exit__(self, exc_type, exc_val, exc_tb):
        for client_socket in self.clients:
            client_socket.close()
        self.server.close()

s1 = Server()
s1.start()
    需要注意的是,客户端可能会异常断开,所以我们需要捕获到这个异常断开,一旦某一个socket已和server断开,那么就需要将这个socket在这个数据结构里面移除,否则,群聊循环发送消息时,会抛出异常,因为它会给一个已断开链接的socket发送群聊消息;
    所以我们需要在客户端主动断开或者异常断开时,告知服务端,来关闭客户端的链接,和在数据结构里面删除这个socket对象,主动断开就不说了,这是可以和服务端协商的,那么异常断开,服务端其实会收到一条信息,这条信息是一个空串,所以我们可以利用这个空串来判断客户端是否异常断开;
    并且,我们上面使用的数据结构是list,在移除的时候使用的是remove,那么当我们客户端数量巨大的时候,使用remove来进行移除元素,可想而知,性能是机器底下的,所以我们还需要做一个改进,将其数据结构改为字典;
群聊工具开发五
    由于GIL的存在,内建数据结构是原子性的,也就是,我们单独操作某一项,给它k/v赋值,没有问题,但是在遍历的过程是线程不安全的,遍历中可能被打断,因为其他线程如果对字典进行增加、弹出,都会影响字典的size,就会抛出异常,所以还是要加Lock来解决一下线程安全问题,如下;
import socket, threading, multiprocessing

class Server():
    def __init__(self, bind_ip="127.0.0.1", bind_port=8083):
        self.address = (bind_ip, bind_port)
        self.server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.clients = {}  # 线程不安全
        self.lock = threading.Lock() # 加锁,对self.clients加入线程安全

    def socket_accept(self):
        while True: # 循环等待客户链接
            new_socket, client_info = self.server.accept()
            with self.lock:
                self.clients[client_info] = new_socket
            threading.Thread(target=self.create_conn, args=(new_socket, client_info)).start()

    def create_conn(self, socket_channel, client):
        while True:  # 循环收发消息
            message = socket_channel.recv(1024).decode("gbk").rstrip()
            self.group_recv(message)
            if message:
                print("%s:%s:%s" % (*client, message))
            else:
                socket_channel.close()
                with self.lock: # 加锁,对self.clients加入线程安全
                    self.clients.pop(client)  # 当当前客户端断开时,从容器中移除
                break

    def group_recv(self, message):
        with self.lock: # 加锁,对self.clients加入线程安全
            for sock in self.clients.values():
                sock.send(message.encode())

    def start(self):
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # socket重用
        self.server.bind(self.address)
        self.server.listen()
        threading.Thread(target=self.socket_accept).start()

    def stop(self):
        with self.lock: # 加锁,对self.clients加入线程安全
            for client_socket in self.clients.values(): # 迭代或者close可能会有点慢,所以我们其实也可以先在加锁之前先拿到所有的values,然后再遍历
                client_socket.close()
            self.clients.clear()
        self.server.close()

s1 = Server()
s1.start()

while True:
    choice=input("a: ").rstrip()
    if choice == 'q':
        s1.stop()
        break
  • 注意:此代码在结束时,你会发现很难结束,那是因为这个程序里面开启了多个子线程,并且子线程都是non-daemon线程,主进程提前,子进程并不会结束,主进程会一直等待子进程退出之后,才结束,所以要解决这种问题,就需要将线程设置为daemon=True;

MakeFile

    在程序的世界里面,一切接文件,我们可以将socket对象设置为一个like文件对象,然后对它进行read和write操作,也就是说将,recv和read操作变成了read和write,而且我们操作的东西已经变成了文本了,再也不是bytes了,具体使用不在此赘述,自行了解;

TCP客户端编程

    对于TCP的客户端编程也比较简单,它无需创建一个监听的Socket链接,只需要通过connect方法链接到服务端即可,然后就可以使用这个socket对象来完成数据交互,那么在交互过程中呢,也建议加入多线程的技术,因为socket的recv方法它是会阻塞主线程的,所以建议将其放在一个线程里面,直接让操作系统去阻塞这个线程,至到收到服务端发来消息之后,它才从阻塞态转为运行态;
import socket,threading

class client():
    def __init__(self,server_ip="127.0.0.1",server_port=8083):
        self.client=socket.socket()
        self.address=server_ip,server_port
        self.event=threading.Event()
    def send(self,messgae):
        self.client.send(messgae.encode())
    def recv(self):
        while not self.event.is_set(): # 默认为False
            try:
                data=self.client.recv(1024).decode()
                if data:
                    print("\n%s"%data)
                else:
                    break
            except Exception as e:
                print(e)
                break
    def interaction(self):
        while True:
            message=input("请输入消息:")
            if message.lower() == "q" or message == "quit":
                self.stop()
                break
            self.send(message)
    def start(self):
        self.client.connect(self.address)
        self.send("hello server")
        threading.Thread(target=self.recv).start()
    def stop(self):
        self.send("q")
        self.event.set()
        self.client.close()
c1=client()
c1.start()
c1.interaction()

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注