支持持续接收数、可发送数据、可多端口连接
废话少说,直接上代码
如果写的可以,记得点个赞~

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;

@Slf4j
@AllArgsConstructor
public class TcpIpService {

    // 保存消息数据容器,会单独开一个线程进行监听里面是否数据然后进行业务处理
    //(我设置最大存放个数,视需求而定)
    public static ArrayBlockingQueue<HashMap> messageQueue = new ArrayBlockingQueue<HashMap>(10);

    /**
     * 开始加载监听Tcp/Ip端口
     * @throws IOException
     */
    public static void ServerSocketD(int portNumber) throws IOException {
        MQMonitor mqMonitor = new MQMonitor();
        mqMonitor.start();

        InterlinkageMonitorServer thread = new InterlinkageMonitorServer(portNumber);
        thread.start();
    }

    /**
     * 发送消息
     * @param data
     * @return
     * @throws IOException
     */
    public static ReturnDataState giveOrder(String data) throws IOException, InterruptedException {
        //向客户端发送消息
        try {
            log.info("发:"+data);
            ServerThread.outputStream = ServerThread.socket.getOutputStream();
            ServerThread.outputStream.write(data.getBytes("GBK"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new ReturnDataState(0, "");
    }
}

/**
 * 监听用户链接
 */
@Slf4j
class InterlinkageMonitorServer extends Thread {
    //监听端口
    private static int PORT = 0;

    public static Socket socket;

    /**
     * @param portNumber:端口号
     */
    public InterlinkageMonitorServer(int portNumber){
        this.PORT = portNumber;
    }

    @SneakyThrows
    public void run(){
        log.info("TcpIp消息:>>>>>>>>>>>>>>> 开始监听用户链接 <<<<<<<<<<<<<");
        ServerSocket serverSocket = null;
        try {
            //建立服务器的 Socket,并设定一个监听端口 PORT
            if (PORT<1024){
                log.error("TcpIp消息:监听的端口数值不能为0或小于1024");
            }else if (PORT > 65535){
                log.error("TcpIp消息:监听的端口数值不能大于65535");
            }
            serverSocket = new ServerSocket(PORT);
            //由于需要进行循环监听,因此获取消息的操作应放在一个 while循环while(true){
                try {
                    //建立跟客户端连接
                    socket = serverSocket.accept();
                } catch (Exception e) {
                    log.info("TcpIp消息:建立与客户端连接出现异常");
                    e.printStackTrace();
                }
                if (socket != null){
                    log.info("TcpIp消息:>>>>>>>>>>>>>>> 有客户端链接,开启线程 <<<<<<<<<<<<<");
                    //注:视需求做,我们需求是就没什么人链接,就新开了一个线程处理
                    MessageServerThread thread = new MessageServerThread(socket);
                    thread.start();
                    socket = null;
                }
                // 不能让他跑的太快,需要在一定程度上让他跑慢点(视需求而定)
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            serverSocket.close();
        }
    }
}

/**
 * 每一个链接上的,都会单独进行监听消息
 */
@Slf4j
class MessageServerThread extends Thread {
    public static Socket socket ;
    InputStream inputStream;

    public  MessageServerThread(Socket socket){
        this.socket=socket;
    }
    public void run(){
        try {
            //接收客户端的消息并打印
            inputStream = socket.getInputStream();
            byte[] bytes = new byte[5120];

            while (inputStream.read(bytes) != -1){
                //解决乱码问题
                String string = new String(bytes, "GB2312");
                //解决 byte 数组为空或者填不满的问题
                HashMap originalData = JSON.parseObject(string.trim(), HashMap.class);
                log.info("收:"+originalData.toString());

                // 消息存入
                produce(originalData);

                //推送清空数组
                bytes = new byte[5120];
            }
        } catch (Exception e) {
            log.error("WebSocket消息:客户端的主动断开连接了,关闭线程");
        }
        //操作结束关闭socket
        try{socket.close();}catch(IOException e){ log.error("WebSocket消息:关闭连接出现异常"); }
    }

    // 生产消息
    public static void produce(HashMap msg) {
        if (Server.messageQueue.offer(msg)) {
            log.info("MQ消息:成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + Server.messageQueue.size());
        } else {
            log.info("MQ消息:消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
        }
        log.info("=======================");
    }
}

/**
 * 收到数据后的业务处理
 */
@Slf4j
class MQMonitor extends Thread {

    public MQMonitor() {}

    @SneakyThrows
    public void run(){
        while (true) {
            // 取出收到的值
            HashMap<String, Object> consume = consume();
            while (!ObjectUtils.isEmpty(consume)) {
                log.info("MQ消息:开始消化数据");
                // 业务内容。。。。。
                break;
            }
            // 不为空时会快速数据进行处理
            if (TcpIpService.messageQueue.size() == 0)
                Thread.sleep( 1000 );
        }
    }

    // 消费消息
    public static HashMap consume() {
        HashMap msg = Server.messageQueue.poll();
        if (msg != null) {
            // 消费条件满足情况,从消息容器中取出一条消息
            log.info("MQ消息:已经消费消息:" + msg + ",当前暂存的消息数量是:" + Server.messageQueue.size());
        } else {
            log.info("MQ消息:消息处理中心没有消息可供消费!");
        }
        log.info("=======================");
        return msg;
    }
}

原文地址:https://blog.csdn.net/qq_38382365/article/details/134715529

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_18271.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注