package org.example.socket; import io.netty.channel.*; import org.example.background.service.MakeInfoService; import org.example.socket.utils.CommonConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @author zjb * @since 2023-12-11 10:31:26 */ @Component @EnableAsync @ChannelHandler.Sharable public class BootNettySocketChannelInboundHandler extends ChannelInboundHandlerAdapter { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Resource private MakeInfoService makeInfoService; private Map ctxMap = new ConcurrentHashMap(16); /** * 从客户端收到新的数据时,这个方法会在收到消息时被调用 */ @Override public void channelRead(ChannelHandlerContext ctx, Object frame) { // int[] ints = makeInfoService.sendElectricMessage("10.29.16.202", 25); String result = makeInfoService.convertPlaintext((byte[]) frame); ChannelFuture channelFuture1; if(result.equals("-1")){ logger.info("CLIENT:CRC校验错误,收到的数据不完整或错误"); System.out.println("CS校验对错误或者长度不符合预定,收到的数据不完整或错误"); }else { logger.info("CLIENT"+getClientIP(ctx)+":"+result); System.out.println("ip为"+getClientIP(ctx)+"的客户端发送:"+result); int[] ints = makeInfoService.detectingHeartbeat(result, getClientIP(ctx)); if(ints.length > 0) { //确认登录 channelFuture1 = ctx.channel().writeAndFlush(ints); channelFuture1.addListener((ChannelFutureListener) future2 -> { if (future2.isSuccess()) { sendEnergyMessages(ints, getClientIP(ctx)); System.out.println("ip为" + getClientIP(ctx) + "确认登录成功"); } else { System.out.println("ip为" + getClientIP(ctx) + "确认登录失败"); } }); }else{ Integer res = makeInfoService.convertTextToData(result, getClientIP(ctx)); int[] data2; switch (res) { case 25: //发送补充指令 发送第二条指令 data2 = makeInfoService.sendElectricMessage(getClientIP(ctx), 129); channelFuture1 = ctx.channel().writeAndFlush(data2); channelFuture1.addListener((ChannelFutureListener) future2 -> { if (future2.isSuccess()) { sendEnergyMessages(data2, getClientIP(ctx)); } else { logger.info("SERVER:第二条指令发送失败"); System.out.println("ip为" + getClientIP(ctx) + "的第二条指令发送失败"); } }); break; case 129: //发送补充指令 发送第二条指令 data2 = makeInfoService.sendElectricMessage(getClientIP(ctx), 131); channelFuture1 = ctx.channel().writeAndFlush(data2); channelFuture1.addListener((ChannelFutureListener) future2 -> { if (future2.isSuccess()) { sendEnergyMessages(data2, getClientIP(ctx)); } else { logger.info("SERVER:第三条指令发送失败"); System.out.println("ip为" + getClientIP(ctx) + "的第三条指令发送失败"); } }); break; default: break; } } } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 客户端断开连接的处理逻辑 System.out.println("客户端断开连接"); ctxMap.remove(getClientIP(ctx)); super.channelInactive(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端(重新)连接:" + ctx.channel().remoteAddress()); ctxMap.put(getClientIP(ctx),ctx); super.channelActive(ctx); } /** * 客户端与服务端 断连时执行 channelInactive方法之后执行 */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { ctxMap.remove(getClientIP(ctx)); super.channelUnregistered(ctx); } /** * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } @Scheduled(cron = "0 0 */1 * * *") //每小时的第0分钟第0秒 // @Scheduled(fixedRate = 10000) //10秒 // @Scheduled(fixedRate = 600000) //600秒 // @Scheduled(cron = "*/5 * * * * *") //每5秒 private void sendIntervals() { int[] data = makeInfoService.sendElectricMessage(CommonConfig.electric1, 25); ChannelHandlerContext ctx = ctxMap.get(CommonConfig.electric1); if(!StringUtils.isEmpty(ctx)) { ChannelFuture channelFuture = ctx.channel().writeAndFlush(data); // 添加监听器,检查发送是否成功 channelFuture.addListener((ChannelFutureListener) future1 -> { if(future1.isSuccess()) { sendEnergyMessages(data,CommonConfig.electric1); sendIntervals2(); }else{ logger.info("SERVER:第一条指令发送失败"); System.out.println("第一条指令发送失败"); } }); }else{ logger.info("SERVER:ip为"+CommonConfig.electric2+"的客户端不在线。无法传递指令"); System.out.println("ip为"+CommonConfig.electric1+"的客户端不在线。无法传递指令"); } } // @Scheduled(cron = "0 0 */1 * * *") //每小时的第0分钟第0秒 // @Scheduled(cron = "*/5 * * * * *") //每5秒 // @Scheduled(fixedRate = 10000) //10秒 private void sendIntervals2() { int[] data = makeInfoService.sendElectricMessage(CommonConfig.electric2, 25); ChannelHandlerContext ctx = ctxMap.get(CommonConfig.electric2); if(!StringUtils.isEmpty(ctx)) { ChannelFuture channelFuture = ctx.channel().writeAndFlush(data); // 添加监听器,检查发送是否成功 channelFuture.addListener((ChannelFutureListener) future1 -> { if(future1.isSuccess()) { sendEnergyMessages(data,CommonConfig.electric2); }else{ logger.info("SERVER:第一条指令发送失败"); System.out.println("第一条指令发送失败"); } }); }else{ logger.info("SERVER:ip为"+CommonConfig.electric2+"的客户端不在线。无法传递指令"); System.out.println("ip为"+CommonConfig.electric2+"的客户端不在线。无法传递指令"); } } public void sendEnergyMessages(int [] data,String ip){ StringBuffer stringBuffer = new StringBuffer(); for(int num = 0; num < data.length; num++){ if (num == data.length - 1) { stringBuffer.append(Integer.toHexString(data[num])); }else { stringBuffer.append(Integer.toHexString(data[num])+" "); } } System.out.println("成功发送指令:"+stringBuffer); logger.info("SERVER:成功发送指令:"+stringBuffer); } public String getClientIP(ChannelHandlerContext ctx){ String ip = ctx.channel().remoteAddress().toString(); return ip.substring(1, ip.indexOf(":")); } }