using NetMQ;
using NetMQ.Sockets;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using UnityEngine;
///
/// 发布模块
///
public class NetMqPublisher
{
private string serverPubIP;
private Thread _listenerWorker;
public bool _listenerCancelled;
///
/// 自定义名称
///
public string typeName;
public delegate void MessageDelegateByte(st_Motions st);
private readonly MessageDelegateByte _messageDelegateByte;
///
/// 发送队列
///
private readonly ConcurrentQueue _messageQueueData = new ConcurrentQueue();
public NetMqPublisher(string pubIP)
{
serverPubIP = pubIP;
StartListen();
}
///
/// 开始监听
///
public void StartListen()
{
_listenerCancelled = false;
_listenerWorker = new Thread(ByteListenerWork);
_listenerWorker.IsBackground = true;
_listenerWorker.Start();
}
///
/// 停止监听
///
public void StopListen()
{
_listenerCancelled = true;
//_listenerWorker.Join();
}
///
/// 发送线程
///
private void ByteListenerWork()
{
AsyncIO.ForceDotNet.Force();
using (var publishSocket = new PublisherSocket())
{
publishSocket.Options.ReceiveHighWatermark = 1000;
publishSocket.Connect(serverPubIP);
UnityEngine.Debug.Log("客户端开启成功:"+ serverPubIP);
while (!_listenerCancelled)
{
if (!_messageQueueData.IsEmpty)
{
if (_messageQueueData.TryDequeue(out st_Motions sendData))
{
if(_listenerCancelled)
{
UnityEngine.Debug.LogError("_listenerCancelled="+true);
}
byte[] tmpbytes = new byte[4 + sendData.m_sOperaData.Length];
Array.Copy(BitConverter.GetBytes(sendData.m_iOperaType), 0, tmpbytes, 0, 4);
Array.Copy(sendData.m_sOperaData, 0, tmpbytes, 4, sendData.m_sOperaData.Length);
if (!_listenerCancelled && !publishSocket.SendMoreFrame(sendData.area).TrySendFrame(tmpbytes)) continue;
UnityEngine.Debug.Log("发送一个消息:" + sendData.area + "," + sendData.m_iOperaType);
}
}
}
publishSocket.Close();
}
NetMQConfig.Cleanup();
UnityEngine.Debug.LogError("发送线程退出:"+typeName);
}
public void AddMessageToSendQue(string area, int type, byte[] data)
{
st_Motions sendData = new st_Motions { area = area, m_iOperaType = type, m_sOperaData = data };
_messageQueueData.Enqueue(sendData);
}
public void AddMessageToSendQue(st_Motions st)
{
if (!string.IsNullOrEmpty(st.area))
{
_messageQueueData.Enqueue(st);
}
else
{
UnityEngine.Debug.LogError("area空:type=" + st.m_iOperaType);
}
}
}