本文介绍: 在上面的例子中,服务器端处理客户端请求串行的,也就是一个请求一个请求处理,如果有多个客户端同时连接,那么处理效率就很低。在前面例子中,socket.send/recv方法都是阻塞等待的,这种阻塞IO的操作会大大降低系统吞吐量。在 socket 编程中,建立长连接的主要概念是让客户端服务器之间的连接保持打开状态,而不是在每次通信后都关闭连接。模块是 Python用于实现 I/O 多路复用一个模块,它提供了对底层选择器selector)的抽象封装以便更方便地进行非阻塞式的 I/O 操作

HTTP协议

HTTP 是一种用于传输超文本例如 HTML)的应用层协议。它是基于请求响应模型的,客户端发送请求服务器返回响应。HTTP 使用 TCP 作为传输层协议。在 Python 中,有一些内置模块用于处理 HTTP 请求响应例如 http.serverurllib

Socket

Socket 是一种通信机制,允许运行不同计算机上的进程之间进行通信。它是网络编程的基础,允许数据网络上传输。Socket 提供了一种统一编程接口,使得程序员能够使用相似方式进行网络通信,而不管底层网络协议细节什么
在 Python 中,可以使用 socket 模块创建 TCP/UDP 服务器客户端

UDP

UDP 是一种面向无连接的协议,它不提供数据可靠性有序性,但具有延迟的优势。UDP 适用于实时通信,如音频视频流。与 TCP 不同,UDP 不需要建立连接,数据包直接从源发送目的地没有握手确认过程

一个简单例子

服务器端
import socket

server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server_socket.bind(('localhost', 8080))

# 处理收到数据
while True:
    data, addr = server_socket.recvfrom(1024)
    print(data.decode('utf-8'))
    # 处理数据
    # ...
客户端
import socket

client_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client_socket.sendto(b'Hello, Server!', ('localhost', 8080))

TCP

TCP 是一种面向连接的、可靠的、基于字节流的协议。在 TCP 连接中,通信的两端分别是客户端服务器。TCP 提供了数据完整性和可靠性,确保数据按照发送顺序到达目的地,并且不会发生丢失或损坏。TCP 使用三次握手建立连接,通过序列号确认号进行数据传输最后通过四次握手终止连接。

一个简单例子

在 Socket 编程中,有两种主要的角色服务器客户端服务器负责监听响应来自客户端的请求,而客户端则发送请求并等待服务器响应

服务器端
import socket

# 创建 socket 对象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# ip地址
host = '127.0.0.1'

# 设置端口号
port = 8089

# 绑定地址和端口
server_socket.bind((host, port))

# 监听最多5个连接
server_socket.listen(5)

while True:
    # 建立客户端连接
    client_socket, addr = server_socket.accept()
    print('连接地址:', addr)

    # 发送消息客户
    message = '欢迎访问服务器!'
    client_socket.send(message.encode('utf-8'))

    # 关闭连接
    client_socket.close()
客户
import socket

# 创建 socket 对象
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# ip地址
host = '127.0.0.1'

# 设置端口号
port = 8089

# 连接服务
client_socket.connect((host, port))

# 接收服务消息
message = client_socket.recv(1024)
print(message.decode('utf-8'))

# 关闭连接
client_socket.close()

并行处理客户端请求

在上面的例子中,服务器端处理客户端请求是串行的,也就是一个请求一个请求的处理,如果有多个客户端同时连接,那么处理效率就很低。一种简单有效的处理办法是,把请求处理逻辑放到线程池中去,这样就达到了同时处理多个客户端请求的目的。
优化后的服务器端代码

import socket
from concurrent.futures import ThreadPoolExecutor

# 服务配置
HOST = 'localhost'
PORT = 8089
MAX_WORKERS = 5  # 线程最大线程


def handle_client(client_socket, addr):
    """
    处理单个客户端连接的函数
    """
    try:
        message = '欢迎访问服务器!'
        client_socket.send(message.encode('utf-8'))
    except Exception as e:
        print(f"Error handling client {addr}: {e}")
    finally:
        # 关闭连接
        print(f"Closing connection from {addr}")
        client_socket.close()


def start_server():
    """
    启动服务器
    """
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind((HOST, PORT))
    server_socket.listen(5)

    # 创建线程
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        try:
            while True:
                client_socket, addr = server_socket.accept()
                print(f"Accepted connection from {addr}")

                # 使用线程池处理客户端连接
                executor.submit(handle_client, client_socket, addr)
        except KeyboardInterrupt:
            print("Server shutting down.")
        finally:
            # 关闭服务器套接字
            server_socket.close()


if __name__ == "__main__":
    start_server()

长连接

在 socket 编程中,建立长连接的主要概念是让客户端和服务器之间的连接保持打开状态,而不是在每次通信后都关闭连接。对于高并发的应用来说,减少三次握手频率可以大大提高系统吞吐量

服务器端代码

import socket
from concurrent.futures import ThreadPoolExecutor

