本文介绍: NIO中Reactor模式的基本组成部分Selector(选择器):Selector是 Reactor 模式的核心,负责监听各个通道上的事件,如连接、接收、读取和写入事件。通过 Selector,一个单独的线程可以有效地管理多个通道,使得系统可以在一个线程内同时处理多个连接。Channel(通道):Channel 是数据的载体,可以是文件、套接字等。通道向 Selector 注册,告诉 Selector 哪些事件它关心,然后 Selector 将根据事件的发生情况通知对应的通道。

目录

原生JDK网络编程BIO

BIO通信模型服务端代码

BIO通信模型客户端代码

伪异步模型服务端代码(客户端跟之前一致)

原生JDK网络编程NIO

什么是NIO?

NIO和BIO的主要区别

阻塞与非阻塞IO

NIO之Reactor模式

NIO中Reactor模式的基本组成部分

NIO代码实现


原生JDK网络编程BIO

       BIO意为Blocking I/O,即阻塞的 I/O。在BIO中类 ServerSocket 负责绑定 IP 地址,启动监听端口,等待客户连接;客户端 Socket 类的实例发起连接操作,ServerSocket 接受连接后产生一个新的服务端socket实例负责和客户端 socket 实例通过输入和输出流进行通信。

BIO的阻塞,主要体现在两个地方:

1. 若一个服务器启动就绪,那么主线程就一直在等待着客户端的连接,这个等待过程中主线程就一直在阻塞。

2. 在连接建立之后,在读取到socket信息之前,线程也是一直在等待,一直处于阻塞的状态下的。

       传统BIO通信模型:采用 BIO 通信模型的服务端,通常由一个独立的 Acceptor 线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成后,通过输出流返回应答给客户端,线程销毁。即典型的一请求一应答模型,同时数据的读取写入也必须阻塞在一个线程内等待其完成。

             

BIO通信模型服务端代码

public class Server {

    public static void main(String[] args) throws IOException {
        //服务端启动必备
        ServerSocket serverSocket = new ServerSocket();
        //表示服务端在哪个端口上监听
        serverSocket.bind(new InetSocketAddress(10001));
        System.out.println("Start Server ....");
        try{
            while(true){
                new Thread(new ServerTask(serverSocket.accept())).start();
            }
        }finally {
            serverSocket.close();
        }
    }

    //每个和客户端的通信都会打包成一个任务,交个一个线程来执行
    private static class ServerTask implements Runnable{

        private Socket socket = null;
        public ServerTask(Socket socket){
            this.socket = socket;
        }

