zhangjinbang 2023-12-25 17:25:10 +08:00
commit 28e36cb4d2
20 changed files with 1143 additions and 0 deletions

iot-energy/pom.xml Normal file

@@ -0,0 +1,113 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
<relativePath />
</parent>
<groupId>org.example</groupId>
<artifactId>iot-energy</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.70</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>com.ghgande</groupId>
<artifactId>j2mod</artifactId>
<version>2.5.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-xml -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.16.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- package the application as an executable jar -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.1.1.RELEASE</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@@ -0,0 +1,30 @@
package org.example;
import org.example.socket.BootNettySocketServerThread;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@MapperScan({"org.example.background.dao"})
@EnableScheduling // enable scheduled task execution
public class IotApplication implements CommandLineRunner {
@Autowired
private BootNettySocketServerThread socketServerThread;
public static void main(String[] args) {
SpringApplication app = new SpringApplication(IotApplication.class);
app.run(args);
}
@Override
public void run(String... args) throws Exception {
socketServerThread.start();
}
}

@@ -0,0 +1,36 @@
package org.example.background.condition;
import org.apache.ibatis.jdbc.SQL;
import org.example.background.entity.EnergyDataEntity;
import org.example.background.utils.FieldUtils;
import org.springframework.cglib.beans.BeanMap;
public class EnergyDataCondition {
public String insert(EnergyDataEntity data){
SQL sql = new SQL();
sql.INSERT_INTO("energy_data");
BeanMap beanMap = BeanMap.create(data);
for (Object key : beanMap.keySet()) {
Object val = beanMap.get(key);
if (val != null) {
sql.VALUES(FieldUtils.camelToLine(key + ""), "#{" + key + "}");
}
}
return sql.toString();
}
public String updateByPrimaryKey(EnergyDataEntity data) {
SQL sql = new SQL();
sql.UPDATE("energy_data");
BeanMap beanMap = BeanMap.create(data);
for (Object key : beanMap.keySet()) {
Object val = beanMap.get(key);
if (val != null) {
sql.SET(FieldUtils.camelToLine(key + "") + "=#{" + key + "}");
}
}
sql.WHERE("energy_id=#{energyId}");
return sql.toString();
}
}
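A minimal usage sketch (hypothetical, not part of this commit) of the provider above: MyBatis' SQL builder emits an INSERT covering only the non-null properties, with FieldUtils.camelToLine mapping each property name to a column name; the exact column order follows BeanMap's key order.
public class EnergyDataConditionDemo { // hypothetical helper class
    public static void main(String[] args) {
        org.example.background.entity.EnergyDataEntity demo = new org.example.background.entity.EnergyDataEntity();
        demo.setEnergyId(1L);
        demo.setDeviceId("1017184195");
        // Prints something along the lines of:
        //   INSERT INTO energy_data (energy_id, device_id) VALUES (#{energyId}, #{deviceId})
        System.out.println(new org.example.background.condition.EnergyDataCondition().insert(demo));
    }
}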

@@ -0,0 +1,20 @@
package org.example.background.dao;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.UpdateProvider;
import org.example.background.condition.EnergyDataCondition;
import org.example.background.entity.EnergyDataEntity;
public interface EnergyDataDao {
@InsertProvider(type = EnergyDataCondition.class, method = "insert")
Integer insert(EnergyDataEntity data);
@Select("SELECT * FROM energy_data WHERE device_Id = #{deviceId} AND acquisition_time = #{acquisitionTime}")
EnergyDataEntity select(EnergyDataEntity data);
@UpdateProvider(type = EnergyDataCondition.class, method = "updateByPrimaryKey")
Integer updateByPrimaryKey(EnergyDataEntity data);
}

@@ -0,0 +1,69 @@
package org.example.background.entity;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonFormat;
import java.io.Serializable;
import java.util.Date;
@Data
public class EnergyDataEntity implements Serializable{
private Long energyId;
private String deviceId;
//@ApiModelProperty(value = "瞬时流量")
private Float instantaneousDelivery;
//@ApiModelProperty(value = "瞬时热量")
private Float instantHeat;
//@ApiModelProperty(value = "正累积流量")
private Float positiveTotalFlow;
//@ApiModelProperty(value = "负累积流量")
private Float negativeTotalFlow;
//@ApiModelProperty(value = "净累积流量")
private Float cumulativeTraffic;
//@ApiModelProperty(value = "供水温度")
private Float supplyWaterTemperature;
//@ApiModelProperty(value = "回水温度")
private Float returnWaterTemperature;
//@ApiModelProperty(value = "流体速度")
private String fluidVelocity;
//@ApiModelProperty(value = "测量流体声速")
private String fluidVelocitySound;
//@ApiModelProperty(value = "正累积热量")
private Float positiveCumulativeHeat;
//@ApiModelProperty(value = "负累积热量")
private Float negativeCumulativeHeat;
//@ApiModelProperty(value = "净累积热量")
private Float netCumulativeHeat;
//@ApiModelProperty(value = "今天累积流量")
private Float accumulatedTrafficDay;
//@ApiModelProperty(value = "本月累积流量")
private Float accumulatedTrafficMonth;
//@ApiModelProperty(value = "今年累积流量")
private Float accumulatedTrafficYear;
//@ApiModelProperty(value = "入库时间")
@JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
//@ApiModelProperty(value = "整点采集时间")
@JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private Date acquisitionTime;
}

@@ -0,0 +1,198 @@
package org.example.background.service;
import com.ghgande.j2mod.modbus.util.ModbusUtil;
import org.example.background.dao.EnergyDataDao;
import org.example.background.entity.EnergyDataEntity;
import org.example.utils.IdService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* author zjb
* Builds the commands the server sends to the downstream (slave) devices and parses their responses.
*/
@Component
public class MakeInfoService {
@Resource
private EnergyDataDao energyDataDao;
@Resource
private IdService idService;
private Lock lock = new ReentrantLock();
// Read registers 0001-0100 (100 registers) from the energy meter
public int[] sendEnergyMessage () {
int [] data = new int[8];
data[0] = 0x01;
data[1] = 0x03;
data[2] = 0x00;
data[3] = 0x00;
data[4] = 0x00;
data[5] = 0x64;
String result = String.format("%04x", getCrc(data)); // zero-pad so the CRC always has 4 hex digits
data[6] = Integer.parseInt(result.substring(2, 4), 16); // CRC low byte (sent first, per Modbus RTU)
data[7] = Integer.parseInt(result.substring(0, 2), 16); // CRC high byte
return data;
}
// Read register 1529 (device ID) from the energy meter
public int[] sendEnergyGetDevId () {
int [] data = new int[8];
data[0] = 0x01;
data[1] = 0x03;
data[2] = 0x05;
data[3] = 0xF9;
data[4] = 0x00;
data[5] = 0x01;
String result = String.format("%04x", getCrc(data)); // zero-pad so the CRC always has 4 hex digits
data[6] = Integer.parseInt(result.substring(2, 4), 16); // CRC low byte (sent first, per Modbus RTU)
data[7] = Integer.parseInt(result.substring(0, 2), 16); // CRC high byte
return data;
}
// Read registers 0101-0150 (50 registers) from the energy meter (daily/monthly/yearly totals)
public int[] sendEnergyGetDevId50 () {
int [] data = new int[8];
data[0] = 0x01;
data[1] = 0x03;
data[2] = 0x00;
data[3] = 0x64;
data[4] = 0x00;
data[5] = 0x32;
String result = String.format("%04x", getCrc(data)); // zero-pad so the CRC always has 4 hex digits
data[6] = Integer.parseInt(result.substring(2, 4), 16); // CRC low byte (sent first, per Modbus RTU)
data[7] = Integer.parseInt(result.substring(0, 2), 16); // CRC high byte
return data;
}
// Compute the trailing two CRC bytes of a command (Modbus CRC-16)
public int getCrc (int [] bytes) {
int crc = 0xFFFF; // initial CRC register value
int polynomial = 0xA001; // reflected Modbus CRC-16 polynomial
for (int i = 0; i < bytes.length - 2; i++) {
crc ^= bytes[i];
for (int j = 0; j < 8; j++) {
if ((crc & 0x0001) == 1) {
crc >>= 1;
crc ^= polynomial;
} else {
crc >>= 1;
}
}
}
return crc;
}
// Convert a response frame into a readable hex string
public String convertPlaintext (byte [] data) {
// CRC-check the response to verify the data is complete and correct
int[] ints = ModbusUtil.calculateCRC(data, 0, data.length - 2);
if(Integer.toHexString(ints[0]).equals(Integer.toHexString(data[data.length - 2] & 0xFF)) && Integer.toHexString(ints[1]).equals(Integer.toHexString(data[data.length - 1] & 0xFF))){
// convert the register values into readable hex text
return ModbusUtil.toHex(data);
}else{
return "-1";
}
}
// Parse the 0000-0100 (100-register) energy-meter response: hex IEEE-754 floats in 3-4-1-2 byte order, converted to decimal
public Integer convertTextToData (String param, String clientIp) {
lock.lock();
try {
Integer result = 0;
String[] data = param.split(" ");
if (data[2].equals("C8")) { // 0xC8 = 200 data bytes = 100 registers
result = energy1_100(data, clientIp);
} else if (data[2].equals("64")) { // 0x64 = 100 data bytes = 50 registers
energy101_150(data, clientIp);
} else {
System.out.println("Unexpected frame from the client");
}
return result;
} finally {
lock.unlock(); // always release the lock, even if parsing throws
}
}
public Integer energy1_100(String [] data,String clientIp){
Integer dataCount = Integer.parseInt(data[2], 16); // number of data bytes in the response
EnergyDataEntity energyDataEntity = new EnergyDataEntity();
energyDataEntity.setEnergyId(idService.gen());
energyDataEntity.setDeviceId(clientIp.replace(".",""));
energyDataEntity.setInstantaneousDelivery(hexToFloat(data[5] + data[6] + data[3] + data[4])); // instantaneous flow rate
energyDataEntity.setInstantHeat(hexToFloat(data[9] + data[10] + data[7] + data[8])); // instantaneous heat
energyDataEntity.setFluidVelocity(String.valueOf(hexToFloat(data[13] + data[14] + data[11] + data[12]))); // fluid velocity
energyDataEntity.setFluidVelocitySound(String.valueOf(hexToFloat(data[17]+data[18]+data[15]+data[16]))); // measured speed of sound in the fluid
energyDataEntity.setPositiveTotalFlow((Long.parseLong(data[21] + data[22] + data[19] + data[20],16) + hexToFloat(data[25] + data[26] + data[23] + data[24]))); // positive cumulative flow
energyDataEntity.setCumulativeTraffic((Long.parseLong(data[53] + data[54] + data[51] + data[52],16) + hexToFloat(data[57] + data[58] + data[55] + data[56]))); // net cumulative flow
energyDataEntity.setNegativeTotalFlow((hexToLongWithSigned(data[27] ,data[28] , data[29] , data[30]) + hexToFloat(data[33] + data[34] + data[31] + data[32]))); // negative cumulative flow
energyDataEntity.setPositiveCumulativeHeat((Long.parseLong(data[37]+data[38]+data[35]+data[36],16) + hexToFloat(data[41]+data[42]+data[39]+data[40]))); // positive cumulative heat
energyDataEntity.setNegativeCumulativeHeat((hexToLongWithSigned(data[43],data[44],data[45],data[46]) + hexToFloat(data[49]+data[50]+data[47]+data[48]))); // negative cumulative heat
energyDataEntity.setNetCumulativeHeat((Long.parseLong(data[61]+data[62]+data[59]+data[60],16) + hexToFloat(data[65]+data[66]+data[63]+data[64])));
energyDataEntity.setSupplyWaterTemperature(hexToFloat(data[69] + data[70] + data[67] + data[68])); // supply water temperature
energyDataEntity.setReturnWaterTemperature(hexToFloat(data[73] + data[74] + data[71] + data[72])); // return water temperature
energyDataEntity.setCreateTime(new Date());
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:00:00");
try {
energyDataEntity.setAcquisitionTime(formatter.parse(formatter.format(new Date())));
}catch (ParseException e) {
e.printStackTrace();
}
Integer insert = energyDataDao.insert(energyDataEntity);
System.out.println("已入库");
return insert;
}
public void energy101_150(String [] data, String clientIp) {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:00:00");
Integer dataCount = Integer.parseInt(data[2], 16); // number of data bytes in the response
EnergyDataEntity entity = new EnergyDataEntity();
entity.setDeviceId(clientIp.replace(".",""));
try {
entity.setAcquisitionTime(formatter.parse(formatter.format(new Date())));
}catch (ParseException e) {
e.printStackTrace();
}
EnergyDataEntity energyDataEntity = energyDataDao.select(entity);
energyDataEntity.setAccumulatedTrafficDay((Long.parseLong(data[77]+data[78]+data[75]+data[76],16) + hexToFloat(data[81]+data[82]+data[79]+data[80])));
energyDataEntity.setAccumulatedTrafficMonth((Long.parseLong(data[85]+data[86]+data[83]+data[84],16) + hexToFloat(data[89]+data[90]+data[87]+data[88])));
energyDataEntity.setAccumulatedTrafficYear((Long.parseLong(data[93]+data[94]+data[91]+data[92],16) + hexToFloat(data[97]+data[98]+data[95]+data[96])));
energyDataDao.updateByPrimaryKey(energyDataEntity);
System.out.println("已经填补剩余数据");
}
public static float hexToFloat(String hex) {
byte[] bytes = new byte[4];
for (int i = 0; i < 4; i++) {
bytes[i] = (byte) Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16);
}
return ByteBuffer.wrap(bytes).getFloat();
}
// Signed 32-bit value from two 16-bit hex registers, assuming the same 3-4-1-2
// word order as the unsigned reads above; bit 31 is the sign bit.
public static long hexToLongWithSigned(String a1,String a2,String a3,String a4) {
long unsignedValue = Long.parseLong(a3 + a4 + a1 + a2, 16); // high word is the second register pair
return (int) unsignedValue; // the int cast sign-extends the 32-bit value
}
}
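A worked example (hypothetical, not part of this commit) of the 3-4-1-2 byte order the parser assumes above: the meter sends the low register word first, so the code concatenates the second register pair ahead of the first before decoding the IEEE-754 float; 0x41200000 decodes to 10.0.
public class HexToFloatDemo { // hypothetical helper class
    public static void main(String[] args) {
        // Raw register bytes as split from the frame: low word "00 00" first, high word "41 20" second.
        String b1 = "00", b2 = "00", b3 = "41", b4 = "20";
        // The parser concatenates the second pair ahead of the first: "41200000" -> 10.0
        System.out.println(org.example.background.service.MakeInfoService.hexToFloat(b3 + b4 + b1 + b2));
    }
}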

@@ -0,0 +1,52 @@
package org.example.background.utils;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class FieldUtils {
private static Pattern linePattern = Pattern.compile("_(\\w)");
/**
* Convert snake_case to camelCase
*/
public static String lineToCamel(String str) {
str = str.toLowerCase();
Matcher matcher = linePattern.matcher(str);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
matcher.appendReplacement(sb, matcher.group(1).toUpperCase());
}
matcher.appendTail(sb);
return sb.toString();
}
private static Pattern humpPattern = Pattern.compile("[A-Z]");
/**
* Convert camelCase to snake_case (more efficient than the conversion above)
*/
public static String camelToLine(String str) {
Matcher matcher = humpPattern.matcher(str);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
}
matcher.appendTail(sb);
return sb.toString();
}
/**
* Convert camelCase to snake_case, lowercasing the first character first
*/
public static String camelToLines(String str) {
str = str.substring(0, 1).toLowerCase() + str.substring(1);
Matcher matcher = humpPattern.matcher(str);
StringBuffer sb = new StringBuffer();
while (matcher.find()) {
matcher.appendReplacement(sb, "_"+ matcher.group(0).toLowerCase());
}
matcher.appendTail(sb);
return sb.toString();
}
}
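A quick sketch (hypothetical, not part of this commit) of what these converters produce — this is the name mapping EnergyDataCondition relies on when it turns bean properties into column names:
public class FieldUtilsDemo { // hypothetical helper class
    public static void main(String[] args) {
        System.out.println(org.example.background.utils.FieldUtils.camelToLine("energyId"));  // energy_id
        System.out.println(org.example.background.utils.FieldUtils.lineToCamel("device_id")); // deviceId
        System.out.println(org.example.background.utils.FieldUtils.camelToLines("EnergyId")); // energy_id
    }
}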

@@ -0,0 +1,25 @@
package org.example.druid;
public enum DBTypeEnum {
/**
* Master database
*/
MASTER("master"),
/**
* Slave database
*/
SLAVE("slave");
private final String value;
DBTypeEnum(String value) {
this.value = value;
}
public String getValue() {
return value;
}
}

@@ -0,0 +1,33 @@
package org.example.druid;
public class DbContextHolder {
// use a ThreadLocal so each thread keeps its own datasource key (thread safety)
private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
/**
* Set the data source
*
* @param dbTypeEnum the database type
*/
public static void setDbType(DBTypeEnum dbTypeEnum) {
CONTEXT_HOLDER.set(dbTypeEnum.getValue());
}
/**
* Get the current data source
*
* @return dbType
*/
public static String getDbType() {
return CONTEXT_HOLDER.get();
}
/**
* Clear the context value
*/
public static void clearDbType() {
CONTEXT_HOLDER.remove();
}
}

@@ -0,0 +1,46 @@
package org.example.druid;
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
/**
* Druid data source configuration
*/
@Configuration
public class DruidProperties
{
@Bean
@ConfigurationProperties("spring.datasource.dynamic.datasource.primary")
public DataSource primaryDataSource()
{
return DruidDataSourceBuilder.create().build();
}
// @Bean
// @ConfigurationProperties(prefix = "spring.datasource.slave")
// public DataSource slaveDataSource(){
// return new DruidDataSourceBuilder().build();
// }
@Bean
@Primary
public DataSource multipleDataSource(DataSource masterDataSource){
DynamicDataSource multipleDataSource = new DynamicDataSource();
Map<Object, Object> dataSources = new HashMap<>();
dataSources.put(DBTypeEnum.MASTER.getValue(), masterDataSource);
// dataSources.put(DBTypeEnum.SLAVE.getValue(), slaveDataSource);
multipleDataSource.setTargetDataSources(dataSources);
multipleDataSource.setDefaultTargetDataSource(masterDataSource);
return multipleDataSource;
}
}

@@ -0,0 +1,10 @@
package org.example.druid;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DbContextHolder.getDbType();
}
}
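AbstractRoutingDataSource resolves the target datasource for each lookup from the key returned by determineCurrentLookupKey(), i.e. whatever DbContextHolder holds for the current thread. A minimal usage sketch (hypothetical, not part of this commit), assuming the helper name RoutingDemo:
public class RoutingDemo { // hypothetical helper class
    public static void runOnMaster(Runnable work) {
        org.example.druid.DbContextHolder.setDbType(org.example.druid.DBTypeEnum.MASTER);
        try {
            work.run(); // mapper/JDBC calls made here resolve to the "master" entry
        } finally {
            org.example.druid.DbContextHolder.clearDbType(); // clear the ThreadLocal so the key does not leak
        }
    }
}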

@@ -0,0 +1,138 @@
package org.example.socket;
import io.netty.channel.*;
import org.example.background.service.MakeInfoService;
import org.example.utils.CommonConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author zjb
* @since 2023-12-11 10:31:26
*/
@Component
@EnableAsync
@ChannelHandler.Sharable
public class BootNettySocketChannelInboundHandler extends ChannelInboundHandlerAdapter {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@Resource
private MakeInfoService makeInfo;
private Map<String, ChannelHandlerContext> ctxMap = new ConcurrentHashMap<String, ChannelHandlerContext>(16);
/**
* Invoked whenever new data is received from a client
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object frame) {
// int[] ints = makeInfo.sendEnergyGetDevId50();
String result = makeInfo.convertPlaintext((byte[]) frame);
if(result.equals("-1")){
System.out.println("CRC校验错误收到的数据不完整或错误");
}else{
System.out.println("客户端发送:"+result);
Integer res = makeInfo.convertTextToData(result, getClientIP(ctx));
if(res > 0) {
// the first frame was stored, so send the supplementary (second) command
int[] data2 = makeInfo.sendEnergyGetDevId50();
ChannelFuture channelFuture1 = ctx.channel().writeAndFlush(data2);
channelFuture1.addListener((ChannelFutureListener) future2 -> {
if (future2.isSuccess()) {
sendEnergyMessages(data2);
} else {
System.out.println("第二条指令发送失败");
}
});
}
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// handle client disconnection
System.out.println("Client disconnected");
ctxMap.remove(getClientIP(ctx));
super.channelInactive(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端(重新)连接:" + ctx.channel().remoteAddress());
ctxMap.put(getClientIP(ctx),ctx);
super.channelActive(ctx);
}
/**
* Runs when the client and server disconnect, after channelInactive has executed
*/
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctxMap.remove(getClientIP(ctx));
super.channelUnregistered(ctx);
}
/**
* Invoked when a Throwable is raised, i.e. when Netty hits an IO error or a handler throws an exception while processing an event
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// ctxMap.remove(getClientIP(ctx));
super.exceptionCaught(ctx, cause);
}
@Scheduled(cron = "0 0 */1 * * *") //每小时的第0分钟第0秒
// @Scheduled(fixedRate = 10000) //10秒
private void sendIntervals() {
int[] data = makeInfo.sendEnergyMessage();
ChannelHandlerContext ctx = ctxMap.get(CommonConfig.energy1);
if (ctx != null) { // the context is an object, so a plain null check is what we need here
ChannelFuture channelFuture = ctx.channel().writeAndFlush(data);
// add a listener to check whether the send succeeded
channelFuture.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
sendEnergyMessages(data);
} else {
System.out.println("Failed to send the first command");
}
});
} else {
System.out.println("The client with IP " + CommonConfig.energy1 + " is not online; cannot deliver the command");
}
}
public void sendEnergyMessages(int [] data){
System.out.println("成功发送指令:");
for(int num = 0; num < data.length; num++){
if (num == data.length - 1) {
System.out.print(Integer.toHexString(data[num]));
System.out.print("\n");
}else {
System.out.print(Integer.toHexString(data[num]) + " ");
}
}
}
public String getClientIP(ChannelHandlerContext ctx){
String ip = ctx.channel().remoteAddress().toString();
return ip.substring(1, ip.indexOf(":"));
}
}
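For reference, getClientIP relies on Netty's remote-address string, which for an accepted TCP connection typically looks like "/10.17.184.195:50123"; stripping the leading slash and the port yields the bare IP used as the ctxMap key and compared against CommonConfig.energy1. A tiny sketch (hypothetical, and the sample address is made up):
public class ClientIpDemo { // hypothetical helper class
    public static void main(String[] args) {
        String remote = "/10.17.184.195:50123"; // typical shape of ctx.channel().remoteAddress().toString()
        System.out.println(remote.substring(1, remote.indexOf(":"))); // 10.17.184.195
    }
}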

@@ -0,0 +1,75 @@
package org.example.socket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.example.utils.MyDecoder;
import org.example.utils.MyEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @author zjb
* @since 2023-12-11 10:39:00
*/
@Component
public class BootNettySocketServer {
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workGroup;
@Autowired
private BootNettySocketChannelInboundHandler bootNettySocketChannelInboundHandler;
/**
* Start the server
*/
public void startup(int port) {
try {
bossGroup = new NioEventLoopGroup(1);
workGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.SO_RCVBUF, 10485760);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) {
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast("encoder", new MyEncoder());
channelPipeline.addLast("decoder", new MyDecoder());
channelPipeline.addLast(bootNettySocketChannelInboundHandler);
}
});
ChannelFuture f = bootstrap.bind(port).sync();
if(f.isSuccess()){
System.out.println("成功侦听端口:" + port);
f.channel().closeFuture().sync();
} else {
System.out.println("侦听端口失败:" + port);
}
} catch (Exception e) {
System.out.println("start exception"+e.toString());
}
}
}

@@ -0,0 +1,21 @@
package org.example.socket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author zjb
* @since 2023-12-11 10:35:00
*/
@Component
public class BootNettySocketServerThread extends Thread {
private final int port = 8234;
@Autowired
private BootNettySocketServer bootNettySocketServer;
public void run() {
bootNettySocketServer.startup(this.port);
}
}

@@ -0,0 +1,9 @@
package org.example.utils;
public class CommonConfig {
public static final String energy0 = "127.0.0.1";
public static final String energy1 = "10.17.184.195";
}

@@ -0,0 +1,15 @@
package org.example.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class IdService {
@Autowired
private SnowflakeIdGenerator generator;
public Long gen() {
return generator.nextId();
}
}

@@ -0,0 +1,47 @@
package org.example.utils;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
* @author ZJB
* @since 2023-12-11 10:50:00
*/
public class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
byte[] b = new byte[buffer.readableBytes()];
buffer.readBytes(b);
out.add(b);
}
public String bytesToHexString(byte[] bArray) {
StringBuffer sb = new StringBuffer(bArray.length);
String sTemp;
for (int i = 0; i < bArray.length; i++) {
sTemp = Integer.toHexString(0xFF & bArray[i]);
if (sTemp.length() < 2)
sb.append(0);
if(i < bArray.length - 1){
sb.append(sTemp.toUpperCase()).append(" ");
}else{
sb.append(sTemp.toUpperCase());
}
}
return sb.toString();
}
public static String toHexString1(byte b) {
String s = Integer.toHexString(b & 0xFF);
if (s.length() == 1) {
return "0" + s;
} else {
return s;
}
}
}
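A quick check (hypothetical, not part of this commit) of the helper's hex formatting — each byte is zero-padded, upper-cased and space-separated, which makes received frames easy to eyeball in logs:
public class MyDecoderDemo { // hypothetical helper class
    public static void main(String[] args) {
        byte[] frame = {0x01, 0x03, (byte) 0xC8};
        // prints "01 03 C8"
        System.out.println(new org.example.utils.MyDecoder().bytesToHexString(frame));
    }
}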

@@ -0,0 +1,43 @@
package org.example.utils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
/**
* @author zjb
* @since 2023-12-11 10:49:32
*/
public class MyEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// encode the outgoing message (int[] command frames)
ByteBuf encodedMsg = encode2(msg);
// pass the encoded message on to the next outbound handler
ctx.write(encodedMsg, promise);
}
// Custom encoder: decode a hex string (spaces ignored) into raw bytes
private ByteBuf encode(Object msg) throws DecoderException {
// encoding logic
ByteBuf buf = Unpooled.buffer();
buf.writeBytes(Hex.decodeHex(msg.toString().replaceAll(" ", "").toCharArray()));
return buf;
}
// Encoder: write an array of ints out as raw bytes (one byte per element)
private ByteBuf encode2(Object msg) throws DecoderException {
// create a new ByteBuf
ByteBuf byteBuf = Unpooled.buffer();
// iterate over the array and write each element into the ByteBuf
for (int i : (int [])msg) {
byteBuf.writeByte(i);
}
return byteBuf;
}
}

@@ -0,0 +1,131 @@
package org.example.utils;
import org.springframework.stereotype.Component;
@Component
public class SnowflakeIdGenerator {
// ==============================Fields===========================================
/**
* Custom epoch timestamp (2015-01-01)
*/
private final long twepoch = 1420041600000L;
/**
* Number of bits for the worker ID
*/
private final long workerIdBits = 5L;
/**
* Number of bits for the datacenter ID
*/
private final long datacenterIdBits = 5L;
/**
* Maximum supported worker ID, which is 31 (this shift trick quickly computes the largest decimal value an N-bit binary number can represent)
*/
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);
/**
* Maximum supported datacenter ID, which is 31
*/
private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);
/**
* Number of bits for the sequence within the ID
*/
private final long sequenceBits = 12L;
/**
* The worker ID is shifted left by 12 bits
*/
private final long workerIdShift = sequenceBits;
/**
* The datacenter ID is shifted left by 17 bits (12+5)
*/
private final long datacenterIdShift = sequenceBits + workerIdBits;
/**
* The timestamp is shifted left by 22 bits (5+5+12)
*/
private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;
/**
* Mask for the generated sequence, 4095 here (0b111111111111 = 0xfff = 4095)
*/
private final long sequenceMask = -1L ^ (-1L << sequenceBits);
/**
* Worker ID (0~31)
*/
private long workerId;
/**
* Datacenter ID (0~31)
*/
private long datacenterId;
/**
* Sequence within the millisecond (0~4095)
*/
private long sequence = 0L;
/**
* Timestamp of the last generated ID
*/
private long lastTimestamp = -1L;
//==============================Constructors=====================================
/**
* Constructor.
* @param workerId     worker ID (0~31)
* @param datacenterId datacenter ID (0~31)
*/
public SnowflakeIdGenerator(long workerId, long datacenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
} // ==============================Methods==========================================
public SnowflakeIdGenerator() {
}
/**
* Get the next ID (this method is thread-safe).
* @return a snowflake ID
*/
public synchronized long nextId() {
long timestamp = timeGen();
// If the current time is earlier than the timestamp of the last generated ID, the system clock has moved backwards and an exception should be thrown
// if (timestamp < lastTimestamp) {
// throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
// }
// Same millisecond as the last ID: advance the in-millisecond sequence
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
// sequence overflow within this millisecond
if (sequence == 0) {
// block until the next millisecond to get a new timestamp
timestamp = tilNextMillis(lastTimestamp);
}
}
// New millisecond: reset the sequence
else {
sequence = 0L;
}
// remember the timestamp of this ID
lastTimestamp = timestamp;
// shift the parts into place and OR them together into a 64-bit ID
return ((timestamp - twepoch) << timestampLeftShift)
| (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence;
}
/**
* Block until the next millisecond to obtain a new timestamp.
* @param lastTimestamp timestamp of the last generated ID
* @return the current timestamp
*/
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
/**
* Return the current time in milliseconds.
* @return current time (ms)
*/
protected long timeGen() {
return System.currentTimeMillis();
}
}
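The generated 64-bit ID packs, from low to high bits: a 12-bit per-millisecond sequence, a 5-bit worker ID, a 5-bit datacenter ID, and the millisecond timestamp relative to the custom epoch. A minimal decode sketch (hypothetical, not part of this commit) that inverts the shifts used above:
public class SnowflakeDecodeDemo { // hypothetical helper class
    public static void main(String[] args) {
        long id = new org.example.utils.SnowflakeIdGenerator(1, 1).nextId();
        long sequence = id & 0xFFF;                // lowest 12 bits
        long workerId = (id >> 12) & 0x1F;         // next 5 bits
        long datacenterId = (id >> 17) & 0x1F;     // next 5 bits
        long millis = (id >> 22) + 1420041600000L; // add the custom epoch back
        System.out.println(sequence + " " + workerId + " " + datacenterId + " " + new java.util.Date(millis));
    }
}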

@@ -0,0 +1,32 @@
spring.application.name=netty
server.servlet.encoding.force=true
server.port=7781
lombok.var.flagUsage = ALLOW
# Single file max size
multipart.maxFileSize=50Mb
# All files max size
multipart.maxRequestSize=50Mb
spring.datasource.dynamic.datasource.primary=primary
spring.datasource.dynamic.datasource.primary.url=jdbc:mysql://10.16.56.3:3308/jcxt?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.dynamic.datasource.primary.username=root
spring.datasource.dynamic.datasource.primary.password=Unity3du#d112233
spring.datasource.dynamic.datasource.primary.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.dynamic.druid.initial-size=10
spring.datasource.dynamic.druid.max-idle=20
spring.datasource.dynamic.druid.min-idle=5
spring.datasource.dynamic.druid.max-active=50
spring.datasource.dynamic.druid.log-abandoned=true
spring.datasource.dynamic.druid.remove-abandoned=true
spring.datasource.dynamic.druid.remove-abandoned-timeout-millis=120
spring.datasource.dynamic.druid.max-wait=1000
spring.datasource.dynamic.druid.test-while-idle=true
spring.datasource.dynamic.druid.validation-query=select 1 from dual
spring.datasource.dynamic.druid.test-on-borrow=true
spring.datasource.dynamic.druid.min-evictable-idle-time-millis=100000
spring.datasource.dynamic.druid.max-evictable-idle-time-millis=100000
spring.datasource.dynamic.druid.time-between-eviction-runs-millis=100000
spring.datasource.dynamic.druid.break-after-acquire-failure=true
spring.datasource.dynamic.druid.connection-error-retry-attempts=5
spring.datasource.dynamic.druid.fail-fast=true
spring.datasource.dynamic.druid.time-between-connect-error-millis=10000