using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data;
using System.Globalization;
using System.Linq;
using System.Reflection.PortableExecutable;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using DataAcquisition.Model;
using HslCommunication;
using HslCommunication.Core;
using HslCommunication.ModBus;
// NOTE(review): MySql.Data.MySqlClient and MySqlConnector both declare MySqlConnection /
// MySqlCommand; if both packages are referenced the unqualified names below are ambiguous.
// Confirm which provider is intended.
using MySql.Data.MySqlClient;
using MySqlConnector;

/// <summary>
/// Connection-pool entry: the Modbus clients created for one PLC endpoint and the
/// semaphore that limits concurrent reads against it.
/// </summary>
public class ConnectionPoolItem
{
    /// <summary>Clients already connected to this endpoint (looked up by station number).</summary>
    public List<ModbusTcpNet> Clients { get; set; } = new List<ModbusTcpNet>();

    /// <summary>Gate limiting the number of concurrent reads against this endpoint.</summary>
    public SemaphoreSlim Semaphore { get; set; }
}

/// <summary>
/// Loads device/point definitions, groups devices by acquisition period, polls them over
/// Modbus TCP on one timer per period and stores each device's latest values as JSON in
/// the <c>iot_device_status</c> table.
/// </summary>
public class ModbusDataCollector : IDisposable
{
    // Not referenced anywhere in this class.
    // NOTE(review): type arguments are assumed — confirm or remove the field.
    private readonly Dictionary<string, ModbusTcpNet> _modbusClients = new Dictionary<string, ModbusTcpNet>();

    // Guards client creation, the per-batch result map and disposal.
    private readonly object _lock = new object();

    // Serializes the select/compare/write sequence against iot_device_status.
    private readonly object _dataLock = new object();

    // One timer per acquisition period; the key is the period in milliseconds.
    private readonly ConcurrentDictionary<int, Timer> _timers = new ConcurrentDictionary<int, Timer>();

    // Devices grouped by acquisition period; the key is the period in milliseconds.
    private readonly ConcurrentDictionary<int, List<DeviceInfo>> _frequencyGroups = new ConcurrentDictionary<int, List<DeviceInfo>>();

    private bool _disposed;

    private readonly int _maxReadRegisters = 125;   // Modbus TCP reads at most 125 registers per request
    private readonly int _maxReadCoils = 2000;      // Modbus TCP reads at most 2000 coils / discrete inputs per request
    private readonly int _maxConnectionsPerPLC = 2; // maximum concurrent reads per PLC endpoint

    // Pool entries keyed by "ip:port".
    private readonly ConcurrentDictionary<string, ConnectionPoolItem> _connectionPool = new ConcurrentDictionary<string, ConnectionPoolItem>();

    private readonly string _connectionString; // database connection string

    /// <summary>Raised for every log line; the argument is the timestamped message text.</summary>
    public event EventHandler<string> LogMessage;

    /// <summary>Creates a collector that writes device status through the given connection string.</summary>
    /// <param name="connectionString">Connection string passed to <c>MySqlConnection</c>.</param>
    public ModbusDataCollector(string connectionString)
    {
        _connectionString = connectionString;
    }

    /// <summary>Loads all devices, rebuilds the period groups and starts the timers.</summary>
    public async Task InitializeAsync()
    {
        var devices = await LoadDevicesAsync();
        UpdateFrequencyGroups(devices);
        StartTimers();
    }

    /// <summary>Publishes a message, prefixed with the local timestamp, through <see cref="LogMessage"/>.</summary>
    private void Log(string message)
    {
        LogMessage?.Invoke(this, $"[{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}] {message}");
    }