MAX_WORKERS = 5  # 线程池最大线程

def start_server():
    # 创建一个 TCP/IP socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 绑定到特定地址和端口
    server_address = ('localhost', 8089)
    server_socket.bind(server_address)

    # 开始监听连接
    server_socket.listen(5)
    print(f"Server listening on {server_address}")


	# 创建线程
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        try:
            while True:
                client_socket, addr = server_socket.accept()
                print(f"Accepted connection from {addr}")

                # 使用线程池处理客户端连接
                executor.submit(handle_client, client_socket, addr)
        except KeyboardInterrupt:
            print("Server shutting down.")
        finally:
            # 关闭服务器套接字
            server_socket.close()


def handle_client(client_socket, client_address):
    try:
        while True:
            data = client_socket.recv(1024)
            if not data:
                break  # 连接关闭时退出循环
            # 处理接收到的数据
            message = data.decode('utf-8')
            print(f"Received data from {client_address}: {message}")

            # 这里可以根据需要处理数据,并回复客户端
            response = "Server received your message."
            client_socket.send(response.encode('utf-8'))
    except Exception as e:
        print(f"Error handling client {client_address}: {e}")
    finally:
        # 关闭连接
        print(f"Closing connection from {client_address}")
        client_socket.close()

if __name__ == "__main__":
    start_server()

客户端代码

import socket

# 创建 socket 对象
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 获取主机名
host = '127.0.0.1'

# 设置端口号
port = 8089

# 连接服务器
client_socket.connect((host, port))

data = input(">")
while data != 'exit':
    # 发送数据
    client_socket.send(data.encode('utf-8'))
    # 接收数据
    message = client_socket.recv(1024)
    print(message.decode('utf-8'))
    data = input(">")

# 关闭连接
client_socket.close()

阻塞IO

非阻塞 I/O 允许程序等待数据到达时继续执行其他任务,而不必一直等待数据到来。在前面例子中,socket.send/recv方法都是阻塞等待的,这种阻塞IO的操作会大大降低系统吞吐量

import socket
import select
from concurrent.futures import ThreadPoolExecutor


def handle_client(sock, clients):
    data = sock.recv(1024)
    if data:
        print(f"Received data from {sock.getpeername()}: {data.decode('utf-8')}")
        # 这里可以根据需要处理数据,并回复客户端
        response = "Server received your message."
        sock.send(response.encode('utf-8'))
    else:
        # 连接关闭
        print(f"Closing connection from {sock.getpeername()}")
        clients.remove(sock)
        sock.close()


# 创建非阻塞 socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('localhost', 8088))
server_socket.listen(5)
server_socket.setblocking(False)  # 设置为非阻塞

# 存储客户端连接的列表
clients = [server_socket]
executor = ThreadPoolExecutor(5)
while True:
    # 使用 select 检查是否准备好的套接字
    readable, _, _ = select.select(clients, [], [], 1)

    for sock in readable:
        if sock == server_socket:
            # 有新连接
            client_socket, client_address = server_socket.accept()
            print(f"Accepted connection from {client_address}")
            client_socket.setblocking(False)  # 设置为非阻塞
            clients.append(client_socket)
        else:
            # 有数据可读
            executor.submit(handle_client, sock, clients)

    # 可以处理其他任务,但是要注意,不能耗时太久,否则socket无法得到及时处理
    # 其他任务

这个例子中,select 函数用于监视一组套接字,返回准备好的套接字列表。主循环通过检查可读的套接字列表判断是否有新的连接到达或者是否有数据可读。由于select方法是非阻塞的,那么在while True循环里面,除了处于可读的socket之外,还可以处理其他任务。

基于selectors库的IO多路复用

前面例子中,使用select库来实现非阻塞,Python内置库 selectors我们封装操作系统底层的IO多路复用模型,提供了更便捷的使用方式
selectors 模块是 Python 中用于实现 I/O 多路复用的一个模块,它提供了对底层选择器selector)的抽象封装以便更方便地进行非阻塞式的 I/O 操作selectors 模块在 Python 3.4 及以上版本可用

I/O 多路复用是一种通过一个线程同时监听多个文件描述符机制,以提高程序的并发性能selectors 模块支持多种底层选择器,包括 selectpollepoll(仅限 Linux)、kqueue(仅限 BSD 和 macOS)等。

以下是一个简单的例子,演示如何使用 selectors 模块进行非阻塞的 Socket 通信:

import socket
import selectors
from concurrent.futures import ThreadPoolExecutor

# 创建默认选择器
selector = selectors.DefaultSelector()
executor = ThreadPoolExecutor(5)

def accept(sock, mask):
    conn, addr = sock.accept()
    print(f"Accepted connection from {addr}")
    conn.setblocking(False)
    selector.register(conn, selectors.EVENT_READ, read)

