支持持续接收数、可发送数据、可多端口连接。
废话少说,直接上代码!
如果写的可以,记得点个赞~
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
@Slf4j
@AllArgsConstructor
public class TcpIpService {
// 保存消息数据的容器,会单独开一个线程进行监听里面是否有数据然后进行业务处理
//(我设置的最大存放个数,视需求而定)
public static ArrayBlockingQueue<HashMap> messageQueue = new ArrayBlockingQueue<HashMap>(10);
/**
* 开始加载监听Tcp/Ip端口
* @throws IOException
*/
public static void ServerSocketD(int portNumber) throws IOException {
MQMonitor mqMonitor = new MQMonitor();
mqMonitor.start();
InterlinkageMonitorServer thread = new InterlinkageMonitorServer(portNumber);
thread.start();
}
/**
* 发送消息
* @param data
* @return
* @throws IOException
*/
public static ReturnDataState giveOrder(String data) throws IOException, InterruptedException {
//向客户端发送消息
try {
log.info("发:"+data);
ServerThread.outputStream = ServerThread.socket.getOutputStream();
ServerThread.outputStream.write(data.getBytes("GBK"));
} catch (IOException e) {
e.printStackTrace();
}
return new ReturnDataState(0, "");
}
}
/**
* 监听用户链接
*/
@Slf4j
class InterlinkageMonitorServer extends Thread {
//监听端口
private static int PORT = 0;
public static Socket socket;
/**
* @param portNumber:端口号
*/
public InterlinkageMonitorServer(int portNumber){
this.PORT = portNumber;
}
@SneakyThrows
public void run(){
log.info("TcpIp消息:>>>>>>>>>>>>>>> 开始监听用户链接 <<<<<<<<<<<<<");
ServerSocket serverSocket = null;
try {
//建立服务器的 Socket,并设定一个监听的端口 PORT
if (PORT<1024){
log.error("TcpIp消息:监听的端口数值不能为0或小于1024");
}else if (PORT > 65535){
log.error("TcpIp消息:监听的端口数值不能大于65535");
}
serverSocket = new ServerSocket(PORT);
//由于需要进行循环监听,因此获取消息的操作应放在一个 while 大循环中
while(true){
try {
//建立跟客户端的连接
socket = serverSocket.accept();
} catch (Exception e) {
log.info("TcpIp消息:建立与客户端的连接出现异常");
e.printStackTrace();
}
if (socket != null){
log.info("TcpIp消息:>>>>>>>>>>>>>>> 有客户端链接,开启新线程 <<<<<<<<<<<<<");
//注:视需求做,我们的需求是就没什么人链接,就新开了一个线程做处理
MessageServerThread thread = new MessageServerThread(socket);
thread.start();
socket = null;
}
// 不能让他跑的太快,需要在一定程度上让他跑慢点(视需求而定)
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
serverSocket.close();
}
}
}
/**
* 每一个链接上的,都会单独进行监听消息
*/
@Slf4j
class MessageServerThread extends Thread {
public static Socket socket ;
InputStream inputStream;
public MessageServerThread(Socket socket){
this.socket=socket;
}
public void run(){
try {
//接收客户端的消息并打印
inputStream = socket.getInputStream();
byte[] bytes = new byte[5120];
while (inputStream.read(bytes) != -1){
//解决乱码的问题
String string = new String(bytes, "GB2312");
//解决 byte 数组为空或者填不满的问题
HashMap originalData = JSON.parseObject(string.trim(), HashMap.class);
log.info("收:"+originalData.toString());
// 消息存入
produce(originalData);
//推送后清空数组
bytes = new byte[5120];
}
} catch (Exception e) {
log.error("WebSocket消息:客户端的主动断开连接了,关闭线程");
}
//操作结束,关闭socket
try{socket.close();}catch(IOException e){ log.error("WebSocket消息:关闭连接出现异常"); }
}
// 生产消息
public static void produce(HashMap msg) {
if (Server.messageQueue.offer(msg)) {
log.info("MQ消息:成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + Server.messageQueue.size());
} else {
log.info("MQ消息:消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
}
log.info("=======================");
}
}
/**
* 收到数据后的业务处理
*/
@Slf4j
class MQMonitor extends Thread {
public MQMonitor() {}
@SneakyThrows
public void run(){
while (true) {
// 取出收到的值
HashMap<String, Object> consume = consume();
while (!ObjectUtils.isEmpty(consume)) {
log.info("MQ消息:开始消化数据");
// 业务内容。。。。。
break;
}
// 不为空时会快速对数据进行处理
if (TcpIpService.messageQueue.size() == 0)
Thread.sleep( 1000 );
}
}
// 消费消息
public static HashMap consume() {
HashMap msg = Server.messageQueue.poll();
if (msg != null) {
// 消费条件满足情况,从消息容器中取出一条消息
log.info("MQ消息:已经消费消息:" + msg + ",当前暂存的消息数量是:" + Server.messageQueue.size());
} else {
log.info("MQ消息:消息处理中心内没有消息可供消费!");
}
log.info("=======================");
return msg;
}
}
原文地址:https://blog.csdn.net/qq_38382365/article/details/134715529
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.7code.cn/show_18271.html
如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱:suwngjj01@126.com进行投诉反馈,一经查实,立即删除!
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。