    /// <summary>
    /// Reads devices, points and function-code configuration through the Gather.Mysql BLL
    /// classes and builds one <c>DeviceInfo</c> (with its points) per device row.
    /// </summary>
    /// <returns>The loaded devices.</returns>
    /// <remarks>
    /// The BLL calls are synchronous, so the method contains no awaits; it stays
    /// <c>async</c> so that failures surface through the returned task, as before.
    /// </remarks>
    public async Task<List<DeviceInfo>> LoadDevicesAsync()
    {
        var devices = new List<DeviceInfo>();

        var deviceBll = new Gather.Mysql.BLL.iot_device();
        var pointBll = new Gather.Mysql.BLL.iot_plc_pt();
        var functionCodeConfigBll = new Gather.Mysql.BLL.iot_function_code_config();

        var deviceList = deviceBll.GetModelList("");
        var pointList = pointBll.GetModelList("");
        var functionCodeConfigList = functionCodeConfigBll.GetModelList("");

        // Attach to every device the points that reference it.
        foreach (var device in deviceList)
        {
            var deviceModel = new DeviceInfo
            {
                Id = device.id,
                DeviceNoHost = device.device_no_host,
                DeviceNoFollow = device.device_no_follow,
                DeviceName = device.device_name,
                IpAddress = device.service_ip,
                Port = int.Parse(device.service_port),
                Station = byte.Parse(device.station),
                AcquisitionFrequency = int.Parse(device.acquisition_frequency)
            };
            devices.Add(deviceModel);

            var devicePoints = pointList.Where(pt => pt.pt_device_id == device.id).ToList();
            foreach (var item in devicePoints)
            {
                var point = new ModbusPoint
                {
                    Id = item.id,
                    Name = item.pt_name,
                    EnglishName = item.pt_english,
                    Address = item.pt_value,
                    FunctionCode = item.pt_function,
                    Pattern = item.pt_pattern,
                    DataType = item.pt_data_type,
                    ByteOrder = item.pt_byte_order,
                    // Whether a single bit of the register value is reported.
                    IsBitRead = item.pt_is_bit_read,
                    // Index of that bit; 0 when the column is empty.
                    BitIndex = !string.IsNullOrEmpty(item.pt_bit_index) ? int.Parse(item.pt_bit_index) : 0,
                    StartAddressFromZero = "1"
                };

                // The per-function-code configuration overrides the default address base.
                var functionCodeConfig = functionCodeConfigList.FirstOrDefault(a => a.function_code == point.FunctionCode);
                if (functionCodeConfig != null)
                {
                    point.StartAddressFromZero = functionCodeConfig.start_address_from_zero;
                }

                if (!string.IsNullOrEmpty(item.pt_magnification_ratio))
                {
                    // Stored configuration text: parse it independently of the host's culture.
                    point.Magnification = float.Parse(item.pt_magnification_ratio, CultureInfo.InvariantCulture);
                }

                deviceModel.Points.Add(point);
            }
        }

        Log($"从数据库加载了 {devices.Count} 个设备");
        return devices;
    }

    /// <summary>
    /// Regroups devices by acquisition period, disposes timers whose period no longer
    /// exists and creates (stopped) timers for new periods. Devices with a non-positive
    /// period are ignored.
    /// </summary>
    private void UpdateFrequencyGroups(List<DeviceInfo> devices)
    {
        var newGroups = devices
            .Where(d => d.AcquisitionFrequency > 0)
            .GroupBy(d => d.AcquisitionFrequency)
            .ToDictionary(g => g.Key, g => g.ToList());

        // Drop period groups that no longer exist.
        var removedFrequencies = _frequencyGroups.Keys.Except(newGroups.Keys).ToList();
        foreach (var freq in removedFrequencies)
        {
            if (_frequencyGroups.TryRemove(freq, out _) && _timers.TryRemove(freq, out var timer))
            {
                timer.Dispose();
                Log($"移除频率组: {freq}ms");
            }
        }

        // Update existing groups and add new ones.
        foreach (var group in newGroups)
        {
            _frequencyGroups[group.Key] = group.Value;

            if (_timers.ContainsKey(group.Key))
            {
                continue;
            }

            // Created stopped; StartTimers arms it.
            var timer = new Timer(CollectDevicesByFrequency, group.Key, Timeout.Infinite, Timeout.Infinite);
            if (_timers.TryAdd(group.Key, timer))
            {
                Log($"创建新的采集定时器: {group.Key}ms, 设备数: {group.Value.Count}");
            }
            else
            {
                // Another caller registered a timer for this period first; do not leak ours.
                timer.Dispose();
            }
        }
    }

    /// <summary>Arms every timer so it fires after one period and then once per period.</summary>
    private void StartTimers()
    {
        foreach (var kvp in _frequencyGroups)
        {
            if (_timers.TryGetValue(kvp.Key, out var timer))
            {
                timer.Change(kvp.Key, kvp.Key);
                Log($"启动定时器: {kvp.Key}ms, 设备数: {kvp.Value.Count}");
            }
        }
    }