def handle_client(sock):
    data = sock.recv(1024)
    if data:
        print(f"Received data from {sock.getpeername()}: {data.decode('utf-8')}")
        # 这里可以根据需要处理数据,并回复客户端
        response = "Server received your message."
        sock.send(response.encode('utf-8'))
    else:
        # 连接关闭
        print(f"Closing connection from {sock.getpeername()}")
        selector.unregister(sock)
        sock.close()

def read(sock, mask):
    executor.submit(handle_client, sock)

# 创建服务器套接字
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('localhost', 8089))
server_socket.listen(5)
server_socket.setblocking(False)  # 设置为非阻塞

# 注册服务器套接字,监听连接事件
selector.register(server_socket, selectors.EVENT_READ, accept)

try:
    while True:
        events = selector.select()  # 阻塞,直到有事件发生
        for key, mask in events:
            callback = key.data
            callback(key.fileobj, mask)
finally:
    selector.close()
    server_socket.close()
    executor.shutdown()

这个例子中,使用了 selectors.DefaultSelector() 创建了默认选择器,并注册了服务器套接字,监听连接事件。当有连接事件发生时,会调用 accept 函数。当连接建立后,又注册了连接套接字,监听读事件调用 read 函数

这个例子中的关键是使用了非阻塞的套接字和 selectors 模块,使得程序能够同时处理多个连接,而不会阻塞在某个连接的 I/O 操作上。

asyncio 实现非阻塞IO

asyncio基于 selectors 模块,但 asyncio 提供了更高层次的抽象,使得编写异步代码更为方便。selectors 模块通常在 asyncio 的底层被使用,而 asyncio 在其基础上添加协程、任务和事件循环等概念,使得异步编程更加直观和易用。
selectors 模块提供了底层的 I/O 多路复用机制,而 asyncio 库则构建在其之上,提供了更高层次的异步编程框架,方便开发者编写异步代码
asyncio通过事件循环(Event Loop)和回调函数来实现异步编程使得一个线程能够同时处理多个连接,而不会阻塞其他操作

import asyncio


async def handle_client(reader, writer):
    data = await reader.read(100)
    if not data:
        print("Closing the connection")
        writer.close()
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message} from {addr}")

    print("Send: %r" % message)
    writer.write(data)
    await writer.drain()


async def main():
    server = await asyncio.start_server(
        handle_client, '127.0.0.1', 8089)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()


asyncio.run(main())

在上面的示例中,asyncio.start_server 创建了一个异步服务器,而 handle_client 函数是一个异步的处理客户端连接的方法await 关键字用于等待异步操作完成

终级boss:socketserver库

socketserver 模块是 Python 中用于编写网络服务器的高级模块。它提供了一组基于套接字的服务器类,使得开发者能够轻松地创建各种类型网络服务器,包括支持多线程、多进程异步多路复用不同模型的服务器。

import socketserver


class MyTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        addr = self.client_address
        print(f"Accepted connection from {addr}")

        try:
            data = self.request.recv(1024)
            if not data:
                break 
            message = data.decode('utf-8')
            print(f"Received data from {addr}: {message}")
            response = f"Server received your message:{message}"
            self.request.sendall(response.encode('utf-8'))
        except Exception as e:
            print(f"Error handling client {addr}: {e}")
        finally:
            print(f"Closing connection from {addr}")


if __name__ == "__main__":
    HOST, PORT = "localhost", 8089

    # 创建多线程的 TCP 服务器
    server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler)

    try:
        print(f"Server listening on {HOST}:{PORT}")
        server.serve_forever()
    except KeyboardInterrupt:
        print("Server shutting down.")
        server.shutdown()
  • socketserver.BaseRequestHandler :一个基础的请求处理器类,用于处理客户端的请求。开发者需要继承这个类并实现自己handle 方法,该方法会在每个客户端连接时被调用,用于处理客户端的请求。BaseRequestHandler 提供了访问客户端套接字、客户端地址等信息属性
  • socketserver.TCPServer :一个基本的 TCP 服务器类,它继承socketserver.BaseServerTCPServer 用于创建基于 TCP 的服务器,可以通过指定地址和端口来监听客户端的连接请求。
  • socketserver.UDPServer :一个基本的 UDP 服务器类,它继承socketserver.BaseServerUDPServer 用于创建基于 UDP 的服务器,同样可以通过指定地址和端口来监听客户端的连接请求。
  • socketserver.ThreadingMixInsocketserver.ForkingMixIn :用于实现多线程和多进程的 Mixin 类。通过将它们与 TCPServerUDPServer 结合使用,可以实现多线程或多进程的服务器。
  • socketserver.ThreadingTCPServersocketserver.ForkingTCPServer :已经混合了多线程和多进程功能的 TCP 服务器类。它们继承TCPServer,并使用了 ThreadingMixInForkingMixIn
  • socketserver.UnixStreamServersocketserver.UnixDatagramServer :用于创建基于 Unix 域套接字的服务器类,分别用于处理流式和数据报式的连接。

原文地址:https://blog.csdn.net/weixin_39987031/article/details/134689078

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_45056.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注