using NetMQ;
using NetMQ.Sockets;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using UnityEngine;
/// 
/// 发布模块
/// 
public class NetMqPublisher
{
    private string serverPubIP;
    private Thread _listenerWorker;
    public bool _listenerCancelled;
    /// 
    /// 自定义名称
    /// 
    public string typeName;
    public delegate void MessageDelegateByte(st_Motions st);
    private readonly MessageDelegateByte _messageDelegateByte;
    /// 
    /// 发送队列
    /// 
    private readonly ConcurrentQueue _messageQueueData = new ConcurrentQueue();
    public NetMqPublisher(string pubIP)
    {
        serverPubIP = pubIP;
        StartListen();
    }
    /// 
    /// 开始监听
    /// 
    public void StartListen()
    {
        _listenerCancelled = false;
        _listenerWorker = new Thread(ByteListenerWork);
        _listenerWorker.IsBackground = true;
        _listenerWorker.Start();
    }
    /// 
    /// 停止监听
    /// 
    public void StopListen()
    {
        _listenerCancelled = true;
        //_listenerWorker.Join();
    }
    /// 
    /// 发送线程
    /// 
    private void ByteListenerWork()
    {
        AsyncIO.ForceDotNet.Force();
        using (var publishSocket = new PublisherSocket())
        {
            publishSocket.Options.ReceiveHighWatermark = 1000;
            publishSocket.Connect(serverPubIP);
            UnityEngine.Debug.Log("客户端开启成功" + _listenerCancelled);
            while (!_listenerCancelled)
            {
                if (!_messageQueueData.IsEmpty)
                {
                    st_Motions sendData = new st_Motions();
                    if (_messageQueueData.TryDequeue(out sendData))
                    {
                        if(_listenerCancelled)
                        {
                            UnityEngine.Debug.LogError("_listenerCancelled="+true);
                        }
                        byte[] tmpbytes = new byte[4 + sendData.m_sOperaData.Length];
                        Array.Copy(BitConverter.GetBytes(sendData.m_iOperaType), 0, tmpbytes, 0, 4);
                        Array.Copy(sendData.m_sOperaData, 0, tmpbytes, 4, sendData.m_sOperaData.Length);
                        if (!_listenerCancelled && !publishSocket.SendMoreFrame(sendData.area).TrySendFrame(tmpbytes)) continue;
                        UnityEngine.Debug.Log("发送一个消息:" + sendData.area + "," + sendData.m_iOperaType);
                    }
                }
            }
            publishSocket.Close();
        }
        NetMQConfig.Cleanup();
        UnityEngine.Debug.LogError("发送线程退出:"+typeName);
    }
    public void AddMessageToSendQue(string area, int type, byte[] data)
    {
        st_Motions sendData = new st_Motions { area = area, m_iOperaType = type, m_sOperaData = data };
        _messageQueueData.Enqueue(sendData);
    }
    public void AddMessageToSendQue(st_Motions st)
    {
        if (!string.IsNullOrEmpty(st.area))
        {
            _messageQueueData.Enqueue(st);
        }
        else
        {
            UnityEngine.Debug.LogError("area空:type=" + st.m_iOperaType);
        }
    }
}