From 28e36cb4d2da9fcbb67be6e828eff13ae661a327 Mon Sep 17 00:00:00 2001 From: zhangjinbang <13002584422@163.com> Date: Mon, 25 Dec 2023 17:25:10 +0800 Subject: [PATCH] 111 --- iot-energy/pom.xml | 113 ++++++++++ .../main/java/org/example/IotApplication.java | 30 +++ .../condition/EnergyDataCondition.java | 36 ++++ .../example/background/dao/EnergyDataDao.java | 20 ++ .../background/entity/EnergyDataEntity.java | 69 ++++++ .../background/service/MakeInfoService.java | 198 ++++++++++++++++++ .../example/background/utils/FieldUtils.java | 52 +++++ .../java/org/example/druid/DBTypeEnum.java | 25 +++ .../org/example/druid/DbContextHolder.java | 33 +++ .../org/example/druid/DruidProperties.java | 46 ++++ .../org/example/druid/DynamicDataSource.java | 10 + .../BootNettySocketChannelInboundHandler.java | 138 ++++++++++++ .../example/socket/BootNettySocketServer.java | 75 +++++++ .../socket/BootNettySocketServerThread.java | 21 ++ .../java/org/example/utils/CommonConfig.java | 9 + .../java/org/example/utils/IdService.java | 15 ++ .../java/org/example/utils/MyDecoder.java | 47 +++++ .../java/org/example/utils/MyEncoder.java | 43 ++++ .../example/utils/SnowflakeIdGenerator.java | 131 ++++++++++++ .../src/main/resources/application.properties | 32 +++ 20 files changed, 1143 insertions(+) create mode 100644 iot-energy/pom.xml create mode 100644 iot-energy/src/main/java/org/example/IotApplication.java create mode 100644 iot-energy/src/main/java/org/example/background/condition/EnergyDataCondition.java create mode 100644 iot-energy/src/main/java/org/example/background/dao/EnergyDataDao.java create mode 100644 iot-energy/src/main/java/org/example/background/entity/EnergyDataEntity.java create mode 100644 iot-energy/src/main/java/org/example/background/service/MakeInfoService.java create mode 100644 iot-energy/src/main/java/org/example/background/utils/FieldUtils.java create mode 100644 iot-energy/src/main/java/org/example/druid/DBTypeEnum.java create mode 100644 iot-energy/src/main/java/org/example/druid/DbContextHolder.java create mode 100644 iot-energy/src/main/java/org/example/druid/DruidProperties.java create mode 100644 iot-energy/src/main/java/org/example/druid/DynamicDataSource.java create mode 100644 iot-energy/src/main/java/org/example/socket/BootNettySocketChannelInboundHandler.java create mode 100644 iot-energy/src/main/java/org/example/socket/BootNettySocketServer.java create mode 100644 iot-energy/src/main/java/org/example/socket/BootNettySocketServerThread.java create mode 100644 iot-energy/src/main/java/org/example/utils/CommonConfig.java create mode 100644 iot-energy/src/main/java/org/example/utils/IdService.java create mode 100644 iot-energy/src/main/java/org/example/utils/MyDecoder.java create mode 100644 iot-energy/src/main/java/org/example/utils/MyEncoder.java create mode 100644 iot-energy/src/main/java/org/example/utils/SnowflakeIdGenerator.java create mode 100644 iot-energy/src/main/resources/application.properties diff --git a/iot-energy/pom.xml b/iot-energy/pom.xml new file mode 100644 index 0000000..3fca93d --- /dev/null +++ b/iot-energy/pom.xml @@ -0,0 +1,113 @@ + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 2.1.1.RELEASE + + + + org.example + iot-energy + 1.0-SNAPSHOT + + + 1.8 + 1.8 + UTF-8 + + + + + io.netty + netty-all + 4.1.6.Final + + + net.sf.json-lib + json-lib + 2.4 + jdk15 + + + + com.alibaba + fastjson + 1.2.70 + + + + mysql + mysql-connector-java + 5.1.49 + + + + com.baomidou + mybatis-plus-boot-starter + 3.4.1 + + + com.baomidou + dynamic-datasource-spring-boot-starter + 3.4.1 + + + + com.alibaba + druid-spring-boot-starter + 1.1.10 + + + + org.projectlombok + lombok + 1.18.0 + provided + + + + commons-codec + commons-codec + 1.4 + + + + com.ghgande + j2mod + 2.5.3 + + + + + com.fasterxml.jackson.dataformat + jackson-dataformat-xml + 2.16.0 + + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + 2.1.1.RELEASE + + + + repackage + + + + + + + + + \ No newline at end of file diff --git a/iot-energy/src/main/java/org/example/IotApplication.java b/iot-energy/src/main/java/org/example/IotApplication.java new file mode 100644 index 0000000..13173a9 --- /dev/null +++ b/iot-energy/src/main/java/org/example/IotApplication.java @@ -0,0 +1,30 @@ +package org.example; + +import org.example.socket.BootNettySocketServerThread; +import org.mybatis.spring.annotation.MapperScan; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; +import org.springframework.scheduling.annotation.EnableScheduling; + +@SpringBootApplication(exclude = DataSourceAutoConfiguration.class) +@MapperScan({"org.example.background.dao"}) +@EnableScheduling //允许自动调用 +public class IotApplication implements CommandLineRunner { + + @Autowired + private BootNettySocketServerThread socketServerThread; + + public static void main(String[] args) { + SpringApplication app = new SpringApplication(IotApplication.class); + app.run(args); + } + + @Override + public void run(String... args) throws Exception { + socketServerThread.start(); + } + +} \ No newline at end of file diff --git a/iot-energy/src/main/java/org/example/background/condition/EnergyDataCondition.java b/iot-energy/src/main/java/org/example/background/condition/EnergyDataCondition.java new file mode 100644 index 0000000..9186c78 --- /dev/null +++ b/iot-energy/src/main/java/org/example/background/condition/EnergyDataCondition.java @@ -0,0 +1,36 @@ +package org.example.background.condition; + +import org.apache.ibatis.jdbc.SQL; +import org.example.background.entity.EnergyDataEntity; +import org.example.background.utils.FieldUtils; +import org.springframework.cglib.beans.BeanMap; + +public class EnergyDataCondition { + + public String insert(EnergyDataEntity data){ + SQL sql = new SQL(); + sql.INSERT_INTO("energy_data"); + BeanMap beanMap = BeanMap.create(data); + for (Object key : beanMap.keySet()) { + Object val = beanMap.get(key); + if (val != null) { + sql.VALUES(FieldUtils.camelToLine(key + ""), "#{" + key + "}"); + } + } + return sql.toString(); + } + + public String updateByPrimaryKey(EnergyDataEntity data) { + SQL sql = new SQL(); + sql.UPDATE("energy_data"); + BeanMap beanMap = BeanMap.create(data); + for (Object key : beanMap.keySet()) { + Object val = beanMap.get(key); + if (val != null) { + sql.SET(FieldUtils.camelToLine(key + "") + "=#{" + key + "}"); + } + } + sql.WHERE("energy_id=#{energyId}"); + return sql.toString(); + } +} diff --git a/iot-energy/src/main/java/org/example/background/dao/EnergyDataDao.java b/iot-energy/src/main/java/org/example/background/dao/EnergyDataDao.java new file mode 100644 index 0000000..ed07850 --- /dev/null +++ b/iot-energy/src/main/java/org/example/background/dao/EnergyDataDao.java @@ -0,0 +1,20 @@ +package org.example.background.dao; + + +import org.apache.ibatis.annotations.InsertProvider; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.UpdateProvider; +import org.example.background.condition.EnergyDataCondition; +import org.example.background.entity.EnergyDataEntity; + +public interface EnergyDataDao { + + @InsertProvider(type = EnergyDataCondition.class, method = "insert") + Integer insert(EnergyDataEntity data); + + @Select("SELECT * FROM energy_data WHERE device_Id = #{deviceId} AND acquisition_time = #{acquisitionTime}") + EnergyDataEntity select(EnergyDataEntity data); + + @UpdateProvider(type = EnergyDataCondition.class, method = "updateByPrimaryKey") + Integer updateByPrimaryKey(EnergyDataEntity data); +} diff --git a/iot-energy/src/main/java/org/example/background/entity/EnergyDataEntity.java b/iot-energy/src/main/java/org/example/background/entity/EnergyDataEntity.java new file mode 100644 index 0000000..18537d4 --- /dev/null +++ b/iot-energy/src/main/java/org/example/background/entity/EnergyDataEntity.java @@ -0,0 +1,69 @@ +package org.example.background.entity; + +import lombok.Data; +import com.fasterxml.jackson.annotation.JsonFormat; + +import java.io.Serializable; +import java.util.Date; + + +@Data +public class EnergyDataEntity implements Serializable{ + + private Long energyId; + + private String deviceId; + + //@ApiModelProperty(value = "瞬时流量") + private Float instantaneousDelivery; + + //@ApiModelProperty(value = "瞬时热量") + private Float instantHeat; + + //@ApiModelProperty(value = "正累积流量") + private Float positiveTotalFlow; + + //@ApiModelProperty(value = "负累积流量") + private Float negativeTotalFlow; + + //@ApiModelProperty(value = "净累积流量") + private Float cumulativeTraffic; + + //@ApiModelProperty(value = "供水温度") + private Float supplyWaterTemperature; + + //@ApiModelProperty(value = "回水温度") + private Float returnWaterTemperature; + + //@ApiModelProperty(value = "流体速度") + private String fluidVelocity; + + //@ApiModelProperty(value = "测量流体声速") + private String fluidVelocitySound; + + //@ApiModelProperty(value = "正累积热量") + private Float positiveCumulativeHeat; + + //@ApiModelProperty(value = "负累积热量") + private Float negativeCumulativeHeat; + + //@ApiModelProperty(value = "净累积热量") + private Float netCumulativeHeat; + + //@ApiModelProperty(value = "今天累积流量") + private Float accumulatedTrafficDay; + + //@ApiModelProperty(value = "本月累积流量") + private Float accumulatedTrafficMonth; + + //@ApiModelProperty(value = "今年累积流量") + private Float accumulatedTrafficYear; + + //@ApiModelProperty(value = "入库时间") + @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") + private Date createTime; + + //@ApiModelProperty(value = "整点采集时间") + @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") + private Date acquisitionTime; +} diff --git a/iot-energy/src/main/java/org/example/background/service/MakeInfoService.java b/iot-energy/src/main/java/org/example/background/service/MakeInfoService.java new file mode 100644 index 0000000..38d9631 --- /dev/null +++ b/iot-energy/src/main/java/org/example/background/service/MakeInfoService.java @@ -0,0 +1,198 @@ +package org.example.background.service; + +import com.ghgande.j2mod.modbus.util.ModbusUtil; +import org.example.background.dao.EnergyDataDao; +import org.example.background.entity.EnergyDataEntity; +import org.example.utils.IdService; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.nio.ByteBuffer; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * author zjb + * 服务端发送指令给下位 + */ +@Component +public class MakeInfoService { + + @Resource + private EnergyDataDao energyDataDao; + + @Resource + private IdService idService; + + private Lock lock = new ReentrantLock(); + + //读取0001-0100的数据共100个寄存器 能量表 + public int[] sendEnergyMessage () { + int [] data = new int[8]; + data[0] = 0x01; + data[1] = 0x03; + data[2] = 0x00; + data[3] = 0x00; + data[4] = 0x00; + data[5] = 0x64; + String result = Integer.toHexString(getCrc(data)); + data[6] = Integer.parseInt(result.substring(2, 4), 16); + data[7] = Integer.parseInt(result.substring(0, 2), 16); + return data; + } + + //读取1529的数据共2个寄存器 获取设备编号 能量表 + public int[] sendEnergyGetDevId () { + int [] data = new int[8]; + data[0] = 0x01; + data[1] = 0x03; + data[2] = 0x05; + data[3] = 0xF9; + data[4] = 0x00; + data[5] = 0x01; + String result = Integer.toHexString(getCrc(data)); + data[6] = Integer.parseInt(result.substring(2, 4), 16); + data[7] = Integer.parseInt(result.substring(0, 2), 16); + return data; + } + + //读取0101-0150的数据共50个寄存器 获取设备编号 能量表 + public int[] sendEnergyGetDevId50 () { + int [] data = new int[8]; + data[0] = 0x01; + data[1] = 0x03; + data[2] = 0x00; + data[3] = 0x64; + data[4] = 0x00; + data[5] = 0x32; + String result = Integer.toHexString(getCrc(data)); + data[6] = Integer.parseInt(result.substring(2, 4), 16); + data[7] = Integer.parseInt(result.substring(0, 2), 16); + return data; + } + + + //获取指令最后两位(CRC校验位) + public int getCrc (int [] bytes) { + int crc = 0xFFFF; //初始化寄存器 + int polynomial = 0xA001; // 多项式A001对应的二进制数 + for (int i = 0; i < bytes.length - 2; i++) { + crc ^= bytes[i]; + for (int j = 0; j < 8; j++) { + if ((crc & 0x0001) == 1) { + crc >>= 1; + crc ^= polynomial; + } else { + crc >>= 1; + } + } + } + return crc; + } + + //响应报文转明文 + public String convertPlaintext (byte [] data) { + //校验响应的数据是否完整且正确 crc校验 + int[] ints = ModbusUtil.calculateCRC(data, 0, data.length - 2); + if(Integer.toHexString(ints[0]).equals(Integer.toHexString(data[data.length - 2] & 0xFF)) && Integer.toHexString(ints[1]).equals(Integer.toHexString(data[data.length - 1] & 0xFF))){ + //将寄存器值转换为可读的明文 + return ModbusUtil.toHex(data); + }else{ + return "-1"; + } + } + + //能量表解析 解析0000-0100的响应数据共100个寄存器的数据 16进制(浮点数) 转10进制 IEEE-754标准浮点数计算方法 浮点数 3412 + public Integer convertTextToData (String param,String clientIp) { + lock.lock(); + Integer result = 0; + String[] data = param.split(" "); + if(data[2].equals("C8")) { + result = energy1_100(data, clientIp); + }else if(data[2].equals("64")) { + energy101_150(data,clientIp); + }else { + System.out.println("客户端发送的指令不符合预期"); + } + lock.unlock(); + return result; + } + public Integer energy1_100(String [] data,String clientIp){ + Integer dataCount = Integer.parseInt(data[2], 16); //数据数量 + EnergyDataEntity energyDataEntity = new EnergyDataEntity(); + energyDataEntity.setEnergyId(idService.gen()); + energyDataEntity.setDeviceId(clientIp.replace(".","")); + energyDataEntity.setInstantaneousDelivery(hexToFloat(data[5] + data[6] + data[3] + data[4])); //瞬时流量 + energyDataEntity.setInstantHeat(hexToFloat(data[9] + data[10] + data[7] + data[8])); //瞬时热量 + energyDataEntity.setFluidVelocity(String.valueOf(hexToFloat(data[13] + data[14] + data[11] + data[12]))); //流体速度 + energyDataEntity.setFluidVelocitySound(String.valueOf(hexToFloat(data[17]+data[18]+data[15]+data[16]))); //测量流体声速 + energyDataEntity.setPositiveTotalFlow((Long.parseLong(data[21] + data[22] + data[19] + data[20],16) + hexToFloat(data[25] + data[26] + data[23] + data[24]))); //正累计流量 + energyDataEntity.setCumulativeTraffic((Long.parseLong(data[53] + data[54] + data[51] + data[52],16) + hexToFloat(data[57] + data[58] + data[55] + data[56]))); //累计流量 + energyDataEntity.setNegativeTotalFlow((hexToLongWithSigned(data[27] ,data[28] , data[29] , data[30]) + hexToFloat(data[33] + data[34] + data[31] + data[32]))); //负累计流量 + energyDataEntity.setPositiveCumulativeHeat((Long.parseLong(data[37]+data[38]+data[35]+data[36],16) + hexToFloat(data[41]+data[42]+data[39]+data[40]))); //正累计热量 + energyDataEntity.setNegativeCumulativeHeat((hexToLongWithSigned(data[43],data[44],data[45],data[46]) + hexToFloat(data[49]+data[50]+data[47]+data[48]))); //负累计热量 + energyDataEntity.setNetCumulativeHeat((Long.parseLong(data[61]+data[62]+data[59]+data[60],16) + hexToFloat(data[65]+data[66]+data[63]+data[64]))); + energyDataEntity.setSupplyWaterTemperature(hexToFloat(data[69] + data[70] + data[67] + data[68])); //供水温度 + energyDataEntity.setReturnWaterTemperature(hexToFloat(data[73] + data[74] + data[71] + data[72])); //回水温度 + energyDataEntity.setCreateTime(new Date()); + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:00:00"); + try { + energyDataEntity.setAcquisitionTime(formatter.parse(formatter.format(new Date()))); + }catch (ParseException e) { + e.printStackTrace(); + } + Integer insert = energyDataDao.insert(energyDataEntity); + System.out.println("已入库"); + return insert; + } + + public void energy101_150(String [] data, String clientIp) { + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:00:00"); + Integer dataCount = Integer.parseInt(data[2], 16); //数据数量 + EnergyDataEntity entity = new EnergyDataEntity(); + entity.setDeviceId(clientIp.replace(".","")); + try { + entity.setAcquisitionTime(formatter.parse(formatter.format(new Date()))); + }catch (ParseException e) { + e.printStackTrace(); + } + EnergyDataEntity energyDataEntity = energyDataDao.select(entity); + energyDataEntity.setAccumulatedTrafficDay((Long.parseLong(data[77]+data[78]+data[75]+data[76],16) + hexToFloat(data[81]+data[82]+data[79]+data[80]))); + energyDataEntity.setAccumulatedTrafficMonth((Long.parseLong(data[85]+data[86]+data[83]+data[84],16) + hexToFloat(data[89]+data[90]+data[87]+data[88]))); + energyDataEntity.setAccumulatedTrafficYear((Long.parseLong(data[93]+data[94]+data[91]+data[92],16) + hexToFloat(data[97]+data[98]+data[95]+data[96]))); + energyDataDao.updateByPrimaryKey(energyDataEntity); + System.out.println("已经填补剩余数据"); + } + + + public static float hexToFloat(String hex) { + byte[] bytes = new byte[4]; + for (int i = 0; i < 4; i++) { + bytes[i] = (byte) Integer.parseInt(hex.substring(i * 2, i * 2 + 2), 16); + } + return ByteBuffer.wrap(bytes).getFloat(); + } + + //有符号型16进制转long + public static long hexToLongWithSigned(String a1,String a2,String a3,String a4) { + Integer [] arr = new Integer[2]; + arr[0] = Integer.parseInt(a1+a2, 16); + arr[1] = Integer.parseInt(a3+a4, 16); + Arrays.sort(arr); + String hexStr = Integer.toHexString(arr[1]) + Integer.toHexString(arr[0]); + long unsignedValue = Long.parseUnsignedLong(hexStr, 16); + String substring = Long.toBinaryString(unsignedValue).substring(0, 1); + if(substring.equals("0")){ + unsignedValue = 0; + } else if(substring.equals("1")){ + unsignedValue = (long) -(Math.pow(2,32) - unsignedValue); + }else if(substring.equals("0")){ + unsignedValue = (long) (Math.pow(2,32) - unsignedValue); + } + return unsignedValue; + } +} diff --git a/iot-energy/src/main/java/org/example/background/utils/FieldUtils.java b/iot-energy/src/main/java/org/example/background/utils/FieldUtils.java new file mode 100644 index 0000000..96caa5c --- /dev/null +++ b/iot-energy/src/main/java/org/example/background/utils/FieldUtils.java @@ -0,0 +1,52 @@ +package org.example.background.utils; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class FieldUtils { + private static Pattern linePattern = Pattern.compile("_(\\w)"); + + /** + * 下划线转驼峰 + */ + public static String lineToCamel(String str) { + str = str.toLowerCase(); + Matcher matcher = linePattern.matcher(str); + StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + matcher.appendReplacement(sb, matcher.group(1).toUpperCase()); + } + matcher.appendTail(sb); + return sb.toString(); + } + + private static Pattern humpPattern = Pattern.compile("[A-Z]"); + + /** + * 驼峰转下划线,效率比上面高 + */ + public static String camelToLine(String str) { + Matcher matcher = humpPattern.matcher(str); + StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase()); + } + matcher.appendTail(sb); + return sb.toString(); + } + + + /** + * 驼峰转下划线,效率比上面高 + */ + public static String camelToLines(String str) { + str = str.substring(0, 1).toLowerCase() + str.substring(1); + Matcher matcher = humpPattern.matcher(str); + StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + matcher.appendReplacement(sb, "_"+ matcher.group(0).toLowerCase()); + } + matcher.appendTail(sb); + return sb.toString(); + } +} diff --git a/iot-energy/src/main/java/org/example/druid/DBTypeEnum.java b/iot-energy/src/main/java/org/example/druid/DBTypeEnum.java new file mode 100644 index 0000000..40cfd4c --- /dev/null +++ b/iot-energy/src/main/java/org/example/druid/DBTypeEnum.java @@ -0,0 +1,25 @@ +package org.example.druid; + +public enum DBTypeEnum { + /** + * 主库 + */ + MASTER("master"), + + /** + * 从库 + */ + SLAVE("slave"); + + private final String value; + + DBTypeEnum(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + +} + diff --git a/iot-energy/src/main/java/org/example/druid/DbContextHolder.java b/iot-energy/src/main/java/org/example/druid/DbContextHolder.java new file mode 100644 index 0000000..289654b --- /dev/null +++ b/iot-energy/src/main/java/org/example/druid/DbContextHolder.java @@ -0,0 +1,33 @@ +package org.example.druid; + +public class DbContextHolder { + //使用ThreadLocal,防止线程安全问题 + private static final ThreadLocal CONTEXT_HOLDER = new ThreadLocal(); + + /** + * 设置数据源 + * + * @param dbTypeEnum 数据库类型 + */ + public static void setDbType(DBTypeEnum dbTypeEnum) { + CONTEXT_HOLDER.set(dbTypeEnum.getValue()); + } + + /** + * 取得当前数据源 + * + * @return dbType + */ + public static String getDbType() { + return (String) CONTEXT_HOLDER.get(); + } + + /** + * 清除上下文数据 + */ + public static void clearDbType() { + CONTEXT_HOLDER.remove(); + } + +} + diff --git a/iot-energy/src/main/java/org/example/druid/DruidProperties.java b/iot-energy/src/main/java/org/example/druid/DruidProperties.java new file mode 100644 index 0000000..a807289 --- /dev/null +++ b/iot-energy/src/main/java/org/example/druid/DruidProperties.java @@ -0,0 +1,46 @@ +package org.example.druid; + +import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +import javax.sql.DataSource; +import java.util.HashMap; +import java.util.Map; + +/** + * druid 配置属性 + */ +@Configuration +public class DruidProperties +{ + + + @Bean + @ConfigurationProperties("spring.datasource.dynamic.datasource.primary") + public DataSource primaryDataSource() + { + return DruidDataSourceBuilder.create().build(); + } + +// @Bean +// @ConfigurationProperties(prefix = "spring.datasource.slave") +// public DataSource slaveDataSource(){ +// return new DruidDataSourceBuilder().build(); +// } + + @Bean + @Primary + public DataSource multipleDataSource(DataSource masterDataSource){ + DynamicDataSource multipleDataSource = new DynamicDataSource(); + Map dataSources = new HashMap<>(); + dataSources.put(DBTypeEnum.MASTER.getValue(), masterDataSource); +// dataSources.put(DBTypeEnum.SLAVE.getValue(), slaveDataSource); + multipleDataSource.setTargetDataSources(dataSources); + multipleDataSource.setDefaultTargetDataSource(masterDataSource); + return multipleDataSource; + } + +} \ No newline at end of file diff --git a/iot-energy/src/main/java/org/example/druid/DynamicDataSource.java b/iot-energy/src/main/java/org/example/druid/DynamicDataSource.java new file mode 100644 index 0000000..7976c79 --- /dev/null +++ b/iot-energy/src/main/java/org/example/druid/DynamicDataSource.java @@ -0,0 +1,10 @@ +package org.example.druid; + +import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; + +public class DynamicDataSource extends AbstractRoutingDataSource { + @Override + protected Object determineCurrentLookupKey() { + return DbContextHolder.getDbType(); + } +} diff --git a/iot-energy/src/main/java/org/example/socket/BootNettySocketChannelInboundHandler.java b/iot-energy/src/main/java/org/example/socket/BootNettySocketChannelInboundHandler.java new file mode 100644 index 0000000..074386a --- /dev/null +++ b/iot-energy/src/main/java/org/example/socket/BootNettySocketChannelInboundHandler.java @@ -0,0 +1,138 @@ +package org.example.socket; + +import com.ctc.wstx.util.StringUtil; +import io.netty.channel.*; +import org.example.background.service.MakeInfoService; +import org.example.utils.CommonConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import javax.annotation.Resource; +import java.net.SocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * @author zjb + * @since 2023-12-11 10:31:26 + */ +@Component +@EnableAsync +@ChannelHandler.Sharable +public class BootNettySocketChannelInboundHandler extends ChannelInboundHandlerAdapter { + + private final Logger log = LoggerFactory.getLogger(this.getClass()); + + @Resource + private MakeInfoService makeInfo; + + private Map ctxMap = new ConcurrentHashMap(16); + + + /** + * 从客户端收到新的数据时,这个方法会在收到消息时被调用 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object frame) { +// int[] ints = makeInfo.sendEnergyGetDevId50(); + String result = makeInfo.convertPlaintext((byte[]) frame); + if(result.equals("-1")){ + System.out.println("CRC校验错误,收到的数据不完整或错误"); + }else{ + System.out.println("客户端发送:"+result); + Integer res = makeInfo.convertTextToData(result, getClientIP(ctx)); + if(res > 0) { + //发送补充指令 + //发送第二条指令 + int[] data2 = makeInfo.sendEnergyGetDevId50(); + ChannelFuture channelFuture1 = ctx.channel().writeAndFlush(data2); + channelFuture1.addListener((ChannelFutureListener) future2 -> { + if (future2.isSuccess()) { + sendEnergyMessages(data2); + } else { + System.out.println("第二条指令发送失败"); + } + }); + } + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + // 客户端断开连接的处理逻辑 + System.out.println("客户端断开连接"); + ctxMap.remove(getClientIP(ctx)); + super.channelInactive(ctx); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + System.out.println("客户端(重新)连接:" + ctx.channel().remoteAddress()); + ctxMap.put(getClientIP(ctx),ctx); + super.channelActive(ctx); + } + + + /** + * 客户端与服务端 断连时执行 channelInactive方法之后执行 + */ + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + ctxMap.remove(getClientIP(ctx)); + super.channelUnregistered(ctx); + } + + /** + * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时 + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { +// ctxMap.remove(getClientIP(ctx)); + super.exceptionCaught(ctx, cause); + } + + @Scheduled(cron = "0 0 */1 * * *") //每小时的第0分钟第0秒 +// @Scheduled(fixedRate = 10000) //10秒 + private void sendIntervals() { + int[] data = makeInfo.sendEnergyMessage(); + ChannelHandlerContext ctx = ctxMap.get(CommonConfig.energy1); + if(!StringUtils.isEmpty(ctx)) { + ChannelFuture channelFuture = ctx.channel().writeAndFlush(data); + // 添加监听器,检查发送是否成功 + channelFuture.addListener((ChannelFutureListener) future1 -> { + if(future1.isSuccess()) { + sendEnergyMessages(data); + }else{ + System.out.println("第一条指令发送失败"); + } + }); + }else{ + System.out.println("ip为"+CommonConfig.energy1+"的客户端不在线。无法传递指令"); + } + } + + public void sendEnergyMessages(int [] data){ + System.out.println("成功发送指令:"); + for(int num = 0; num < data.length; num++){ + if (num == data.length - 1) { + System.out.print(Integer.toHexString(data[num])); + System.out.print("\n"); + }else { + System.out.print(Integer.toHexString(data[num]) + " "); + } + } + } + + public String getClientIP(ChannelHandlerContext ctx){ + String ip = ctx.channel().remoteAddress().toString(); + return ip.substring(1, ip.indexOf(":")); + } + +} diff --git a/iot-energy/src/main/java/org/example/socket/BootNettySocketServer.java b/iot-energy/src/main/java/org/example/socket/BootNettySocketServer.java new file mode 100644 index 0000000..2d1e5a4 --- /dev/null +++ b/iot-energy/src/main/java/org/example/socket/BootNettySocketServer.java @@ -0,0 +1,75 @@ +package org.example.socket; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.timeout.ReadTimeoutHandler; +import org.example.utils.MyDecoder; +import org.example.utils.MyEncoder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.TimeUnit; + +/** + * @author zjb + * @since 2023-12-11 10:39:00 + */ +@Component +public class BootNettySocketServer { + + private NioEventLoopGroup bossGroup; + + private NioEventLoopGroup workGroup; + + @Autowired + private BootNettySocketChannelInboundHandler bootNettySocketChannelInboundHandler; + + + /** + * 启动服务 + */ + public void startup(int port) { + try { + bossGroup = new NioEventLoopGroup(1); + workGroup = new NioEventLoopGroup(); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workGroup); + bootstrap.channel(NioServerSocketChannel.class); + + bootstrap.option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_RCVBUF, 10485760); + + bootstrap.childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + + bootstrap.childHandler(new ChannelInitializer() { + protected void initChannel(SocketChannel ch) { + ChannelPipeline channelPipeline = ch.pipeline(); + channelPipeline.addLast("encoder", new MyEncoder()); + channelPipeline.addLast("decoder", new MyDecoder()); + channelPipeline.addLast(bootNettySocketChannelInboundHandler); + } + }); + ChannelFuture f = bootstrap.bind(port).sync(); + if(f.isSuccess()){ + System.out.println("成功侦听端口:" + port); + f.channel().closeFuture().sync(); + } else { + System.out.println("侦听端口失败:" + port); + } + } catch (Exception e) { + System.out.println("start exception"+e.toString()); + } + } + +} diff --git a/iot-energy/src/main/java/org/example/socket/BootNettySocketServerThread.java b/iot-energy/src/main/java/org/example/socket/BootNettySocketServerThread.java new file mode 100644 index 0000000..cfe95d9 --- /dev/null +++ b/iot-energy/src/main/java/org/example/socket/BootNettySocketServerThread.java @@ -0,0 +1,21 @@ +package org.example.socket; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @author zjb + * @since 2023-12-11 10:35:00 + */ +@Component +public class BootNettySocketServerThread extends Thread { + private final int port = 8234; + + @Autowired + private BootNettySocketServer bootNettySocketServer; + + public void run() { + bootNettySocketServer.startup(this.port); + } + +} diff --git a/iot-energy/src/main/java/org/example/utils/CommonConfig.java b/iot-energy/src/main/java/org/example/utils/CommonConfig.java new file mode 100644 index 0000000..fe2f039 --- /dev/null +++ b/iot-energy/src/main/java/org/example/utils/CommonConfig.java @@ -0,0 +1,9 @@ +package org.example.utils; + +public class CommonConfig { + + public static final String energy0 = "127.0.0.1"; + + public static final String energy1 = "10.17.184.195"; + +} diff --git a/iot-energy/src/main/java/org/example/utils/IdService.java b/iot-energy/src/main/java/org/example/utils/IdService.java new file mode 100644 index 0000000..83a68c7 --- /dev/null +++ b/iot-energy/src/main/java/org/example/utils/IdService.java @@ -0,0 +1,15 @@ +package org.example.utils; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Component +public class IdService { + + @Autowired + private SnowflakeIdGenerator generator; + + public Long gen() { + return generator.nextId(); + } +} \ No newline at end of file diff --git a/iot-energy/src/main/java/org/example/utils/MyDecoder.java b/iot-energy/src/main/java/org/example/utils/MyDecoder.java new file mode 100644 index 0000000..47902b7 --- /dev/null +++ b/iot-energy/src/main/java/org/example/utils/MyDecoder.java @@ -0,0 +1,47 @@ +package org.example.utils; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +/** + * @author ZJB + * @since 2023-12-11 10:50:00 + */ +public class MyDecoder extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List out) throws Exception { + byte[] b = new byte[buffer.readableBytes()]; + buffer.readBytes(b); + out.add(b); + } + + public String bytesToHexString(byte[] bArray) { + StringBuffer sb = new StringBuffer(bArray.length); + String sTemp; + for (int i = 0; i < bArray.length; i++) { + sTemp = Integer.toHexString(0xFF & bArray[i]); + if (sTemp.length() < 2) + sb.append(0); + if(i < bArray.length - 1){ + sb.append(sTemp.toUpperCase()).append(" "); + }else{ + sb.append(sTemp.toUpperCase()); + } + } + return sb.toString(); + } + + public static String toHexString1(byte b) { + String s = Integer.toHexString(b & 0xFF); + if (s.length() == 1) { + return "0" + s; + } else { + return s; + } + } + +} diff --git a/iot-energy/src/main/java/org/example/utils/MyEncoder.java b/iot-energy/src/main/java/org/example/utils/MyEncoder.java new file mode 100644 index 0000000..85a873b --- /dev/null +++ b/iot-energy/src/main/java/org/example/utils/MyEncoder.java @@ -0,0 +1,43 @@ +package org.example.utils; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import org.apache.commons.codec.DecoderException; +import org.apache.commons.codec.binary.Hex; + +/** + * @author zjb + * @since 2023-12-11 10:49:32 + */ +public class MyEncoder extends ChannelOutboundHandlerAdapter { + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + // 对消息进行编码 + ByteBuf encodedMsg = encode2(msg); + // 将编码后的消息发送到下一个 ChannelHandler + ctx.write(encodedMsg, promise); + } + + // 自定义的编码方法 16进制解码 字符串 + private ByteBuf encode(Object msg) throws DecoderException { + // 实现编码逻辑 + ByteBuf buf = Unpooled.buffer(); + buf.writeBytes(Hex.decodeHex(msg.toString().replaceAll(" ", "").toCharArray())); + return buf; + } + + //编码 十进制数组编码 + private ByteBuf encode2(Object msg) throws DecoderException { +// 创建一个新的ByteBuf对象 + ByteBuf byteBuf = Unpooled.buffer(); + // 遍历数组并写入到ByteBuf中 + for (int i : (int [])msg) { + byteBuf.writeByte(i); + } + return byteBuf; + } +} diff --git a/iot-energy/src/main/java/org/example/utils/SnowflakeIdGenerator.java b/iot-energy/src/main/java/org/example/utils/SnowflakeIdGenerator.java new file mode 100644 index 0000000..2efc2c1 --- /dev/null +++ b/iot-energy/src/main/java/org/example/utils/SnowflakeIdGenerator.java @@ -0,0 +1,131 @@ +package org.example.utils; + +import org.springframework.stereotype.Component; + +@Component +public class SnowflakeIdGenerator { + // ==============================Fields=========================================== + /** + * 开始时间截 (2015-01-01) + */ + private final long twepoch = 1420041600000L; + /** + * 机器id所占的位数 + */ + private final long workerIdBits = 5L; + /** + * 数据标识id所占的位数 + */ + private final long datacenterIdBits = 5L; + /** + * 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) + */ + private final long maxWorkerId = -1L ^ (-1L << workerIdBits); + /** + * 支持的最大数据标识id,结果是31 + */ + private final long maxDatacenterId = -1L ^ (-1L << datacenterIdBits); + /** + * 序列在id中占的位数 + */ + private final long sequenceBits = 12L; + /** + * 机器ID向左移12位 + */ + private final long workerIdShift = sequenceBits; + /** + * 数据标识id向左移17位(12+5) + */ + private final long datacenterIdShift = sequenceBits + workerIdBits; + /** + * 时间截向左移22位(5+5+12) + */ + private final long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; + /** + * 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) + */ + private final long sequenceMask = -1L ^ (-1L << sequenceBits); + /** + * 工作机器ID(0~31) + */ + private long workerId; + /** + * 数据中心ID(0~31) + */ + private long datacenterId; + /** + * 毫秒内序列(0~4095) + */ + private long sequence = 0L; + /** + * 上次生成ID的时间截 + */ + private long lastTimestamp = -1L; + //==============================Constructors===================================== + + /** + * 构造函数 * @param workerId 工作ID (0~31) * @param datacenterId 数据中心ID (0~31) + */ + public SnowflakeIdGenerator(long workerId, long datacenterId) { + if (workerId > maxWorkerId || workerId < 0) { + throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId)); + } + if (datacenterId > maxDatacenterId || datacenterId < 0) { + throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId)); + } + this.workerId = workerId; + this.datacenterId = datacenterId; + } // ==============================Methods========================================== + + public SnowflakeIdGenerator() { + + } + + /** + * 获得下一个ID (该方法是线程安全的) * @return SnowflakeId + */ + public synchronized long nextId() { + long timestamp = timeGen(); + //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 +// if (timestamp < lastTimestamp) { +// throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); +// } + //如果是同一时间生成的,则进行毫秒内序列 + if (lastTimestamp == timestamp) { + sequence = (sequence + 1) & sequenceMask; + //毫秒内序列溢出 + if (sequence == 0) { + //阻塞到下一个毫秒,获得新的时间戳 + timestamp = tilNextMillis(lastTimestamp); + } + } + //时间戳改变,毫秒内序列重置 + else { + sequence = 0L; + } + //上次生成ID的时间截 + lastTimestamp = timestamp; + //移位并通过或运算拼到一起组成64位的ID + return ((timestamp - twepoch) << timestampLeftShift) + | (datacenterId << datacenterIdShift) | (workerId << workerIdShift) | sequence; + } + + /** + * 阻塞到下一个毫秒,直到获得新的时间戳 * @param lastTimestamp 上次生成ID的时间截 * @return 当前时间戳 + */ + protected long tilNextMillis(long lastTimestamp) { + long timestamp = timeGen(); + while (timestamp <= lastTimestamp) { + timestamp = timeGen(); + } + return timestamp; + } + + /** + * 返回以毫秒为单位的当前时间 * @return 当前时间(毫秒) + */ + protected long timeGen() { + return System.currentTimeMillis(); + } +} + diff --git a/iot-energy/src/main/resources/application.properties b/iot-energy/src/main/resources/application.properties new file mode 100644 index 0000000..61be5d9 --- /dev/null +++ b/iot-energy/src/main/resources/application.properties @@ -0,0 +1,32 @@ +spring.application.name=netty +server.servlet.encoding.force=true +server.port=7781 +lombok.var.flagUsage = ALLOW +# Single file max size +multipart.maxFileSize=50Mb +# All files max size +multipart.maxRequestSize=50Mb + +spring.datasource.dynamic.datasource.primary=primary +spring.datasource.dynamic.datasource.primary.url=jdbc:mysql://10.16.56.3:3308/jcxt?useUnicode=true&characterEncoding=utf-8&useSSL=false +spring.datasource.dynamic.datasource.primary.username=root +spring.datasource.dynamic.datasource.primary.password=Unity3du#d112233 +spring.datasource.dynamic.datasource.primary.driver-class-name=com.mysql.jdbc.Driver +spring.datasource.dynamic.druid.initial-size=10 +spring.datasource.dynamic.druid.max-idle=20 +spring.datasource.dynamic.druid.min-idle=5 +spring.datasource.dynamic.druid.max-active=50 +spring.datasource.dynamic.druid.log-abandoned=true +spring.datasource.dynamic.druid.remove-abandoned=true +spring.datasource.dynamic.druid.remove-abandoned-timeout-millis=120 +spring.datasource.dynamic.druid.max-wait=1000 +spring.datasource.dynamic.druid.test-while-idle=true +spring.datasource.dynamic.druid.validation-query=select 1 from dual +spring.datasource.dynamic.druid.test-on-borrow=true +spring.datasource.dynamic.druid.min-evictable-idle-time-millis=100000 +spring.datasource.dynamic.druid.max-evictable-idle-time-millis=100000 +spring.datasource.dynamic.druid.time-between-eviction-runs-millis=100000 +spring.datasource.dynamic.druid.break-after-acquire-failure=true +spring.datasource.dynamic.druid.connection-error-retry-attempts=5 +spring.datasource.dynamic.druid.fail-fast=true +spring.datasource.dynamic.druid.time-between-connect-error-millis=10000 \ No newline at end of file