import mqtt from "mqtt";

// Instance pool: caches one DataMiddleware per URL so callers share a single MQTT connection.
const instancePool = new Map();

// QoS applied to a topic when its first subscriber does not specify one.
const DEFAULT_QOS = 1;

/**
 * Data-access middleware that combines one-shot HTTP GET requests with
 * shared MQTT subscriptions over a single client connection.
 *
 * MQTT subscriber callbacks are invoked as `(message, error)`:
 *   - status change: `({ type: 'status', status }, null)`
 *   - data:          `({ type: 'data', data, topic, timestamp }, null)`
 *   - failure:       `(null, Error)`
 */
class DataMiddleware {
  /**
   * @param {Object} [options] - Instance configuration.
   * @param {string} [options.url] - MQTT broker URL; also used as the pool key.
   * @param {Object} [options.reconnectConfig] - Overrides for maxRetries / initialDelay / maxDelay.
   * @param {Object} [options.mqttOptions] - Extra options forwarded to `mqtt.connect`.
   * @param {Object} [options.httpOptions] - Extra options forwarded to `fetch`.
   * @param {boolean} [options.reconnect=true] - Whether to reconnect after the connection closes.
   */
  constructor(options = {}) {
    this.options = {
      url: options.url || '',
      reconnectConfig: {
        maxRetries: 10, // maximum number of reconnect attempts
        initialDelay: 1000, // delay before the first retry, in ms
        maxDelay: 30000, // upper bound for the backoff delay, in ms
        ...options.reconnectConfig,
      },
      mqttOptions: options.mqttOptions || {},
      httpOptions: options.httpOptions || {},
      reconnect: options.reconnect !== false,
    };

    // Bookkeeping for in-flight HTTP requests and active MQTT subscriptions.
    this.requests = {
      http: new Map(), // requestId -> { controller, url }, kept only so a request can be cancelled
      mqtt: new Map(), // topic filter -> { topic, options, subscribers, qos }
    };

    this.mqttClient = null;

    this.connectionStatus = {
      mqtt: 'disconnected', // disconnected | connecting | connected | error
      http: 'idle', // idle | loading
      retryCount: 0,
    };

    // Monotonic counter behind generateSubscriptionId().
    this.subscriptionIdCounter = 0;

    // Handle of the pending reconnect timer, if any.
    this.reconnectTimer = null;
  }

  /**
   * Key under which this instance is stored in the instance pool.
   * @returns {string}
   */
  getInstanceKey() {
    return this.options.url;
  }

  /**
   * Return the pooled instance for `options.url`, creating it on first use.
   * Options of later calls with the same URL are ignored.
   * @param {Object} [options]
   * @returns {DataMiddleware}
   */
  static getInstance(options = {}) {
    const key = options.url || '';
    if (instancePool.has(key)) {
      return instancePool.get(key);
    }
    const instance = new DataMiddleware(options);
    instancePool.set(key, instance);
    return instance;
  }

  /**
   * Snapshot (shallow copy) of the current connection status.
   * @returns {{mqtt: string, http: string, retryCount: number}}
   */
  getStatus() {
    return { ...this.connectionStatus };
  }

  /**
   * Current MQTT client, or null when not connected/created.
   */
  getMqttClient() {
    return this.mqttClient;
  }

