225 lines
6.1 KiB
C#
225 lines
6.1 KiB
C#
using System.Collections;
|
|
using System.Collections.Generic;
|
|
using UnityEngine;
|
|
using MQTTnet.Client;
|
|
using MQTTnet;
|
|
using System.Threading.Tasks;
|
|
using MQTTnet.Packets;
|
|
using MQTTnet.Server;
|
|
using Unity.VisualScripting.FullSerializer;
|
|
|
|
/// <summary>
|
|
/// MQTT管理器
|
|
/// </summary>
|
|
public class MQTTManager : MonoBehaviour
|
|
{
|
|
|
|
private static MQTTManager _Instance;
|
|
|
|
private static readonly string MQTTURI = "172.16.1.253";
|
|
private static readonly int MQTTPort = 1883;
|
|
private static readonly string MQTTUser = "dev";
|
|
private static readonly string MQTTPassword = "!11@22#33";
|
|
|
|
private string _ClientID;
|
|
|
|
/// <summary>
|
|
/// 重连间隔时间(s)
|
|
/// </summary>
|
|
private static readonly float ReconnectGapTime = 10;
|
|
|
|
/// <summary>
|
|
/// 客户端
|
|
/// </summary>
|
|
private IMqttClient _Client;
|
|
|
|
/// <summary>
|
|
/// 失败次数
|
|
/// </summary>
|
|
private int _FailCount;
|
|
|
|
/// <summary>
|
|
/// 当前状态
|
|
/// </summary>
|
|
private MQTTStatus _Status;
|
|
|
|
/// <summary>
|
|
/// 等待时间
|
|
/// </summary>
|
|
private float _WaitTime;
|
|
|
|
// Start is called before the first frame update
|
|
void Start()
|
|
{
|
|
_ClientID = System.Guid.NewGuid().ToString();
|
|
Task.Run(InitMQTT);
|
|
}
|
|
|
|
// Update is called once per frame
|
|
void Update()
|
|
{
|
|
if (_Status == MQTTStatus.Failed)
|
|
{
|
|
_WaitTime += Time.deltaTime;
|
|
if (_WaitTime > ReconnectGapTime)
|
|
{
|
|
_WaitTime = 0;
|
|
Reconnect();
|
|
}
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 初始化MQTT信息
|
|
/// </summary>
|
|
private void InitMQTT()
|
|
{
|
|
MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder()
|
|
.WithTcpServer(MQTTURI, MQTTPort) // 要访问的mqtt服务端的 ip 和 端口号
|
|
.WithCredentials(MQTTUser, MQTTPassword) // 要访问的mqtt服务端的用户名和密码
|
|
.WithClientId(_ClientID) // 设置客户端id
|
|
.WithCleanSession()
|
|
.WithTls(new MqttClientOptionsBuilderTlsParameters
|
|
{
|
|
UseTls = false // 是否使用 tls加密
|
|
});
|
|
MqttClientOptions clientOptions = builder.Build();
|
|
_Client = new MqttFactory().CreateMqttClient();
|
|
|
|
_Client.ConnectedAsync += Client_ConnectedAsync; // 客户端连接成功事件
|
|
_Client.DisconnectedAsync += Client_DisconnectedAsync; // 客户端连接关闭事件
|
|
_Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync; ; // 收到消息事件
|
|
|
|
// _Status = MQTTStatus.Connecting;
|
|
_Client.ConnectAsync(clientOptions);
|
|
}
|
|
/// <summary>
|
|
/// 重新连接
|
|
/// </summary>
|
|
private void Reconnect()
|
|
{
|
|
// LogManager.LogInfo("重新连接");
|
|
Task.Run(delegate ()
|
|
{
|
|
_Status = MQTTStatus.Connecting;
|
|
_Client.ReconnectAsync();
|
|
});
|
|
}
|
|
|
|
/// <summary>
|
|
/// 新消息事件
|
|
/// </summary>
|
|
/// <param name="arg"></param>
|
|
/// <returns></returns>
|
|
private System.Threading.Tasks.Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
|
|
{
|
|
//string str = System.Text.Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.Array);
|
|
//// Loom.QueueOnMainThread(() =>
|
|
//// {
|
|
//// LogManager.LogInfo("收到消息:" + arg.ApplicationMessage.Topic + "=====" + str);
|
|
//// });
|
|
//Debug.Log($"收到消息:{arg.ApplicationMessage.Topic}====={str}");
|
|
//return Task.CompletedTask;
|
|
|
|
string topic1 = arg.ApplicationMessage.Topic;//根据该属性分辨是哪个
|
|
//主题传递的消息,你可以根据不同的主题传来的消息做出不同的反应
|
|
//将消息转为字符串
|
|
string payload = System.Text.Encoding.UTF8.GetString(arg.ApplicationMessage.PayloadSegment.Array);
|
|
//下面一般是你自己的业务逻辑,如何处理传递过来的消息
|
|
//reciveDatas.Enqueue(payload);通常会使用队列先存起来,后续会在Update
|
|
//中处理
|
|
// UnityEngine.Debug.Log($"Topic is {topic}, Payload is {payload}");
|
|
return Task.CompletedTask;
|
|
|
|
}
|
|
|
|
/// <summary>
|
|
/// 连接断开事件
|
|
/// </summary>
|
|
/// <param name="arg"></param>
|
|
/// <returns></returns>
|
|
private System.Threading.Tasks.Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
|
|
{
|
|
// Loom.QueueOnMainThread(() =>
|
|
// {
|
|
// LogManager.LogInfo("MQTT连接断开:" + arg.Reason);
|
|
// });
|
|
_Status = MQTTStatus.Failed;
|
|
_FailCount++;
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
/// <summary>
|
|
/// 连接成功事件
|
|
/// </summary>
|
|
/// <param name="arg"></param>
|
|
/// <returns></returns>
|
|
private System.Threading.Tasks.Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
|
|
{
|
|
_Status = MQTTStatus.Connected;
|
|
// Loom.QueueOnMainThread(() =>
|
|
// {
|
|
// LogManager.LogInfo("MQTT连接成功");
|
|
// _FailCount = 0;
|
|
// });
|
|
Debug.Log("MQTT连接成功");
|
|
return Task.CompletedTask;
|
|
}
|
|
|
|
/// <summary>
|
|
/// 发布消息
|
|
/// </summary>
|
|
public static void PublishAsync(MqttApplicationMessage message)
|
|
{
|
|
if (_Instance)
|
|
{
|
|
_Instance._Client.PublishAsync(message);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 订阅消息
|
|
/// </summary>
|
|
public static void SubscribeAsync(string options)
|
|
{
|
|
if (_Instance)
|
|
{
|
|
_Instance._Client.SubscribeAsync(options);
|
|
Debug.Log($"订阅{options}");
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 取消订阅消息
|
|
/// </summary>
|
|
public static void UnsubscribeAsync(MqttClientUnsubscribeOptions options)
|
|
{
|
|
if (_Instance)
|
|
{
|
|
_Instance._Client.UnsubscribeAsync(options);
|
|
}
|
|
}
|
|
|
|
/// <summary>
|
|
/// 状态
|
|
/// </summary>
|
|
public enum MQTTStatus
|
|
{
|
|
Empty = 0,
|
|
|
|
/// <summary>
|
|
/// 连接中
|
|
/// </summary>
|
|
Connecting = 1,
|
|
|
|
/// <summary>
|
|
/// 连接成功
|
|
/// </summary>
|
|
Connected = 2,
|
|
|
|
/// <summary>
|
|
/// 连接失败
|
|
/// </summary>
|
|
Failed = 3,
|
|
}
|
|
} |