2025年SpringBoot搭建Netty+Socket+Tcp服务端和客户端

SpringBoot搭建Netty+Socket+Tcp服务端和客户端一 服务端 1 启动类 package com idc config netty import io netty bootstrap ServerBootst import io netty channel ChannelFutur import io netty channel ChannelOptio import io netty

大家好,我是讯享网,很高兴认识大家。

一: 服务端 

1: 启动类

package com.idc.config.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; / * @description: netty服务启动类 / @Slf4j @Component public class NettyServer { public void start(InetSocketAddress address) { //配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) // 绑定线程池 .channel(NioServerSocketChannel.class) .localAddress(address) .childHandler(new NettyServerChannelInitializer())//编码解码 .option(ChannelOption.SO_BACKLOG, 128); //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝 // .childOption(ChannelOption.SO_KEEPALIVE, true); //保持长连接,2小时无数据激活心跳机制 // 绑定端口,开始接收进来的连接 ChannelFuture future = bootstrap.bind(address).sync(); log.info("ODF-Socket------netty服务器开始监听端口:" + address.getPort()); //关闭channel和块,直到它被关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } 

讯享网

2: 处理程序

讯享网package com.idc.config.netty; import com.idc.config.udpsocket.UdpServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; / * @description: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器 / public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast("decoder",new StringDecoder(CharsetUtil.UTF_8)); channel.pipeline().addLast("encoder",new StringEncoder(CharsetUtil.UTF_8)); channel.pipeline().addLast(new NettyServerHandler()); } } 
package com.idc.config.netty; import com.idc.common.exception.CommonRuntimeException; import com.idc.entity.odf.dto.LightingStatus; import com.idc.mapper.OdfAlarmMapper; import com.idc.mapper.OdfMapper; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; / * @author wcybaonier * @description: netty服务端处理类 / @Slf4j @Component public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Resource private OdfMapper odfMapper; / * 管理一个全局map,保存连接进服务端的通道数量 */ private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>(); / * @param ctx * @DESCRIPTION: 有客户端连接服务器会触发此函数 * @return: void */ @Override public void channelActive(ChannelHandlerContext ctx) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); int clientPort = insocket.getPort(); //获取连接通道唯一标识 ChannelId channelId = ctx.channel().id(); System.out.println(); //如果map中不包含此连接,就保存连接 if (CHANNEL_MAP.containsKey(channelId)) { log.info("ODF-Socket------客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size()); } else { //保存连接 CHANNEL_MAP.put(channelId, ctx); log.info("ODF-Socket------客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]"); log.info("ODF-Socket------连接通道数量: " + CHANNEL_MAP.size()); } } / * @param ctx * @DESCRIPTION: 有客户端终止连接服务器会触发此函数 * @return: void */ @Override public void channelInactive(ChannelHandlerContext ctx) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); ChannelId channelId = ctx.channel().id(); //包含此客户端才去删除 if (CHANNEL_MAP.containsKey(channelId)) { //删除连接 CHANNEL_MAP.remove(channelId); System.out.println(); log.info("ODF-Socket------客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]"); log.info("ODF-Socket------连接通道数量: " + CHANNEL_MAP.size()); } } / * @param ctx * @DESCRIPTION: 有客户端发消息会触发此函数 * @return: void */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg == null){ throw new CommonRuntimeException("ODF-Socket------加载客户端报文为空,请联系厂商!"); } log.info("ODF-Socket------加载客户端报文......【" + ctx.channel().id() + "】" + " :" + msg); / * 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数 * 在这里可以设置异步执行 提交任务到该channel的taskQueue 中 */ ctx.channel().eventLoop().execute(() -> { String msgStr = String.valueOf(msg); // 如果不包含逗号, 那么格式不对 约定格式为 : 序列号,deviceid,shelfNo,moduleNo,termNo,state if (!msgStr.contains(",")){ throw new CommonRuntimeException("ODF-Socket------加载客户端报文格式不正确,请联系厂商!"); } try { String[] split = msgStr.split(","); if (split.length != 6){ throw new CommonRuntimeException("ODF-Socket------加载客户端报文长度不正确,请联系厂商!"); } //开始修改 admin LightingStatus lightingStatus = new LightingStatus(); lightingStatus.setSerialNumber(split[0]); lightingStatus.setDeviceId(split[1]); lightingStatus.setShelfNo(split[2]); lightingStatus.setModuleNo(split[3]); lightingStatus.setTermNo(split[4]); lightingStatus.setState(split[5]); int i = odfMapper.updateTermStatus(lightingStatus); log.info("ODF-Socket------亮灯状态更新条数......【" + i + "】" ); } catch (Exception e) { e.printStackTrace(); } }); / * 可以设置多个异步任务 * 但是这个会在上面异步任务执行完之后才执行 */ /*ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(10*1000); log.info(">>>>>>>>>休眠二十秒"); } catch (InterruptedException e) { e.printStackTrace(); } } });*/ //响应客户端 log.info("ODF-Socket------服务端端返回报文......【" + ctx.channel().id() + "】" + " :" + msg); this.channelWrite(ctx.channel().id(), msg); } / * @param msg 需要发送的消息内容 * @param channelId 连接通道唯一id * @DESCRIPTION: 服务端给客户端发送消息 * @return: void */ public void channelWrite(ChannelId channelId, Object msg) throws Exception { ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId); if (ctx == null) { log.info("ODF-Socket------通道【" + channelId + "】不存在"); return; } if (msg == null || msg == "") { log.info("ODF-Socket------服务端响应空的消息"); return; } //将客户端的信息直接返回写入ctx ctx.write(msg); //刷新缓存区 ctx.flush(); } public static void main(String[] args) { System.out.println("序列号,deviceid,shelfNo,moduleNo,termNo,state".split(",").length); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("ODF-Socket------Client: " + socketString + " READER_IDLE 读超时"); ctx.disconnect(); } else if (event.state() == IdleState.WRITER_IDLE) { log.info("ODF-Socket------Client: " + socketString + " WRITER_IDLE 写超时"); ctx.disconnect(); } else if (event.state() == IdleState.ALL_IDLE) { log.info("ODF-Socket------Client: " + socketString + " ALL_IDLE 总超时"); ctx.disconnect(); } } } / * @param ctx * @DESCRIPTION: 发生异常会触发此函数 * @return: void */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(); ctx.close(); log.info("ODF-Socket------"+ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size()); //cause.printStackTrace(); } } 

