using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using MQTTnet.Client;
using MQTTnet;
using System.Threading.Tasks;
using MQTTnet.Packets;
using MQTTnet.Server;
using Unity.VisualScripting.FullSerializer;

/// <summary>
/// MQTT manager: owns a single MQTT client, connects it when the component starts,
/// retries the connection every <see cref="ReconnectGapTime"/> seconds after a failure,
/// and exposes static publish / subscribe / unsubscribe helpers that forward to the
/// registered instance.
/// </summary>
public class MQTTManager : MonoBehaviour
{
    /// <summary>
    /// The instance the static helpers forward to. Registered in <see cref="Awake"/>
    /// and cleared in <see cref="OnDestroy"/>.
    /// </summary>
    private static MQTTManager _Instance;

    // NOTE(review): broker address and credentials are hard-coded in source.
    // Consider loading them from configuration that is not committed to version control.
    private static readonly string MQTTURI = "172.16.1.253";
    private static readonly int MQTTPort = 1883;
    private static readonly string MQTTUser = "dev";
    private static readonly string MQTTPassword = "!11@22#33";

    /// <summary>
    /// Client id sent to the broker; a fresh GUID is generated in <see cref="Start"/>.
    /// </summary>
    private string _ClientID;

    /// <summary>
    /// Interval between reconnect attempts, in seconds.
    /// </summary>
    private static readonly float ReconnectGapTime = 10;

    /// <summary>
    /// The MQTT client. Created in <see cref="InitMQTT"/>, disposed in <see cref="OnDestroy"/>.
    /// </summary>
    private IMqttClient _Client;

    /// <summary>
    /// Number of disconnect events since the last successful connection.
    /// </summary>
    private int _FailCount;

    /// <summary>
    /// Current connection status. Written from the MQTT client's callback threads and
    /// read in <see cref="Update"/>, hence volatile.
    /// </summary>
    private volatile MQTTStatus _Status;

    /// <summary>
    /// Time accumulated in the <see cref="MQTTStatus.Failed"/> state since the last
    /// reconnect attempt, in seconds.
    /// </summary>
    private float _WaitTime;

    /// <summary>
    /// Registers this component as the instance used by the static helpers.
    /// If another live instance is already registered, this one is disabled (so
    /// <see cref="Start"/> and <see cref="Update"/> never run for it) and removed.
    /// </summary>
    private void Awake()
    {
        if (_Instance != null && _Instance != this)
        {
            Debug.LogWarning("MQTTManager: another instance already exists; removing the duplicate component.");
            enabled = false;
            Destroy(this);
            return;
        }

        _Instance = this;
    }

    /// <summary>
    /// Generates the client id and starts the initial connection.
    /// </summary>
    private void Start()
    {
        _ClientID = System.Guid.NewGuid().ToString();
        InitMQTT();
    }

    /// <summary>
    /// While the connection is in the failed state, accumulates frame time and triggers
    /// a reconnect once more than <see cref="ReconnectGapTime"/> seconds have elapsed.
    /// </summary>
    private void Update()
    {
        if (_Status != MQTTStatus.Failed)
        {
            return;
        }

        _WaitTime += Time.deltaTime;
        if (_WaitTime > ReconnectGapTime)
        {
            _WaitTime = 0;
            Reconnect();
        }
    }

    /// <summary>
    /// Detaches the event handlers and disposes the client so the connection does not
    /// outlive this component, and unregisters the static instance.
    /// </summary>
    private void OnDestroy()
    {
        if (ReferenceEquals(_Instance, this))
        {
            _Instance = null;
        }

        IMqttClient client = _Client;
        _Client = null;
        if (client == null)
        {
            return;
        }

        // Detach first so disposing does not report a "failure" to this dying component.
        client.ConnectedAsync -= Client_ConnectedAsync;
        client.DisconnectedAsync -= Client_DisconnectedAsync;
        client.ApplicationMessageReceivedAsync -= Client_ApplicationMessageReceivedAsync;
        client.Dispose();
    }

    /// <summary>
    /// Builds the client options, creates the client, wires up its events and starts
    /// the first connection attempt on a background task.
    /// </summary>
    private void InitMQTT()
    {
        MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder()
            .WithTcpServer(MQTTURI, MQTTPort)          // broker address and port
            .WithCredentials(MQTTUser, MQTTPassword)   // broker user name and password
            .WithClientId(_ClientID)                   // client id
            .WithCleanSession()
            .WithTls(new MqttClientOptionsBuilderTlsParameters
            {
                UseTls = false                         // TLS disabled
            });
        MqttClientOptions clientOptions = builder.Build();

        // Create the client synchronously so _Client is non-null as soon as Start has run.
        IMqttClient client = new MqttFactory().CreateMqttClient();
        client.ConnectedAsync += Client_ConnectedAsync;                             // connected
        client.DisconnectedAsync += Client_DisconnectedAsync;                       // disconnected
        client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync; // message received
        _Client = client;

        _Status = MQTTStatus.Connecting;
        Task.Run(() => ConnectAsync(client, clientOptions));
    }

    /// <summary>
    /// Awaits the initial connect so that a failure is observed: the exception is logged
    /// and the status is set to <see cref="MQTTStatus.Failed"/>, which lets
    /// <see cref="Update"/> schedule a retry.
    /// </summary>
    private async Task ConnectAsync(IMqttClient client, MqttClientOptions clientOptions)
    {
        try
        {
            await client.ConnectAsync(clientOptions);
        }
        catch (System.Exception ex)
        {
            _Status = MQTTStatus.Failed;
            Debug.LogWarning($"MQTT connect failed: {ex.Message}");
        }
    }

