最近入职了一家新公司,依旧是物联网行业,不过没用之前的Mqtt协议进行跟DTU交互,使用的TCP进行交互的,之前一直理解的TCP就是根据IP地址与端口号,就可以直接发送数据了,当来公司一段时间后,发现了新大陆,使用Netty这种异步非阻塞的高性能框架进行长链接
Netty 封装了 Java 原生 NIO 的复杂性(如 ByteBuffer 操作、Selector 空轮询、并发问题等),提供了简洁易用的 API,同时保留了 NIO 的高性能特性,是目前 Java 生态中最主流的网络通信框架
使用 Netty 的核心原因是:它解决了 Java 原生网络编程的痛点,在「高性能、易用性、可靠性、扩展性」上做到了极致平衡,让开发者无需关注底层复杂细节,就能快速构建稳定、高并发的网络应用
@Component
public class NettyServer {
private static final Logger log = LoggerFactory.getLogger(NettyServer.class);
@Autowired
private ServerChannelInitializer serverChannelInitializer;
public static Map<String, ChannelHandlerContext> ctxMap = new ConcurrentHashMap();
public static Map<String, ChannelHandlerContext> meterMap = new ConcurrentHashMap();
public static Map<String, ChannelHandlerContext> waterMap = new ConcurrentHashMap();
public static Map<String, CountDownLatch> latchMap = new ConcurrentHashMap();
}
监听消息
/**
* Netty TCP服务端核心处理器
* 功能:
* 1. 处理DTU/网关设备的TCP连接(连接建立/断开)
* 2. 向DTU发送指定十六进制指令(36 03 00 06 00 44 04 88)
* 3. 接收设备上行数据(心跳/业务报文),解析并更新设备状态
* 4. 心跳检测(空闲超时断开),维护设备-通道映射关系
*
* @ChannelHandler.Sharable:标记处理器可被多个Channel共享(需保证线程安全)
* @Component:Spring托管,支持依赖注入
*/
@ChannelHandler.Sharable
@Component
public class ServerHandler extends ChannelInboundHandlerAdapter {
// Redis操作模板(临时存储报文片段)
@Autowired
private StringRedisTemplate stringRedisTemplate;
// 注:0x88超过byte正数范围,强制转换为byte(二进制值正确)
private static final byte[] DTU_COMMAND = {
(byte) 0x36, (byte) 0x03, (byte) 0x00, (byte) 0x06,
(byte) 0x00, (byte) 0x21, (byte) 0x61, (byte) 0x94
};
/**
* 【核心回调】Channel激活时触发(DTU/网关成功连接服务端)
* 执行逻辑:记录连接日志 → 发送预设指令给DTU
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 打印DTU客户端的远程地址(IP+端口)
log.info("DTU客户端已连接:{}", ctx.channel().remoteAddress());
// 向DTU发送预设指令
sendDtuCommand(ctx);
// 调用父类方法(保留Netty默认逻辑)
super.channelActive(ctx);
}
/**
* 向DTU发送指令的核心方法
*
* @param ctx 通道上下文(持有DTU的TCP连接通道)
*/
public void sendDtuCommand(ChannelHandlerContext ctx) {
// 前置检查:通道是否激活(未断开)且可写(缓冲区未满)
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
// 1. 将指令字节数组封装为Netty的ByteBuf(非堆内存,高性能)
ByteBuf cmdBuf = Unpooled.copiedBuffer(DTU_COMMAND);
// 2. 异步发送指令(write+flush确保立即刷出缓冲区)
ctx.writeAndFlush(cmdBuf)
// 监听发送结果(异步回调)
.addListener(future -> {
if (future.isSuccess()) {
log.info("指令发送成功:{}", DTU_COMMAND);
} else {
log.error("指令发送失败:{}", future.cause().getMessage());
// 发送失败时关闭通道(触发channelInactive)
ctx.channel().close();
}
});
} else {
log.error("通道不可用(未激活/不可写),无法发送指令");
}
}
/**
* 【核心回调】Channel失效时触发(DTU/网关断开连接)
* 执行逻辑:更新设备离线状态 → 移除通道映射 → 记录日志
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
}
/**
* 【核心回调】接收设备上行数据时触发(DTU/网关向服务端发送报文)
* 处理逻辑:
* 1. 数据格式化(去空格/转大写)
* 2. 按端口区分设备类型(电表/水表)
* 3. 解析心跳/注册报文,维护设备-通道映射
* 4. 解析业务报文,转发给专用处理器
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 原始报文转字符串(msg为Netty的ByteBuf,toString()默认按UTF-8解析)
String bufStr = msg.toString();
// 格式化:去首尾空格 + 转大写(统一格式)
String message = bufStr.trim().toUpperCase();
// 打印原始心跳消息日志
log.info("收到的原始心跳消息:{}", bufStr);
}
/**
* 【异常回调】通道发生异常时触发(如报文解析失败/网络异常)
* 执行逻辑:打印异常栈 → 记录设备故障日志
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace(); // 打印异常栈(调试用)
String gatewayNo = getNo(ctx);
log.info("设备【{}】数据解析故障", gatewayNo);
// 注:可补充异常后关闭通道/重试等逻辑
}
/**
* 【空闲检测回调】Netty IdleStateHandler触发空闲事件时执行
* 执行逻辑:读空闲(长时间未收设备数据)→ 判定设备离线 → 关闭通道
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
// 判定是否为空闲状态事件
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
// 读空闲(READER_IDLE):服务端长时间未收到设备上行数据
}
}
/**
* 工具方法:根据Channel上下文获取设备编号(网关SN)
* 逻辑:按端口区分设备类型 → 遍历对应映射表 → 匹配Channel → 返回设备SN
*/
public static String getNo(ChannelHandlerContext ctx) {
// 提取服务端监听端口
String port = ctx.pipeline().channel().localAddress().toString().split(":")[1];
}
}
@Component
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Autowired
private ServerHandler serverHandler;
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast("decoder", new MyDecoder());
channel.pipeline().addLast("encoder", new StringEncoder());
channel.pipeline().addLast(new IdleStateHandler(300L, 0L, 0L, TimeUnit.SECONDS));
channel.pipeline().addLast(serverHandler);
}
}
评论