3: 项目启动类

讯享网package com.idc; import com.idc.config.udpsocket.UdpServer; import lombok.extern.slf4j.Slf4j; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; import javax.annotation.Resource; / * @author wcybaonier */ @MapperScan("com.idc.mapper") @SpringBootApplication @Slf4j @EnableScheduling public class IdcPduApplication implements CommandLineRunner { @Value("${netty.host}") private String host; @Value("${netty.port}") private Integer port; @Resource private NettyServer nettyServer; public static void main(String[] args) { SpringApplication.run(IdcPduApplication.class, args); log.info("IdcPduApplication 启动成功!"); } / * netty服务启动 * @param args * @throws Exception */ @Override public void run(String... args) throws Exception { //tcp实现 InetSocketAddress address = new InetSocketAddress(host,port); log.info("neety服务器启动地址: "+host+":"+ port); nettyServer.start(address); } } 

yml配置: 

# 配置Netty通信IP和端口 netty: port: 7101 host: 127.0.0.1

 

完成,启动项目即可自动监听对应端口


讯享网

二: 客户端

1: 启动类

这里为了测试,写了Main方法,可以参考服务端,配置启动类 ,实现跟随项目启动

讯享网package com.ws.aa; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; / * Springboot整合Netty,实现Socket信息交互(本项目用于与C语言程序进行交互) * * 核心文件(与服务端进行数据交互) * * 客户端 * * @author 小辰哥哥 */ public class SocketClient { // 服务端IP static final String HOST = System.getProperty("host", "134.95.3.134"); // 服务端开放端口 static final int PORT = Integer.parseInt(System.getProperty("port", "7101")); // 数据包大小 static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); // 日志打印 private static final Logger LOGGER = LoggerFactory.getLogger(SocketClient.class); // 主函数启动 public static void main(String[] args) throws InterruptedException { sendMessage("1,deviceid123,shelfNo123,moduleNo123,termNo123,state123"); } / * 核心方法(处理:服务端向客户端发送的数据、客户端向服务端发送的数据) * * @param content * @throws InterruptedException * @author 小辰哥哥 */ public static void sendMessage(String content) throws InterruptedException { // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); p.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); p.addLast(new SocketHandler() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { LOGGER.debug("接收服务端发送过来的消息"); LOGGER.debug("服务端发送过来的数据:" + msg); // 主动与服务端断开连接(客户端触发) //ctx.channel().close(); } }); } }); ChannelFuture future = b.connect(HOST, PORT).sync(); future.channel().writeAndFlush(content); // 程序阻塞 future.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } } 

2: 处理程序

package com.ws.aa; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import java.util.logging.SocketHandler; / * Springboot整合Netty,实现Socket信息交互(本项目用于与C语言程序进行交互) * * 设置出站和入站的编码器和解码器(该方法在SocketClientConfig.java中被重写) * * 客户端 * * @author wcybaonier */ public class SocketChannelInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); p.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); p.addLast((ChannelHandler) new SocketHandler()); } } 
讯享网package com.ws.aa; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; / * Springboot整合Netty,实现Socket信息交互(本项目用于与C语言程序进行交互) * * 初始化操作、接受服务端发送过来的消息(该方法在SocketClient.java中被重写) * * 客户端 * * @author wcybaonier */ public class SocketHandler extends ChannelInboundHandlerAdapter { // 日志打印 private static final Logger LOGGER = LoggerFactory.getLogger(SocketHandler.class); @Override public void channelActive(ChannelHandlerContext ctx) { LOGGER.debug("SocketHandler Active(客户端)"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { LOGGER.debug("接收服务端发送过来的消息"); LOGGER.debug("SocketHandler read Message:" + msg); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { LOGGER.debug("客户端断开连接"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } } 

 

3: 项目启动类

......想写就参考服务端......

三: 测试,完成

有测试的,,,但是忘记截图了................

小讯
上一篇 2025-03-10 13:17
下一篇 2025-02-19 09:21

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/46370.html