  /**
   * Issue an HTTP GET request.
   *
   * `keys` is optional: `httpRequest(url)`, `httpRequest(url, cb)`,
   * `httpRequest(url, keys)` and `httpRequest(url, keys, cb)` are all accepted.
   * When keys are given they are sent as a comma-separated `keys` query
   * parameter; the response body is passed through unfiltered.
   *
   * @param {string} url - Absolute URL, or a relative one where `location` exists.
   * @param {string[]|Function} [keys] - Keys to request, or the callback.
   * @param {Function} [callback] - `(data, error) => {}`.
   * @returns {Promise<*>} Resolves with the parsed JSON body. The promise
   *   carries a `requestId` property usable with cancelHttpRequest(). A
   *   cancelled request neither resolves nor rejects.
   * @throws {TypeError} Synchronously, when `url` cannot be parsed.
   */
  httpRequest(url, keys, callback) {
    let actualCallback = null;
    if (typeof keys === 'function') {
      actualCallback = keys;
    } else if (typeof callback === 'function') {
      actualCallback = callback;
    }
    const actualKeys = Array.isArray(keys) ? keys : [];

    // Parse the URL before registering the request so that an invalid URL
    // cannot leave a stale entry behind.
    const base = typeof location !== 'undefined' ? location.href : undefined;
    const fullUrl = new URL(url, base);
    if (actualKeys.length > 0) {
      fullUrl.searchParams.append('keys', actualKeys.join(','));
    }

    const requestId = this.generateSubscriptionId();
    const controller = new AbortController();
    this.requests.http.set(requestId, { controller, url });
    this._syncHttpStatus();

    const httpOptions = this.options.httpOptions;
    const fetchOptions = {
      method: 'GET',
      ...httpOptions,
      // Merged after the spread so that custom headers extend the no-cache
      // defaults instead of replacing them.
      headers: {
        'Content-Type': 'application/json',
        'Cache-Control': 'no-cache, no-store, must-revalidate',
        'Pragma': 'no-cache',
        'Expires': '0',
        ...httpOptions.headers,
      },
      // A signal configured in httpOptions still takes precedence.
      signal: httpOptions.signal || controller.signal,
    };

    const promise = new Promise((resolve, reject) => {
      fetch(fullUrl.toString(), fetchOptions)
        .then((response) => {
          if (!response.ok) {
            throw new Error(`HTTP error! status: ${response.status}`);
          }
          return response.json();
        })
        .then(
          (data) => {
            this._finishHttpRequest(requestId);
            this._invokeHttpCallback(actualCallback, data, null);
            resolve(data);
          },
          (error) => {
            this._finishHttpRequest(requestId);
            // Cancellation is not reported as an error.
            if (error.name === 'AbortError') {
              return;
            }
            this._invokeHttpCallback(actualCallback, null, error);
            reject(error);
          }
        );
    });

    // Expose the id so the caller can cancel the request.
    promise.requestId = requestId;
    return promise;
  }

  /**
   * Cancel a pending HTTP request.
   * @param {string} requestId - Id taken from the promise returned by httpRequest().
   * @returns {boolean} True when a pending request was found and aborted.
   */
  cancelHttpRequest(requestId) {
    const entry = this.requests.http.get(requestId);
    if (!entry) {
      return false;
    }
    entry.controller.abort();
    this._finishHttpRequest(requestId);
    return true;
  }

  /**
   * Drop a request from the tracking table and refresh the HTTP status.
   */
  _finishHttpRequest(requestId) {
    this.requests.http.delete(requestId);
    this._syncHttpStatus();
  }

  /**
   * Derive the HTTP status from the number of pending requests, so one
   * request finishing does not report 'idle' while others are in flight.
   */
  _syncHttpStatus() {
    this.connectionStatus.http = this.requests.http.size > 0 ? 'loading' : 'idle';
  }

  /**
   * Invoke an HTTP callback, isolating exceptions thrown by it so they
   * cannot change how the request promise settles.
   */
  _invokeHttpCallback(callback, data, error) {
    if (typeof callback !== 'function') {
      return;
    }
    try {
      callback(data, error);
    } catch (err) {
      console.error('HTTP回调执行失败:', err);
    }
  }

