WX-Game1/Assets/Scripts/Mqtt/MqttManager.cs

397 lines
9.7 KiB
C#
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using Best.MQTT;
using Best.MQTT.Packets;
using Best.MQTT.Packets.Builders;
using DefaultNamespace;
using MotionFramework;
using Newtonsoft.Json;
using Unity.VisualScripting;
using UnityEngine;
/// <summary>
/// Test 类,用于演示如何在 Unity 环境中使用 MQTT 客户端库与 MQTT 代理进行交互。
/// </summary>
public class MqttManager : MonoBehaviour
{
public static MqttManager Instance;
/// <summary>
/// MQTT 客户端
/// </summary>
MQTTClient client;
/// <summary>
/// 订阅的主题接收的消息
/// </summary>
[Header("订阅的世界主题接收的消息")] public List<string> wordStrings = new List<string>();
/// <summary>
/// 订阅的主题接收的消息
/// </summary>
private Queue<string> wordStringQueue = new Queue<string>();
/// <summary>
/// 聊天面板管理器
/// </summary>
public ChatPanelManager chatPanelManager;
/// <summary>
/// 用户信息
/// </summary>
public UserInfo userInfo;
/// <summary>
/// 发送消息实例
/// </summary>
public MqttRoot mqttRoot;
/// <summary>
/// MQTT配置
/// </summary>
public MqttConfigData mqttConfigData;
/// <summary>
/// 历史消息
/// </summary>
public HistoryRoot historyRoot;
void Awake()
{
Instance = this;
}
private void Start()
{
GetHistory();
GetMqttConfig();
}
public void Update()
{
if (wordStringQueue.Count > 0)
{
string str = wordStringQueue.Dequeue();
MessageDisplay(str);
}
}
/// <summary>
/// 初始化MQTT
/// </summary>
public void Init()
{
// 根据平台选择不同的连接方式
var options = new ConnectionOptionsBuilder()
#if !UNITY_WEBGL || UNITY_EDITOR
.WithTCP(mqttConfigData.host, mqttConfigData.port)
#else
.WithWebSocket("broker.emqx.io", 8084).WithTLS()
#endif
.Build();
// 初始化 MQTT 客户端并设置事件处理程序
client = new MQTTClientBuilder()
.WithOptions(options)
.WithEventHandler(OnConnected)
.WithEventHandler(OnDisconnected)
.WithEventHandler(OnStateChanged)
.WithEventHandler(OnError)
.CreateClient();
// 开始连接到 MQTT 代理
client.BeginConnect(ConnectPacketBuilderCallback);
}
/// <summary>
/// 处理用户信息
/// </summary>
async void GetMqttConfig()
{
try
{
Debug.Log("开始获取mqtt连接配置...");
// 使用UserDataNetworkManager获取用户信息
MqttConfigRoot response = await UserDataNetworkManager.Instance.GetMqttConfig();
if (response != null && response.data != null)
{
mqttConfigData = response.data;
Init();
Debug.Log("mqtt连接配置获取成功");
}
else
{
Debug.LogError("mqtt连接配置为空");
}
}
catch (Exception ex)
{
Debug.LogError($"获取mqtt连接配置时发生错误: {ex.Message}");
// 可以在这里添加错误处理逻辑,比如显示错误提示给用户
}
}
/// <summary>
/// 获取历史消息
/// </summary>
async void GetHistory()
{
try
{
Debug.Log("开始获取历史消息...");
// 使用UserDataNetworkManager获取用户信息
HistoryRoot response = await UserDataNetworkManager.Instance.GetHistroy();
Debug.Log(response.msg);
if (response != null && response.data != null)
{
historyRoot = response;
foreach (var item in historyRoot.data)
{
chatPanelManager.AddBubble(item.user_username + ":" + item.chat_content, false);
}
Debug.Log("获取历史消息获取成功!");
}
else
{
Debug.LogError("获取历史消息为空");
}
}
catch (Exception ex)
{
Debug.LogError($"获取历史消息发生错误: {ex.Message}");
// 可以在这里添加错误处理逻辑,比如显示错误提示给用户
}
}
/// <summary>
/// 消息接收
/// </summary>
/// <param name="message"></param>
private void MessageDisplay(string message)
{
Debug.Log("接收消息: " + message);
wordStrings.Add(message);
MessageDto _mqrrRoot = JsonConvert.DeserializeObject<MessageDto>(message);
chatPanelManager.AddBubble(_mqrrRoot.nickname+ ":" + _mqrrRoot.content, false);
}
private void OnConnected(MQTTClient client)
{
// 订阅指定的主题并设置消息接收回调
client.CreateSubscriptionBuilder(mqttConfigData.topic)
.WithMessageCallback(OnWordMessage)
.BeginSubscribe();
}
/// <summary>
/// 世界频道消息
/// </summary>
/// <param name="client"></param>
/// <param name="topic"></param>
/// <param name="topicName"></param>
/// <param name="message"></param>
private void OnWordMessage(MQTTClient client, SubscriptionTopic topic, string topicName, ApplicationMessage message)
{
// 处理接收到的消息
Debug.Log("得到数据");
var payload = Encoding.UTF8.GetString(message.Payload.Data, message.Payload.Offset, message.Payload.Count);
Debug.Log($"订阅输出的数据:Content-Type: '{message.ContentType}' Payload: '{payload}'");
wordStringQueue.Enqueue(payload);
}
private void OnDestroy()
{
// 在对象销毁时断开与 MQTT 代理的连接
client?.CreateDisconnectPacketBuilder()
.WithReasonCode(DisconnectReasonCodes.NormalDisconnection)
.WithReasonString("Bye")
.BeginDisconnect();
}
private ConnectPacketBuilder ConnectPacketBuilderCallback(MQTTClient client, ConnectPacketBuilder builder)
{
// 设置连接时的用户名和密码
return builder.WithUserNameAndPassword(mqttConfigData.username, mqttConfigData.password);
}
private void OnStateChanged(MQTTClient client, ClientStates oldState, ClientStates newState)
{
// 处理客户端状态变化
Debug.Log($"{oldState} => {newState}");
}
private void OnDisconnected(MQTTClient client, DisconnectReasonCodes code, string reason)
{
// 处理断开连接事件
Debug.Log($"OnDisconnected - code: {code}, reason: '{reason}'");
}
private void OnError(MQTTClient client, string reason)
{
// 处理错误事件
Debug.Log($"OnError reason: '{reason}'");
}
/// <summary>
/// 发送Mqtt消息
/// </summary>
/// <param name="topic"></param>
/// <param name="message"></param>
public void SendMessage(string topic, string message)
{
if (client.State != ClientStates.Connected)
{
Debug.Log("连接已经断开,不发送消息");
return;
}
client.CreateApplicationMessageBuilder(topic)
.WithPayload(message)
.WithQoS(Best.MQTT.Packets.QoSLevels.AtMostOnceDelivery)
.BeginPublish();
}
//
// // 示例方法,用于触发发送消息
// [ContextMenu("ExamplePublish")]
// public void ExamplePublish()
// {
// SendMessage("世界", "你好MQTT!");
// }
}
[Serializable]
public class MqttData
{
/// <summary>
///
/// </summary>
public int chat_id;
/// <summary>
/// 内容
/// </summary>
public string chat_content;
/// <summary>
/// 发送消息的用户名
/// </summary>
public string user_username;
/// <summary>
///
/// </summary>
public string user_image;
}
[Serializable]
public class MqttRoot
{
/// <summary>
///
/// </summary>
public int code;
/// <summary>
///
/// </summary>
public string msg;
/// <summary>
///
/// </summary>
public List<MqttData> data;
}
public class SendMessageData
{
/// <summary>
///
/// </summary>
public int user_id { get; set; }
/// <summary>
///
/// </summary>
public int user_account_id { get; set; }
/// <summary>
/// 李四
/// </summary>
public string user_username { get; set; }
/// <summary>
///
/// </summary>
public string user_image { get; set; }
/// <summary>
///
/// </summary>
public int user_honor_level { get; set; }
/// <summary>
///
/// </summary>
public int have { get; set; }
}
public class SendMessageRoot
{
/// <summary>
/// 响应状态码0-成功,其他-失败)
/// </summary>
public int code { get; set; }
/// <summary>
/// 响应消息
/// </summary>
public string msg { get; set; }
/// <summary>
/// 响应数据(发送消息接口返回字符串类型,如"发送成功"
/// </summary>
public string data { get; set; }
}
public class MessageDto
{
/// <summary>
///
/// </summary>
public int userId { get; set; }
/// <summary>
/// 张三
/// </summary>
public string nickname { get; set; }
/// <summary>
///
/// </summary>
public string image { get; set; }
/// <summary>
///
/// </summary>
public string content { get; set; }
/// <summary>
///
/// </summary>
public string time { get; set; }
}