        @Override
        public void run() {
            //实例化与客户端通信的输入输出流
            try(ObjectInputStream inputStream =
                    new ObjectInputStream(socket.getInputStream());
                ObjectOutputStream outputStream =
                    new ObjectOutputStream(socket.getOutputStream())){

                //接收客户端的输出,也就是服务器的输入
                String userName = inputStream.readUTF();
                System.out.println("Accept client message:"+userName);

                //服务器的输出,也就是客户端的输入
                outputStream.writeUTF("Hello,"+userName);
                outputStream.flush();
            }catch(Exception e){
                e.printStackTrace();
            }finally {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

BIO通信模型客户端代码

public class Client {

    public static void main(String[] args) throws IOException {
        //客户端启动必备
        Socket socket = null;
        //实例化与服务端通信的输入输出流
        ObjectOutputStream output = null;
        ObjectInputStream input = null;
        //服务器的通信地址
        InetSocketAddress addr
                = new InetSocketAddress("127.0.0.1",10001);

        try{
            socket = new Socket();
            socket.connect(addr);//连接服务器
            System.out.println("Connect Server success!!");
            output = new ObjectOutputStream(socket.getOutputStream());
            input = new ObjectInputStream(socket.getInputStream());
            System.out.println("Ready send message.....");
            /*向服务器输出请求*/
            output.writeUTF("zhangsan");
            output.flush();

            //接收服务器的输出
            System.out.println(input.readUTF());
        }finally{
            if (socket!=null) socket.close();
            if (output!=null) output.close();
            if (input!=null) input.close();
        }
    }
}

       根据以上模型编写的代码最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程 个数和客户端并发访问数呈 1:1 的正比关系,Java 中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降。

       我们可以使用线程池来管理这些线程,使用 CachedThreadPool 线程池,其实除了能自动帮我们管理线程(复用),看起来也就像是 1:1 的客户 端:线程数模型。而使用 FixedThreadPool 我们就有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了N:M的伪异步 I/O 模型。

          

伪异步模型服务端代码(客户端跟之前一致)

public class ServerPool {

    private static ExecutorService executorService
            = Executors.newFixedThreadPool(
                    Runtime.getRuntime().availableProcessors());

    public static void main(String[] args) throws IOException {
        //服务端启动必备
        ServerSocket serverSocket = new ServerSocket();
        //表示服务端在哪个端口上监听
        serverSocket.bind(new InetSocketAddress(10001));
        System.out.println("Start Server ....");
        try{
            while(true){
                executorService.execute(new ServerTask(serverSocket.accept()));
            }
        }finally {
            serverSocket.close();
        }
    }

    //每个和客户端的通信都会打包成一个任务,交个一个线程来执行
    private static class ServerTask implements Runnable{

        private Socket socket = null;
        public ServerTask(Socket socket){
            this.socket = socket;
        }

        @Override
        public void run() {
            //实例化与客户端通信的输入输出流
            try(ObjectInputStream inputStream =
                    new ObjectInputStream(socket.getInputStream());
                ObjectOutputStream outputStream =
                    new ObjectOutputStream(socket.getOutputStream())){

                //接收客户端的输出,也就是服务器的输入
                String userName = inputStream.readUTF();
                System.out.println("Accept client message:"+userName);

                //服务器的输出,也就是客户端的输入
                outputStream.writeUTF("Hello,"+userName);
                outputStream.flush();
            }catch(Exception e){
                e.printStackTrace();
            }finally {
                try {
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

        以上这种实现正因为限制了线程数量,如果发生读取数据较慢时(比如数据量大、网络传输慢等),大量并发的情况下,其他接入的消息,只能一直等待,这就是最大的弊端。


原生JDK网络编程NIO

什么是NIO?

       NIO库是在JDK 1.4中引入的。NIO弥补了原来的BIO的不足,它在标准Java代码中提供了高速的、面向块的I/O。NIO被称为no-blocking io或者new io都说得通。

NIO和BIO的主要区别

面向流与面向缓冲

       Java NIO 和 IO 之间第一个最大的区别是,IO 是面向流的,NIO 是面向缓冲区的。 Java IO 面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。 Java NIO 的缓冲导向方法略有不同。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

阻塞与非阻塞IO

        Java NIO 的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。线程通常将非阻塞 IO 的空闲时间用于在其它通道上执行 IO 操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。


NIO之Reactor模式

       Java NIO(New I/O)中使用了 Reactor 模式。 Reactor 模式是一种事件处理模式,它通过一个称为 Reactor 的中心调度器来响应输入事件,并将事件分发给相应的处理程序(也称为处理器或回调函数)进行处理。

NIO中Reactor模式的基本组成部分

Selector(选择器)
       Selector是 Reactor 模式的核心,负责监听各个通道上的事件,如连接、接收、读取和写入事件。通过 Selector,一个单独的线程可以有效地管理多个通道,使得系统可以在一个线程内同时处理多个连接。
Channel(通道)
        Channel 是数据的载体,可以是文件、套接字等。通道向 Selector 注册,告诉 Selector 哪些事件它关心,然后 Selector 将根据事件的发生情况通知对应的通道。
SelectionKey(选择键)
        当一个通道向 Selector 注册时,会创建一个 SelectionKey 对象。这个对象包含了通道和 Selector 之间的关联关系,以及通道感兴趣的事件(如OP_READ读、OP_WRITE写、OP_CONNECT连接、OP_ACCEPT接收连接事件)。当事件发生时,Selector 将通知对应的 SelectionKey,通过 SelectionKey 可以获取关联的通道和发生的事件类型。
Handler(处理器)
        处理器是用户编写的处理事件的逻辑,它通过回调函数的方式响应事件。当事件发生时,Reactor 模式调用处理器的回调函数来执行相应的业务逻辑。

       在Java NIO中,应用程序通过将通道注册到 Selector,然后在事件发生时由 Reactor 调用相应的处理器来处理。这种模型使得在单线程内可以有效地处理多个并发连接,提高了系统的性能和扩展性。


NIO代码实现

服务端

public class NioServer {
    private static NioServerHandle nioServerHandle;

    public static void main(String[] args){
        nioServerHandle = new NioServerHandle(8888);
        new Thread(nioServerHandle,"Server").start();
    }
}
public class NioServerHandle implements Runnable{

    private volatile boolean started;
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;

    /**
     * 构造方法
     * @param port 指定要监听的端口号
     */
    public NioServerHandle(int port) {
        try {
            /*创建选择器的实例*/
            selector = Selector.open();
            /*创建ServerSocketChannel的实例*/
            serverSocketChannel = ServerSocketChannel.open();

            /*设置通道为非阻塞模式*/
            serverSocketChannel.configureBlocking(false);
            /*绑定端口*/
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            /*注册事件,表示关心客户端连接*/
            serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

            started = true;
            System.out.println("服务器已启动,端口号:"+port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while(started){
            try {
                /*获取当前有哪些事件*/
                selector.select(1000);
                /*获取事件的集合*/
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while(iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
                    如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
                    的键出现,这会导致我们尝试再次处理它。*/
                    iterator.remove();
                    handleInput(key);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /*处理事件的发生*/
    private void handleInput(SelectionKey key) throws IOException {
        if(key.isValid()){
            /*处理新接入的客户端的请求*/
            if(key.isAcceptable()){
                /*获取关心当前事件的Channel*/
                ServerSocketChannel ssc
                        = (ServerSocketChannel) key.channel();
                /*接受连接*/
                SocketChannel sc = ssc.accept();
                System.out.println("==========建立连接=========");
                sc.configureBlocking(false);
                /*关注读事件*/
                sc.register(selector,SelectionKey.OP_READ);
            }
            /*处理对端的发送的数据*/
            if(key.isReadable()){
                SocketChannel sc = (SocketChannel) key.channel();
                /*创建ByteBuffer,开辟一个缓冲区*/
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                /*从通道里读取数据,然后写入buffer*/
                int readBytes = sc.read(buffer);
                if(readBytes>0){
                    /*将缓冲区当前的limit设置为position,position=0,
                    用于后续对缓冲区的读取操作*/
                    buffer.flip();
                    /*根据缓冲区可读字节数创建字节数组*/
                    byte[] bytes = new byte[buffer.remaining()];
                    /*将缓冲区可读字节数组复制到新建的数组中*/
                    buffer.get(bytes);
                    String message = new String(bytes,"UTF-8");
                    System.out.println("服务器收到消息:"+message);
                    /*处理数据*/
                    String result = Const.response(message);
                    /*发送应答消息*/
                    doWrite(sc,result);

                }else if(readBytes<0){
                    /*取消特定的注册关系*/
                    key.cancel();
                    /*关闭通道*/
                    sc.close();
                }
            }
        }
    }

    /*发送应答消息*/
    private void doWrite(SocketChannel sc,String response) throws IOException {
        byte[] bytes = response.getBytes();
        ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
        buffer.put(bytes);
        buffer.flip();
        sc.write(buffer);
    }

    public void stop(){
        started = false;
    }
}

客户端

public class NioClient {
    private static NioClientHandle nioClientHandle;

    public static void start(){
        nioClientHandle = new NioClientHandle("127.0.0.1",8888);
        new Thread(nioClientHandle,"client").start();
    }
    //向服务器发送消息
    public static boolean sendMsg(String msg) throws Exception{
        nioClientHandle.sendMsg(msg);
        return true;
    }
    public static void main(String[] args) throws Exception {
        start();
        Scanner scanner = new Scanner(System.in);
        while(NioClient.sendMsg(scanner.next()));

    }
}
public class NioClientHandle implements Runnable{
    private String host;
    private int port;
    private volatile boolean started;
    private Selector selector;
    private SocketChannel socketChannel;

    public NioClientHandle(String ip, int port) {
        this.host = ip;
        this.port = port;

        try {
            /*创建选择器的实例*/
            selector = Selector.open();
            /*创建ServerSocketChannel的实例*/
            socketChannel = SocketChannel.open();
            /*设置通道为非阻塞模式*/
            socketChannel.configureBlocking(false);

            started = true;
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    public void stop(){
        started = false;
    }
    @Override
    public void run() {
        try{
            doConnect();
        }catch(IOException e){
            e.printStackTrace();
            System.exit(1);
        }

        //循环遍历selector
        while(started){
            try{
                //无论是否有读写事件发生,selector每隔1s被唤醒一次
                selector.select(1000);
                //获取当前有哪些事件可以使用
                Set<SelectionKey> keys = selector.selectedKeys();
                //转换为迭代器
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while(it.hasNext()){
                    key = it.next();
                    /*我们必须首先将处理过的 SelectionKey 从选定的键集合中删除。
                    如果我们没有删除处理过的键,那么它仍然会在主集合中以一个激活
                    的键出现,这会导致我们尝试再次处理它。*/
                    it.remove();
                    try{
                        handleInput(key);
                    }catch(Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        //selector关闭后会自动释放里面管理的资源
        if(selector != null)
            try{
                selector.close();
            }catch (Exception e) {
                e.printStackTrace();
            }
    }

    //具体的事件处理方法
    private void handleInput(SelectionKey key) throws IOException{
        if(key.isValid()){
            //获得关心当前事件的channel
            SocketChannel sc = (SocketChannel) key.channel();
            //连接事件
            if(key.isConnectable()){
                if(sc.finishConnect()){
                    socketChannel.register(selector,
                        SelectionKey.OP_READ);}
                else System.exit(1);
            }
            //有数据可读事件
            if(key.isReadable()){
                //创建ByteBuffer,并开辟一个1M的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //读取请求码流,返回读取到的字节数
                int readBytes = sc.read(buffer);
                //读取到字节,对字节进行编解码
                if(readBytes>0){
                    //将缓冲区当前的limit设置为position,position=0,
                    // 用于后续对缓冲区的读取操作
                    buffer.flip();
                    //根据缓冲区可读字节数创建字节数组
                    byte[] bytes = new byte[buffer.remaining()];
                    //将缓冲区可读字节数组复制到新建的数组中
                    buffer.get(bytes);
                    String result = new String(bytes,"UTF-8");
                    System.out.println("客户端收到消息:" + result);
                }
                //链路已经关闭,释放资源
                else if(readBytes<0){
                    key.cancel();
                    sc.close();
                }
            }
        }
    }

    private void doWrite(SocketChannel channel,String request)
            throws IOException {
        //将消息编码为字节数组
        byte[] bytes = request.getBytes();
        //根据数组容量创建ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        //将字节数组复制到缓冲区
        writeBuffer.put(bytes);
        //flip操作
        writeBuffer.flip();
        //发送缓冲区的字节数组
        /*关心事件和读写网络并不冲突*/
        channel.write(writeBuffer);
    }

    private void doConnect() throws IOException{
        /*非阻塞的连接*/
        if(socketChannel.connect(new InetSocketAddress(host,port))){
            socketChannel.register(selector,SelectionKey.OP_READ);
        }else{
            socketChannel.register(selector,SelectionKey.OP_CONNECT);
        }
    }

    //写数据对外暴露的API
    public void sendMsg(String msg) throws Exception{
        doWrite(socketChannel, msg);
    }
}

原文地址:https://blog.csdn.net/m0_61853556/article/details/135981787

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。

如若转载,请注明出处:http://www.7code.cn/show_67383.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注