  /**
   * Open a new MQTT connection, replacing any existing client.
   */
  _connectMqtt() {
    if (!this.options.url) {
      console.error('MQTT WebSocket URL未配置');
      return;
    }

    // A manual connect supersedes any scheduled reconnect.
    this._clearReconnectTimer();

    // Shut down the previous client, if any.
    if (this.mqttClient) {
      this.mqttClient.end(false, {}, () => {
        console.log('已断开现有MQTT连接');
      });
    }

    this.connectionStatus.mqtt = 'connecting';
    this._notifyAllMqttSubscribers({ type: 'status', status: 'connecting' });

    const client = mqtt.connect(this.options.url, {
      clientId: `vue-client-${Math.random().toString(36).slice(2, 12)}`,
      clean: true,
      reconnectPeriod: 0, // disable built-in reconnect; _scheduleReconnect handles it
      ...this.options.mqttOptions,
    });
    this.mqttClient = client;

    // Events from a client that has since been replaced or destroyed must
    // not touch the shared state or trigger reconnects.
    const isCurrent = () => this.mqttClient === client;

    client.on('connect', () => {
      if (!isCurrent()) return;
      console.log('MQTT连接成功');
      this.connectionStatus.mqtt = 'connected';
      this.connectionStatus.retryCount = 0;

      this._notifyAllMqttSubscribers({ type: 'status', status: 'connected' });

      // (Re)subscribe every tracked topic on the fresh connection.
      this.requests.mqtt.forEach((info, topic) => {
        client.subscribe(topic, { qos: info.qos });
      });
    });

    client.on('message', (topic, message) => {
      if (!isCurrent()) return;
      let data;
      try {
        data = JSON.parse(message.toString());
      } catch (error) {
        console.error('解析MQTT消息失败:', error);
        this._notifyMqttError(topic, new Error('消息格式错误'));
        return;
      }
      this._onMqttMessage(topic, data);
    });

    client.on('close', () => {
      if (!isCurrent()) return;
      if (this.connectionStatus.mqtt !== 'disconnected') {
        console.log('MQTT连接已关闭');
        this.connectionStatus.mqtt = 'disconnected';
        this._notifyAllMqttSubscribers({ type: 'status', status: 'disconnected' });

        if (this.options.reconnect) {
          this._scheduleReconnect();
        }
      }
    });

    client.on('error', (error) => {
      if (!isCurrent()) return;
      console.error('MQTT错误:', error);
      this.connectionStatus.mqtt = 'error';
      this._notifyAllMqttSubscribers({
        type: 'error',
        error: new Error(`连接错误: ${error.message}`),
      });
    });
  }

  /**
   * Subscribe a callback to one or more MQTT topics, connecting on demand.
   *
   * @param {string[]} topics - Non-empty array of topic filters.
   * @param {Function} callback - `(message, error) => {}`.
   * @param {Object} [options] - `options.qos` sets the QoS used when a topic
   *   is subscribed for the first time (default 1).
   * @returns {string[]} One subscription id per topic, for unsubscribe().
   * @throws {Error} When the URL is not configured or arguments are invalid.
   */
  mqttSubscribe(topics, callback, options = {}) {
    if (!this.options.url) {
      throw new Error('MQTT WebSocket URL未配置');
    }
    if (!Array.isArray(topics) || topics.length === 0) {
      throw new Error('请提供有效的主题数组');
    }
    if (typeof callback !== 'function') {
      throw new Error('请提供回调函数');
    }

    // Explicit check (not `||`) so that a requested QoS of 0 is respected.
    const requestedQos =
      options.qos !== undefined && options.qos !== null ? options.qos : DEFAULT_QOS;

    const subIds = topics.map((topic) => {
      const subId = this.generateSubscriptionId();
      const existing = this.requests.mqtt.get(topic);
      if (existing) {
        existing.subscribers[subId] = { callback };
      } else {
        this.requests.mqtt.set(topic, {
          topic,
          options,
          subscribers: { [subId]: { callback } },
          qos: requestedQos,
        });
      }
      return subId;
    });

    const client = this.mqttClient;
    if (client && client.connected) {
      // Already connected: subscribe right away with the QoS tracked for the
      // topic, the same value used when resubscribing after a reconnect.
      topics.forEach((topic) => {
        client.subscribe(topic, { qos: this.requests.mqtt.get(topic).qos });
      });
    } else if (!client || this.connectionStatus.mqtt !== 'connecting') {
      // No usable connection and none in progress: open one. While an
      // attempt is in progress its 'connect' handler subscribes all topics.
      this._connectMqtt();
    }

    return subIds;
  }

  /**
   * Deliver a parsed MQTT message to the subscribers of every tracked topic
   * filter that matches `topic` (exact topics and `+` / `#` wildcards).
   */
  _onMqttMessage(topic, data) {
    this._matchingMqttEntries(topic).forEach((mqttInfo) => {
      // Subscribers registered by mqttSubscribe carry no `keys`, in which
      // case _filterDataByKeys returns the data unchanged.
      Object.values(mqttInfo.subscribers).forEach(({ keys, callback }) => {
        try {
          const filteredData = this._filterDataByKeys(data, keys);
          callback({
            type: 'data',
            data: filteredData,
            topic,
            timestamp: Date.now(),
          }, null);
        } catch (error) {
          console.error('通知MQTT订阅者失败:', error);
        }
      });
    });
  }