    /// <summary>
    /// Timer callback: reads every device of one period group, writes each device's
    /// status JSON to the database and logs every point value.
    /// </summary>
    /// <param name="state">The acquisition period (boxed <c>int</c>) the timer was created with.</param>
    /// <remarks><c>async void</c> is required by the timer callback signature; failures are caught and logged.</remarks>
    private async void CollectDevicesByFrequency(object state)
    {
        int frequency = (int)state;
        if (!_frequencyGroups.TryGetValue(frequency, out var devices) || devices.Count == 0)
        {
            return;
        }

        Log($"开始采集频率组: {frequency}ms, 设备数: {devices.Count}");
        try
        {
            var results = await BatchReadDevicesAsync(devices);
            foreach (var deviceResult in results)
            {
                var device = devices.FirstOrDefault(d => d.Id == deviceResult.Key);
                if (device == null)
                {
                    continue;
                }

                // Serialize the values and persist them when they differ from the stored row.
                var statusJson = GenerateStatusJson(device, deviceResult.Value);
                UpdateDeviceStatusInDatabase(device.Id, statusJson);

                foreach (var pointResult in deviceResult.Value)
                {
                    var point = device.Points.FirstOrDefault(p => p.Id == pointResult.Key);
                    if (point == null)
                    {
                        continue;
                    }

                    // Extension point: further processing (storage, message queue, ...) can go here.
                    Log($"设备 {device.DeviceName}({device.Id}) 点位 {point.Name}({point.Address})({point.FunctionCode}): {pointResult.Value}");
                }
            }
        }
        catch (Exception ex)
        {
            Log($"频率 {frequency}ms 采集失败: {ex.Message}");
        }
    }

    /// <summary>
    /// Serializes the values that were read for a device as a JSON object keyed by each
    /// point's <c>EnglishName</c>; points without a value are omitted.
    /// </summary>
    // NOTE(review): point ids are assumed to be long (device ids are passed as long to
    // UpdateDeviceStatusInDatabase) — confirm against DataAcquisition.Model.ModbusPoint.
    private string GenerateStatusJson(DeviceInfo device, Dictionary<long, object> pointResults)
    {
        var status = new Dictionary<string, object>();
        foreach (var point in device.Points)
        {
            if (pointResults.TryGetValue(point.Id, out var value))
            {
                status[point.EnglishName] = value;
            }
        }

        return JsonSerializer.Serialize(status);
    }

    /// <summary>
    /// Writes <paramref name="statusJson"/> for the device unless the stored
    /// <c>status_json</c> is already equal: updates the row when the select returned a
    /// string, inserts a new row otherwise.
    /// </summary>
    private void UpdateDeviceStatusInDatabase(long deviceId, string statusJson)
    {
        using (var connection = new MySqlConnection(_connectionString))
        {
            connection.Open();

            lock (_dataLock)
            {
                // Fetch the stored value, if any.
                string existingJson;
                using (var selectCommand = new MySqlCommand("SELECT status_json FROM iot_device_status WHERE device_id = @DeviceId", connection))
                {
                    selectCommand.Parameters.AddWithValue("@DeviceId", deviceId);
                    existingJson = selectCommand.ExecuteScalar() as string;
                }

                if (existingJson == statusJson)
                {
                    return; // unchanged, nothing to write
                }

                var now = DateTime.Now;
                if (existingJson != null)
                {
                    // Update the existing row.
                    using (var updateCommand = new MySqlCommand("UPDATE iot_device_status SET status_json = @StatusJson, update_time = @UpdateTime WHERE device_id = @DeviceId", connection))
                    {
                        updateCommand.Parameters.AddWithValue("@DeviceId", deviceId);
                        updateCommand.Parameters.AddWithValue("@StatusJson", statusJson);
                        updateCommand.Parameters.AddWithValue("@UpdateTime", now);
                        updateCommand.ExecuteNonQuery();
                    }
                }
                else
                {
                    // Insert a new row.
                    using (var insertCommand = new MySqlCommand("INSERT INTO iot_device_status (id, device_id, status_json, creat_time, update_time) VALUES (@Id, @DeviceId, @StatusJson, @CreatTime, @UpdateTime)", connection))
                    {
                        var id = Guid.NewGuid().ToString("N");
                        insertCommand.Parameters.AddWithValue("@Id", id);
                        insertCommand.Parameters.AddWithValue("@DeviceId", deviceId);
                        insertCommand.Parameters.AddWithValue("@StatusJson", statusJson);
                        insertCommand.Parameters.AddWithValue("@CreatTime", now);
                        insertCommand.Parameters.AddWithValue("@UpdateTime", now);
                        insertCommand.ExecuteNonQuery();
                        Log($"{deviceId},插入数据:{id}");
                    }
                }
            }
        }
    }