    /// <summary>
    /// Starts a reconnect attempt on a background task.
    /// </summary>
    private void Reconnect()
    {
        IMqttClient client = _Client;
        if (client == null)
        {
            return;
        }

        // Leave the Failed state before the task starts so Update cannot observe a stale value.
        _Status = MQTTStatus.Connecting;
        Task.Run(() => ReconnectAsync(client));
    }

    /// <summary>
    /// Awaits the reconnect so that a failure is observed and the status returns to
    /// <see cref="MQTTStatus.Failed"/> instead of staying at Connecting forever.
    /// </summary>
    private async Task ReconnectAsync(IMqttClient client)
    {
        try
        {
            await client.ReconnectAsync();
        }
        catch (System.Exception ex)
        {
            _Status = MQTTStatus.Failed;
            Debug.LogWarning($"MQTT reconnect failed: {ex.Message}");
        }
    }

    /// <summary>
    /// Message-received handler.
    /// </summary>
    /// <param name="arg">Event data carrying the received application message.</param>
    /// <returns>A completed task.</returns>
    private System.Threading.Tasks.Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
    {
        // The topic identifies which subscription the message belongs to.
        string topic = arg.ApplicationMessage.Topic;
        string payload = DecodePayload(arg.ApplicationMessage);

        // Application-specific handling goes here. A typical approach is to enqueue
        // (topic, payload) into a thread-safe queue and drain it from Update.
        return Task.CompletedTask;
    }

    /// <summary>
    /// Decodes the message payload as UTF-8, honouring the payload segment's offset and
    /// count. Returns an empty string when the message has no payload.
    /// </summary>
    private static string DecodePayload(MqttApplicationMessage message)
    {
        var segment = message.PayloadSegment;
        if (segment.Array == null || segment.Count == 0)
        {
            return string.Empty;
        }

        return System.Text.Encoding.UTF8.GetString(segment.Array, segment.Offset, segment.Count);
    }

    /// <summary>
    /// Disconnected handler: marks the connection as failed and counts the failure.
    /// </summary>
    /// <param name="arg">Event data describing the disconnect.</param>
    /// <returns>A completed task.</returns>
    private System.Threading.Tasks.Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
    {
        _Status = MQTTStatus.Failed;
        System.Threading.Interlocked.Increment(ref _FailCount);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Connected handler: marks the connection as established and resets the failure count.
    /// </summary>
    /// <param name="arg">Event data describing the connection result.</param>
    /// <returns>A completed task.</returns>
    private System.Threading.Tasks.Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
    {
        _Status = MQTTStatus.Connected;
        System.Threading.Interlocked.Exchange(ref _FailCount, 0);
        Debug.Log("MQTT连接成功");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Returns the registered instance's client, or null when no instance is registered
    /// or its client has not been created yet.
    /// </summary>
    private static IMqttClient GetClientOrNull()
    {
        return _Instance ? _Instance._Client : null;
    }

    /// <summary>
    /// Logs the exception of a fire-and-forget MQTT operation if it faults, so the
    /// failure does not go unobserved.
    /// </summary>
    private static void LogIfFaulted(Task task, string operation)
    {
        task.ContinueWith(
            t => Debug.LogWarning($"MQTT {operation} failed: {t.Exception?.GetBaseException().Message}"),
            TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// Publishes a message through the registered instance. Does nothing when no
    /// instance (or client) is available. The operation is not awaited; asynchronous
    /// failures are logged.
    /// </summary>
    /// <param name="message">The application message to publish.</param>
    public static void PublishAsync(MqttApplicationMessage message)
    {
        IMqttClient client = GetClientOrNull();
        if (client == null)
        {
            return;
        }

        LogIfFaulted(client.PublishAsync(message), "publish");
    }

    /// <summary>
    /// Subscribes to a topic through the registered instance. Does nothing when no
    /// instance (or client) is available. The operation is not awaited; asynchronous
    /// failures are logged.
    /// </summary>
    /// <param name="options">The topic to subscribe to.</param>
    public static void SubscribeAsync(string options)
    {
        IMqttClient client = GetClientOrNull();
        if (client == null)
        {
            return;
        }

        LogIfFaulted(client.SubscribeAsync(options), "subscribe");
        Debug.Log($"订阅{options}");
    }

    /// <summary>
    /// Unsubscribes through the registered instance. Does nothing when no instance
    /// (or client) is available. The operation is not awaited; asynchronous failures
    /// are logged.
    /// </summary>
    /// <param name="options">The unsubscribe options (topics to remove).</param>
    public static void UnsubscribeAsync(MqttClientUnsubscribeOptions options)
    {
        IMqttClient client = GetClientOrNull();
        if (client == null)
        {
            return;
        }

        LogIfFaulted(client.UnsubscribeAsync(options), "unsubscribe");
    }

    /// <summary>
    /// Connection status.
    /// </summary>
    public enum MQTTStatus
    {
        Empty = 0,

        /// <summary>
        /// Connecting.
        /// </summary>
        Connecting = 1,

        /// <summary>
        /// Connected.
        /// </summary>
        Connected = 2,

        /// <summary>
        /// Connection failed or was lost.
        /// </summary>
        Failed = 3,
    }
}