  /**
   * Collect the subscription entries whose topic filter matches `topic`.
   * @returns {Object[]}
   */
  _matchingMqttEntries(topic) {
    const matches = [];
    this.requests.mqtt.forEach((mqttInfo, filter) => {
      if (this._topicMatches(filter, topic)) {
        matches.push(mqttInfo);
      }
    });
    return matches;
  }

  /**
   * Test a concrete topic against a topic filter: `+` matches exactly one
   * level, a trailing `#` matches the parent and all deeper levels, and a
   * leading wildcard does not match topics that start with `$`.
   * @param {string} filter
   * @param {string} topic
   * @returns {boolean}
   */
  _topicMatches(filter, topic) {
    if (filter === topic) {
      return true;
    }
    const filterLevels = filter.split('/');
    const topicLevels = topic.split('/');
    const first = filterLevels[0];
    if (topic.charAt(0) === '$' && (first === '#' || first === '+')) {
      return false;
    }
    for (let i = 0; i < filterLevels.length; i++) {
      const level = filterLevels[i];
      if (level === '#') {
        return true;
      }
      if (i >= topicLevels.length) {
        return false;
      }
      if (level !== '+' && level !== topicLevels[i]) {
        return false;
      }
    }
    return filterLevels.length === topicLevels.length;
  }

  /**
   * Publish a JSON-serialised message.
   * @param {string} topic
   * @param {*} message - Serialised with JSON.stringify.
   * @param {Object} [options] - Forwarded to the client's publish().
   * @returns {Promise<boolean>} Resolves true once published; rejects when
   *   the client is not connected or publishing fails.
   */
  mqttPublish(topic, message, options = {}) {
    return new Promise((resolve, reject) => {
      if (!this.mqttClient || !this.mqttClient.connected) {
        reject(new Error('MQTT客户端未连接'));
        return;
      }

      this.mqttClient.publish(topic, JSON.stringify(message), options, (error) => {
        if (error) {
          reject(error);
        } else {
          resolve(true);
        }
      });
    });
  }