    /// <summary>
    /// Reads all devices concurrently (bounded per endpoint by the pool semaphore).
    /// A device whose read fails is logged and left out of the result.
    /// </summary>
    /// <returns>Point values keyed by point id, keyed by device id.</returns>
    private async Task<Dictionary<long, Dictionary<long, object>>> BatchReadDevicesAsync(List<DeviceInfo> devices)
    {
        var results = new Dictionary<long, Dictionary<long, object>>();
        var tasks = new List<Task>();

        foreach (var device in devices)
        {
            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    using (await GetConnectionSemaphore(device))
                    {
                        var client = GetModbusClient(device);
                        var deviceResults = await ReadDevicePointsAsync(client, device);
                        lock (_lock)
                        {
                            results[device.Id] = deviceResults;
                        }
                    }
                }
                catch (Exception ex)
                {
                    Log($"读取设备 {device.DeviceName}({device.Id}) 数据失败: {ex.Message}");
                }
            }));
        }

        await Task.WhenAll(tasks);
        return results;
    }

    /// <summary>
    /// Returns the pool entry for the device's "ip:port" endpoint, creating it together
    /// with its semaphore so an entry never exists without one.
    /// </summary>
    private ConnectionPoolItem GetPoolItem(DeviceInfo device)
    {
        var key = $"{device.IpAddress}:{device.Port}";
        return _connectionPool.GetOrAdd(key, _ => new ConnectionPoolItem
        {
            Semaphore = new SemaphoreSlim(_maxConnectionsPerPLC, _maxConnectionsPerPLC)
        });
    }

    /// <summary>
    /// Waits for a free slot on the device's endpoint and returns a handle that frees
    /// the slot when disposed.
    /// </summary>
    private async Task<IDisposable> GetConnectionSemaphore(DeviceInfo device)
    {
        var poolItem = GetPoolItem(device);
        await poolItem.Semaphore.WaitAsync();
        return new SemaphoreReleaser(poolItem.Semaphore);
    }

    /// <summary>
    /// Returns the pooled client for the device's endpoint and station number, connecting
    /// a new one (2000 ms connect timeout) when none exists yet.
    /// </summary>
    /// <exception cref="InvalidOperationException">The connection attempt failed.</exception>
    private ModbusTcpNet GetModbusClient(DeviceInfo device)
    {
        var poolItem = GetPoolItem(device);

        lock (_lock)
        {
            var client = poolItem.Clients.FirstOrDefault(c => c.Station == device.Station);
            if (client != null)
            {
                return client;
            }

            // AddressStartWithZero is not set here; the read methods set it before every request.
            client = new ModbusTcpNet(device.IpAddress, device.Port, device.Station)
            {
                ConnectTimeOut = 2000
            };

            var connectResult = client.ConnectServer();
            if (!connectResult.IsSuccess)
            {
                throw new InvalidOperationException($"连接设备 {device.DeviceName}({device.IpAddress}:{device.Port}) 失败: {connectResult.Message}");
            }

            poolItem.Clients.Add(client);
            Log($"创建新的Modbus客户端: {device.DeviceName}({device.IpAddress}:{device.Port})");
            return client;
        }
    }

    /// <summary>
    /// Maps a byte-order name (case-insensitive) to <c>DataFormat</c>; null, empty and
    /// unknown names map to <c>DataFormat.ABCD</c>. Not called inside this class.
    /// </summary>
    private DataFormat GetDataFormat(string byteOrder)
    {
        if (string.IsNullOrEmpty(byteOrder))
        {
            return DataFormat.ABCD;
        }

        switch (byteOrder.ToUpperInvariant())
        {
            case "BADC":
                return DataFormat.BADC;
            case "CDAB":
                return DataFormat.CDAB;
            case "DCBA":
                return DataFormat.DCBA;
            default:
                return DataFormat.ABCD;
        }
    }

    /// <summary>
    /// Reads all points of a device, one function-code group at a time. A failing group
    /// is logged and skipped so the remaining groups are still read.
    /// </summary>
    /// <returns>The values that could be read, keyed by point id.</returns>
    private async Task<Dictionary<long, object>> ReadDevicePointsAsync(ModbusTcpNet client, DeviceInfo device)
    {
        var results = new Dictionary<long, object>();
        byte stationNumber = device.Station;

        foreach (var group in device.Points.GroupBy(p => p.FunctionCode))
        {
            var functionCode = group.Key;
            var points = group.OrderBy(p => p.Address).ToList();

            try
            {
                switch (functionCode)
                {
                    case "01":
                        await ReadCoils(client, points, results, stationNumber);
                        break;
                    case "02":
                        await ReadDiscreteInputs(client, points, results, stationNumber);
                        break;
                    case "03":
                    case "04":
                        await ReadRegisters(client, points, results, functionCode, stationNumber);
                        break;
                    default:
                        Log($"设备 {device.DeviceName} 不支持的功能码: {functionCode}");
                        break;
                }
            }
            catch (Exception ex)
            {
                Log($"读取设备 {device.DeviceName} 功能码 {functionCode} 数据失败: {ex.Message}");
            }
        }

        return results;
    }

    /// <summary>Reads coil points (function code "01") in merged address ranges.</summary>
    private Task ReadCoils(ModbusTcpNet client, List<ModbusPoint> points, Dictionary<long, object> results, byte stationNumber)
    {
        return ReadBitsAsync(client, points, results, stationNumber,
            (address, length) => client.ReadCoilAsync(address, length), "读取线圈失败", "DO");
    }

    /// <summary>Reads discrete-input points (function code "02") in merged address ranges.</summary>
    private Task ReadDiscreteInputs(ModbusTcpNet client, List<ModbusPoint> points, Dictionary<long, object> results, byte stationNumber)
    {
        return ReadBitsAsync(client, points, results, stationNumber,
            (address, length) => client.ReadDiscreteAsync(address, length), "读取离散输入失败", "DI");
    }

    /// <summary>
    /// Shared implementation of the coil / discrete-input reads: merges the points into
    /// address ranges, reads each range once and stores each point's bit in
    /// <paramref name="results"/>.
    /// </summary>
    /// <param name="read">Performs the actual request for an address string and a length.</param>
    /// <param name="failureLabel">Text placed before the device message when a read fails.</param>
    /// <param name="logTag">Tag that starts the success log line.</param>
    /// <exception cref="InvalidOperationException">A range could not be read.</exception>
    private async Task ReadBitsAsync(
        ModbusTcpNet client,
        List<ModbusPoint> points,
        Dictionary<long, object> results,
        byte stationNumber,
        Func<string, ushort, Task<OperateResult<bool[]>>> read,
        string failureLabel,
        string logTag)
    {
        foreach (var group in MergeAddresses(points, _maxReadCoils))
        {
            var startAddress = group.Min(p => ushort.Parse(p.Address));
            var length = group.Max(p => ushort.Parse(p.Address)) - startAddress + 1;

            // NOTE(review): the client is shared by every device with the same endpoint and
            // station, so this setting can change between assignment and request when two
            // reads overlap — confirm whether that can happen in practice.
            client.AddressStartWithZero = group.First().StartAddressFromZero == "1";

            var readResult = await read($"s={stationNumber};{startAddress}", (ushort)length);
            if (!readResult.IsSuccess)
            {
                throw new InvalidOperationException($"{failureLabel}: {readResult.Message}");
            }

            var values = readResult.Content;
            Log($"{logTag},s={stationNumber};{startAddress},长度{length},返回值:{string.Join(',', values)}");

            foreach (var point in group)
            {
                var index = ushort.Parse(point.Address) - startAddress;
                if (index < values.Length)
                {
                    results[point.Id] = values[index];
                }
            }
        }
    }

    /// <summary>A register point together with its parsed address and width in registers.</summary>
    private sealed class RegisterItem
    {
        public ModbusPoint Point { get; set; }
        public int RegisterCount { get; set; }
        public ushort Address { get; set; }
        public string StartAddressFromZero { get; set; }
    }

    /// <summary>
    /// Reads register points (function codes "03" / "04") in merged address ranges,
    /// converts each point's bytes according to its data type, byte order and
    /// magnification, and stores the value in <paramref name="results"/>. For points
    /// flagged as bit reads, the configured bit of the converted value is stored as 1 or 0.
    /// </summary>
    /// <exception cref="InvalidOperationException">A range could not be read.</exception>
    private async Task ReadRegisters(ModbusTcpNet client, List<ModbusPoint> points, Dictionary<long, object> results, string functionCode, byte stationNumber)
    {
        var registerPoints = points.Select(p => new RegisterItem
        {
            Point = p,
            RegisterCount = GetRegisterCount(p.DataType),
            Address = ushort.Parse(p.Address),
            StartAddressFromZero = p.StartAddressFromZero
        }).ToList();

        foreach (var group in MergeRegisterGroups(registerPoints, _maxReadRegisters))
        {
            var startAddress = group.Min(x => x.Address);
            var endAddress = group.Max(x => x.Address + x.RegisterCount - 1);
            var length = endAddress - startAddress + 1;

            client.AddressStartWithZero = group.First().StartAddressFromZero == "1";

            // The "x=" part of the address string carries the function code to use.
            var addressPrefix = functionCode == "03" ? "3" : "4";
            var readResult = await client.ReadAsync($"s={stationNumber};x={addressPrefix};{startAddress}", (ushort)length);
            if (!readResult.IsSuccess)
            {
                throw new InvalidOperationException($"读取寄存器失败: {readResult.Message}");
            }

            var bytes = readResult.Content;
            Log($"s={stationNumber};x={addressPrefix};{startAddress},长度{length},返回值:{string.Join(',', bytes)}");

            foreach (var item in group)
            {
                // Two bytes per register.
                var offset = (item.Address - startAddress) * 2;
                if (offset + item.RegisterCount * 2 > bytes.Length)
                {
                    continue; // response too short for this point
                }

                var value = ConvertRegisterValue(bytes, offset, item.RegisterCount, item.Point.DataType, item.Point.ByteOrder, item.Point.Magnification);

                if (item.Point.IsBitRead == "1")
                {
                    // The converted value is a boxed float or byte for most data types, so an
                    // unboxing cast to int would throw InvalidCastException; Convert accepts
                    // every numeric box.
                    int intValue = Convert.ToInt32(value);
                    bool bitValue = (intValue & (1 << item.Point.BitIndex)) != 0;
                    value = bitValue ? 1 : 0;
                }

                results[item.Point.Id] = value;
            }
        }
    }

    /// <summary>
    /// Number of 16-bit registers occupied by a data type (case-insensitive): 1 for
    /// bool/byte/short/ushort, 4 for long/ulong/double, 2 for everything else.
    /// </summary>
    private int GetRegisterCount(string dataType)
    {
        // Invariant casing: culture-sensitive lower-casing can turn "INT" into a string
        // that does not equal "int" (Turkish dotless i).
        switch (dataType?.ToLowerInvariant())
        {
            case "bool":
            case "byte":
            case "short":
            case "ushort":
                return 1;
            case "long":
            case "ulong":
            case "double":
                return 4;
            default:
                return 2; // int, uint, float and unknown types
        }
    }

    /// <summary>
    /// Decodes one point from the raw response: copies its bytes, reorders them with
    /// <see cref="ReorderBytes"/> and converts them with <c>BitConverter</c>. Numeric types
    /// are multiplied by <paramref name="magnification"/>; "bool" yields 1 or 0 and "byte"
    /// yields the first reordered byte, both unscaled. Unknown types are decoded as ushort.
    /// </summary>
    private object ConvertRegisterValue(byte[] bytes, int offset, int registerCount, string dataType, string byteOrder, float magnification)
    {
        var data = new byte[registerCount * 2];
        Array.Copy(bytes, offset, data, 0, data.Length);

        // A point without a configured byte order is treated as "ABCD", the same default
        // GetDataFormat applies, instead of failing the whole read.
        data = ReorderBytes(data, string.IsNullOrEmpty(byteOrder) ? "ABCD" : byteOrder);

        switch (dataType?.ToLowerInvariant())
        {
            case "bool":
                return BitConverter.ToBoolean(data, 0) ? 1 : 0;
            case "byte":
                return data[0];
            case "short":
                return BitConverter.ToInt16(data, 0) * magnification;
            case "int":
                return BitConverter.ToInt32(data, 0) * magnification;
            case "uint":
                return BitConverter.ToUInt32(data, 0) * magnification;
            case "long":
                return BitConverter.ToInt64(data, 0) * magnification;
            case "ulong":
                return BitConverter.ToUInt64(data, 0) * magnification;
            case "float":
                return BitConverter.ToSingle(data, 0) * magnification;
            case "double":
                return BitConverter.ToDouble(data, 0) * magnification;
            default:
                return BitConverter.ToUInt16(data, 0) * magnification; // "ushort" and unknown types
        }
    }

    /// <summary>
    /// Rearranges a byte array according to a Modbus byte-order name (case-insensitive).
    /// </summary>
    /// <param name="bytes">The source bytes.</param>
    /// <param name="byteOrder">
    /// "ABCD": every 4-byte group is reversed; "BADC": the two 2-byte halves of every
    /// 4-byte group are exchanged; "CDAB": the bytes of every 2-byte pair are exchanged;
    /// "DCBA": the input array itself is returned unchanged.
    /// </param>
    /// <returns>The rearranged bytes; an empty array when the input is null or empty.</returns>
    /// <exception cref="ArgumentException">The byte order is not one of the four names.</exception>
    /// <remarks>
    /// Presumably the name describes the order in which the device sends the bytes and the
    /// result is the little-endian layout <c>BitConverter</c> decodes — confirm against the
    /// devices in use.
    /// NOTE(review): groups are processed 4 bytes at a time, so the two halves of an 8-byte
    /// value are never exchanged with each other — verify long/ulong/double decoding.
    /// </remarks>
    public byte[] ReorderBytes(byte[] bytes, string byteOrder)
    {
        if (bytes == null || bytes.Length == 0)
        {
            return Array.Empty<byte>();
        }

        switch (byteOrder?.ToUpperInvariant())
        {
            case "ABCD":
                return ReverseDWords(bytes);
            case "BADC":
                return SwapDWords(bytes);
            case "CDAB":
                return SwapWords(bytes);
            case "DCBA":
                return bytes;
            default:
                throw new ArgumentException($"不支持的字节顺序: {byteOrder}", nameof(byteOrder));
        }
    }

    // Exchanges the two bytes of every 2-byte pair (used for "CDAB"); a trailing single
    // byte is copied unchanged.
    private byte[] SwapWords(byte[] bytes)
    {
        var result = new byte[bytes.Length];
        for (int i = 0; i < bytes.Length; i += 2)
        {
            if (i + 1 < bytes.Length)
            {
                result[i] = bytes[i + 1];
                result[i + 1] = bytes[i];
            }
            else
            {
                result[i] = bytes[i];
            }
        }

        return result;
    }

    // Exchanges the first and second 2-byte halves of every 4-byte group (used for "BADC").
    // In an incomplete trailing group the first two bytes are exchanged and any remaining
    // byte is copied unchanged.
    private byte[] SwapDWords(byte[] bytes)
    {
        var result = new byte[bytes.Length];
        for (int i = 0; i < bytes.Length; i += 4)
        {
            if (i + 3 < bytes.Length)
            {
                result[i] = bytes[i + 2];
                result[i + 1] = bytes[i + 3];
                result[i + 2] = bytes[i];
                result[i + 3] = bytes[i + 1];
            }
            else if (i + 1 < bytes.Length)
            {
                result[i] = bytes[i + 1];
                result[i + 1] = bytes[i];
                if (i + 2 < bytes.Length)
                {
                    result[i + 2] = bytes[i + 2];
                }
            }
            else
            {
                result[i] = bytes[i];
            }
        }

        return result;
    }

    // Reverses every 4-byte group (used for "ABCD"). In an incomplete trailing group the
    // first two bytes are exchanged and any remaining byte is copied unchanged.
    private byte[] ReverseDWords(byte[] bytes)
    {
        var result = new byte[bytes.Length];
        for (int i = 0; i < bytes.Length; i += 4)
        {
            if (i + 3 < bytes.Length)
            {
                result[i] = bytes[i + 3];
                result[i + 1] = bytes[i + 2];
                result[i + 2] = bytes[i + 1];
                result[i + 3] = bytes[i];
            }
            else if (i + 1 < bytes.Length)
            {
                result[i] = bytes[i + 1];
                result[i + 1] = bytes[i];
                if (i + 2 < bytes.Length)
                {
                    result[i + 2] = bytes[i + 2];
                }
            }
            else
            {
                result[i] = bytes[i];
            }
        }

        return result;
    }

    /// <summary>
    /// Splits bit points into groups of equal or consecutive addresses so each group can
    /// be read with one request; a group holds at most <paramref name="maxCount"/> points.
    /// </summary>
    private List<List<ModbusPoint>> MergeAddresses(List<ModbusPoint> points, int maxCount)
    {
        var result = new List<List<ModbusPoint>>();
        if (points.Count == 0)
        {
            return result;
        }

        var orderedPoints = points.OrderBy(p => ushort.Parse(p.Address)).ToList();
        var currentGroup = new List<ModbusPoint> { orderedPoints[0] };

        for (int i = 1; i < orderedPoints.Count; i++)
        {
            var prevAddress = ushort.Parse(orderedPoints[i - 1].Address);
            var currAddress = ushort.Parse(orderedPoints[i].Address);

            if (currAddress - prevAddress <= 1 && currentGroup.Count < maxCount)
            {
                currentGroup.Add(orderedPoints[i]);
            }
            else
            {
                result.Add(currentGroup);
                currentGroup = new List<ModbusPoint> { orderedPoints[i] };
            }
        }

        result.Add(currentGroup);
        return result;
    }

    /// <summary>
    /// Splits register points into groups that can each be read with one request: a point
    /// joins the current group when it starts at most one register after the end of the
    /// group's last point and the running register total stays within
    /// <paramref name="maxRegisters"/>.
    /// </summary>
    private List<List<RegisterItem>> MergeRegisterGroups(List<RegisterItem> items, int maxRegisters)
    {
        var result = new List<List<RegisterItem>>();
        if (items.Count == 0)
        {
            return result;
        }

        var orderedItems = items.OrderBy(x => x.Address).ToList();
        var currentGroup = new List<RegisterItem> { orderedItems[0] };
        var currentRegisters = orderedItems[0].RegisterCount;

        for (int i = 1; i < orderedItems.Count; i++)
        {
            var item = orderedItems[i];
            var lastItem = currentGroup[currentGroup.Count - 1];

            var endAddress = lastItem.Address + lastItem.RegisterCount - 1;
            // Unused registers between the two points; negative when they overlap.
            var gap = item.Address - endAddress - 1;

            if (gap <= 1 && currentRegisters + item.RegisterCount + gap <= maxRegisters)
            {
                currentGroup.Add(item);
                currentRegisters += item.RegisterCount + Math.Max(0, gap);
            }
            else
            {
                result.Add(currentGroup);
                currentGroup = new List<RegisterItem> { item };
                currentRegisters = item.RegisterCount;
            }
        }

        result.Add(currentGroup);
        return result;
    }

    /// <summary>Reloads the devices, rebuilds the period groups and re-arms all timers.</summary>
    public async Task RefreshDevicesAsync()
    {
        var devices = await LoadDevicesAsync();
        UpdateFrequencyGroups(devices);
        StartTimers();
    }

    /// <summary>
    /// Stops all timers, closes every pooled client and disposes the pool semaphores.
    /// Calls after the first one do nothing.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        lock (_lock)
        {
            foreach (var timer in _timers.Values)
            {
                timer.Dispose();
            }
            _timers.Clear();

            foreach (var poolItem in _connectionPool.Values)
            {
                foreach (var client in poolItem.Clients)
                {
                    try
                    {
                        client.ConnectClose();
                    }
                    catch
                    {
                        // Best effort: a failure to close one client must not stop the cleanup.
                    }
                }

                // Semaphore has a public setter, so tolerate an entry without one.
                poolItem.Semaphore?.Dispose();
            }
            _connectionPool.Clear();
        }

        Log("Modbus采集器已释放");
    }
}

/// <summary>Releases one slot of a semaphore when disposed; for use with <c>using</c>.</summary>
public class SemaphoreReleaser : IDisposable
{
    private SemaphoreSlim _semaphore;

    /// <summary>Wraps a semaphore on which the caller has already acquired a slot.</summary>
    public SemaphoreReleaser(SemaphoreSlim semaphore)
    {
        _semaphore = semaphore;
    }

    /// <summary>Releases the slot; a second call does nothing, so the slot is never released twice.</summary>
    public void Dispose()
    {
        Interlocked.Exchange(ref _semaphore, null)?.Release();
    }
}