  /**
   * Schedule the next reconnect attempt with exponential backoff, up to
   * reconnectConfig.maxRetries attempts.
   */
  _scheduleReconnect() {
    if (this.connectionStatus.retryCount >= this.options.reconnectConfig.maxRetries) {
      console.error('已达到最大重连次数');
      return;
    }

    this.connectionStatus.retryCount++;
    const delay = this._calculateReconnectDelay(this.connectionStatus.retryCount - 1);

    console.log(`将在 ${delay}ms 后尝试重连 (第 ${this.connectionStatus.retryCount} 次)`);

    this._clearReconnectTimer();
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this._connectMqtt();
    }, delay);
  }

  /**
   * Cancel the pending reconnect timer, if there is one.
   */
  _clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  /**
   * Backoff delay for a zero-based attempt number:
   * initialDelay * 2^attempt, capped at maxDelay.
   * @param {number} attempt
   * @returns {number} Delay in milliseconds.
   */
  _calculateReconnectDelay(attempt) {
    const delay = this.options.reconnectConfig.initialDelay * Math.pow(2, attempt);
    return Math.min(delay, this.options.reconnectConfig.maxDelay);
  }

  /**
   * Cancel one or more MQTT subscriptions.
   * @param {string|string[]} data - A subscription id or an array of ids,
   *   as returned by mqttSubscribe().
   */
  unsubscribe(data) {
    if (typeof data === 'string') {
      this.unsubscribeFun(data);
      return;
    }
    if (data && data.length) {
      data.forEach((subId) => {
        this.unsubscribeFun(subId);
      });
    }
  }

  /**
   * Cancel a single MQTT subscription; a topic left without subscribers is
   * unsubscribed and released.
   * @param {string} subId
   * @returns {boolean} True when the subscription existed.
   */
  unsubscribeFun(subId) {
    let removed = false;

    // Iterate over a snapshot: cleanup may delete entries or clear the map.
    for (const [key, info] of [...this.requests.mqtt]) {
      if (info.subscribers && info.subscribers[subId]) {
        delete info.subscribers[subId];
        removed = true;

        if (Object.keys(info.subscribers).length === 0) {
          this._cleanupRequest(key, info);
        }
      }
    }

    return removed;
  }

  /**
   * Release a topic that no longer has subscribers; when it was the last
   * tracked topic the whole instance is destroyed.
   */
  _cleanupRequest(key, info) {
    if (this.mqttClient && this.mqttClient.connected) {
      this.mqttClient.unsubscribe(info.topic);
    }
    this.requests.mqtt.delete(key);
    if (!this.requests.mqtt.size) {
      this.destroy();
    }
  }

  /**
   * Generate an id that is unique within this instance; used for both MQTT
   * subscriptions and HTTP requests.
   * @returns {string}
   */
  generateSubscriptionId() {
    return `sub_${this.subscriptionIdCounter++}`;
  }

  /**
   * Reduce `data` (an object or an array of objects) to the given keys.
   * Returns `data` unchanged when `keys` is missing or empty.
   */
  _filterDataByKeys(data, keys) {
    if (!keys || keys.length === 0) {
      return data;
    }

    if (Array.isArray(data)) {
      return data.map((item) => this._filterObjectByKeys(item, keys));
    }

    return this._filterObjectByKeys(data, keys);
  }

  /**
   * Build a new object holding only the requested keys (dot paths allowed).
   * Keys that resolve to undefined are omitted; non-objects are returned as-is.
   */
  _filterObjectByKeys(obj, keys) {
    if (typeof obj !== 'object' || obj === null) {
      return obj;
    }

    return keys.reduce((result, key) => {
      const value = this._getNestedValue(obj, key);
      if (value !== undefined) {
        result[key] = value;
      }
      return result;
    }, {});
  }

  /**
   * Resolve a dot-separated path (e.g. "a.b.c") against `obj`.
   * @returns {*} The value, or undefined when any segment is missing.
   */
  _getNestedValue(obj, key) {
    return key.split('.').reduce((current, part) => {
      if (current && typeof current === 'object' && part in current) {
        return current[part];
      }
      return undefined;
    }, obj);
  }

  /**
   * Broadcast a message to every MQTT subscriber. A message with an `error`
   * property is delivered as `(null, error)`, anything else as `(message, null)`.
   */
  _notifyAllMqttSubscribers(message) {
    this.requests.mqtt.forEach((mqttInfo) => {
      Object.values(mqttInfo.subscribers).forEach(({ callback }) => {
        try {
          if (message.error) {
            callback(null, message.error);
          } else {
            callback(message, null);
          }
        } catch (err) {
          console.error('通知MQTT订阅者失败:', err);
        }
      });
    });
  }

  /**
   * Report an error to the subscribers of every topic filter matching `topic`.
   */
  _notifyMqttError(topic, error) {
    this._matchingMqttEntries(topic).forEach((mqttInfo) => {
      Object.values(mqttInfo.subscribers).forEach(({ callback }) => {
        try {
          callback(null, error);
        } catch (err) {
          console.error('通知MQTT订阅者错误失败:', err);
        }
      });
    });
  }

  /**
   * Tear down the MQTT side of the instance: cancel any pending reconnect,
   * close the client, forget all subscriptions and leave the instance pool.
   * Pending HTTP requests are left untouched.
   */
  destroy() {
    this._clearReconnectTimer();

    const client = this.mqttClient;
    if (client) {
      // Detach first so late events from this client are ignored.
      this.mqttClient = null;
      client.end(true);
    }
    this.connectionStatus.mqtt = 'disconnected';
    this.connectionStatus.retryCount = 0;

    this.requests.mqtt.clear();

    // Only evict the pool entry when it actually refers to this instance.
    const key = this.getInstanceKey();
    if (instancePool.get(key) === this) {
      instancePool.delete(key);
    }
  }
}

export default DataMiddleware;