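// NOTE: repository web-viewer residue captured together with the file; kept as comments so the file remains valid C#.
// You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

// 546 lines
// 19 KiB
// C#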

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
namespace HuizhongLibrary.Network
{
#region 服务器
/// <summary>
/// UDP server: receives datagrams asynchronously, keeps one <see cref="UserSocket"/> per remote
/// IP address, and runs a background thread that (re)sends each logged-in client's pending
/// messages and hands globally queued messages over to the matching client.
/// </summary>
public class UdpService
{
    public int Port = 0;
    public int RefshSecond = 30; // Resend a message when no reply arrived within this many seconds.
    public int MaxSendNumber = 3; // Once a message was sent more than this many times the client counts as disconnected.
    public int MaxWaitMsgNumber = 16; // Maximum number of messages awaiting a reply per client (sliding window).
    public int OutTime = 60 * 5; // Seconds after a disconnect before ConnectEvent is raised for the client.
    public List<byte> SocketBuffer = new List<byte>(); // Network data cache (not referenced by this class itself).
    private AutoResetEvent AutoReset = new AutoResetEvent(false);
    public event Action<UserSocket, byte[]> ReceiveDataEvent; // Raised when a datagram has been received.
    public event Action<string> CloseEvent; // Raised with the device number when a client is considered closed.
    public event Action<UserSocket> ConnectEvent; // Raised for a disconnected client once OutTime has elapsed.
    public List<SocketMessage> ListMessage = new List<SocketMessage>();
    public List<UserSocket> ListUserSocket = new List<UserSocket>();
    // volatile: written by Start()/Stop() and polled by the send thread.
    volatile bool IsRun = false;
    UdpClient udp = null;
    IPEndPoint groupEP = null;

    public UdpService()
    {
    }

    #region Start
    /// <summary>
    /// Binds the UDP socket to <paramref name="Port"/>, starts the asynchronous receive loop
    /// and launches the send thread.
    /// </summary>
    /// <param name="Port">Local UDP port to listen on.</param>
    public void Start(int Port)
    {
        this.Port = Port;
        groupEP = new IPEndPoint(IPAddress.Any, Port);
        udp = new UdpClient(Port);
        // Set the run flag here rather than inside the thread, so that a Stop() issued
        // right after Start() cannot be overwritten by the thread starting late.
        IsRun = true;
        // The client is passed as state so the callback always completes the receive
        // on the socket that started it, even after a Stop()/Start() cycle.
        udp.BeginReceive(ReceiveThread, udp);
        Thread t = new Thread(this.SendThread);
        t.Start();
    }
    #endregion

    #region Stop
    /// <summary>
    /// Stops the send thread, closes the socket and forgets all known clients.
    /// Safe to call even when <see cref="Start"/> was never called.
    /// </summary>
    public void Stop()
    {
        IsRun = false;
        AutoReset.Set();
        UdpClient client = udp;
        if (client != null) client.Close();
        lock (ListUserSocket)
        {
            ListUserSocket.Clear();
        }
    }
    #endregion

    #region Close connection
    /// <summary>
    /// Marks every client with the given device number as logged out and raises <see cref="CloseEvent"/>.
    /// </summary>
    /// <param name="DeviceNo">Device number of the client(s) to close.</param>
    public void CloseSocket(string DeviceNo)
    {
        lock (ListUserSocket)
        {
            for (int i = ListUserSocket.Count - 1; i >= 0; i--)
            {
                if (ListUserSocket[i].DeviceNo == DeviceNo) ListUserSocket[i].IsLogin = false;
            }
        }
        // Raised outside the lock so a handler may call back into this service.
        RaiseClose(DeviceNo);
    }
    #endregion

    #region Receive callback
    /// <summary>
    /// Completion callback of <c>BeginReceive</c>: resolves (or creates) the client for the sender's
    /// IP address, raises <see cref="ReceiveDataEvent"/> and re-arms the receive.
    /// </summary>
    void ReceiveThread(IAsyncResult ar)
    {
        UdpClient client = ar.AsyncState as UdpClient;
        if (client == null) client = udp;
        if (client == null) return;

        byte[] data = null;
        try
        {
            data = client.EndReceive(ar, ref groupEP);
        }
        catch (ObjectDisposedException)
        {
            // The socket was closed by Stop(); end the receive loop quietly.
            return;
        }
        catch (SocketException)
        {
            // A failed receive must not end the loop (or crash the process through an
            // unhandled exception in the callback); fall through and receive again.
        }

        if (data != null)
        {
            string ip = groupEP.Address.ToString();
            UserSocket model;
            lock (ListUserSocket)
            {
                model = GetUserSocket(ip);
                if (model == null)
                {
                    model = new UserSocket("", ip, groupEP.Port);
                    ListUserSocket.Add(model);
                }
            }
            // Remember the port the latest datagram came from.
            model.Port = groupEP.Port;
            Action<UserSocket, byte[]> handler = this.ReceiveDataEvent;
            if (handler != null) handler(model, data);
        }

        try
        {
            client.BeginReceive(ReceiveThread, client);
        }
        catch (ObjectDisposedException)
        {
            // Closed while the datagram was being processed; nothing more to receive.
        }
    }
    #endregion

    #region Send thread
    /// <summary>
    /// Body of the send thread. Until <see cref="Stop"/> is called it walks all known clients:
    /// logged-out clients get <see cref="ConnectEvent"/> once <see cref="OutTime"/> seconds have
    /// passed since their <c>PrevTime</c>; logged-in clients get their due messages sent and the
    /// global queue entries addressed to them handed over.
    /// </summary>
    void SendThread()
    {
        while (IsRun)
        {
            // Work on a snapshot so the receive callback and Stop() can modify the list concurrently.
            UserSocket[] users;
            lock (ListUserSocket)
            {
                users = ListUserSocket.ToArray();
            }

            for (int i = users.Length - 1; i >= 0; i--)
            {
                if (!IsRun) break;
                UserSocket model = users[i];
                if (model.IsLogin == false)
                {
                    // Disconnected and past the reconnect delay: ask the host to reconnect the client.
                    if (model.PrevTime.AddSeconds(OutTime) < DateTime.Now)
                    {
                        Action<UserSocket> connect = this.ConnectEvent;
                        if (connect != null) connect(model);
                    }
                    continue;
                }

                SendPendingMessages(model);

                // With an empty global queue there is nothing to hand over and no per-client pause.
                if (ListMessage.Count == 0) continue;
                DispatchQueuedMessages(model);
                AutoReset.WaitOne(100);
            }
            AutoReset.WaitOne(1000);
        }
    }

    /// <summary>
    /// Sends the client's due messages while its count of unanswered messages is below
    /// <see cref="MaxWaitMsgNumber"/>. Any failure marks the client as disconnected.
    /// </summary>
    void SendPendingMessages(UserSocket model)
    {
        // NOTE(review): WaitMsgNumber is not incremented anywhere in this class; presumably the
        // host calls IncrementWaitMsgNumber after queuing/sending - confirm.
        while (IsRun && model.WaitMsgNumber < this.MaxWaitMsgNumber)
        {
            SocketMessage Msg = model.GetNextSocketMessage(this.RefshSecond);
            if (Msg == null) break;

            // Too many attempts without a reply: treat the client as disconnected.
            if (Msg.SendNumber > this.MaxSendNumber)
            {
                MarkDisconnected(model);
                break;
            }

            bool sent;
            try
            {
                Msg.SendNumber++;
                Msg.SendTime = DateTime.Now;
                int SendLength = udp.Send(Msg.Bytes, Msg.Bytes.Length, model.EndPoint);
                // A short send is treated like a failed send.
                sent = (SendLength == Msg.Bytes.Length);
            }
            catch (Exception)
            {
                // Deliberately broad: any send error counts as a disconnect.
                sent = false;
            }

            if (!sent)
            {
                MarkDisconnected(model);
                break;
            }
        }
    }

    /// <summary>
    /// Copies the global queue entries addressed to the client's device number into the client's
    /// own list, disables entries whose <c>SendTime</c> is more than 20 minutes old, and removes
    /// all disabled entries from the global queue.
    /// </summary>
    void DispatchQueuedMessages(UserSocket model)
    {
        // Same monitor as RemoveSocketMessage, so scan and removal cannot interleave.
        lock (((ICollection)this.ListMessage).SyncRoot)
        {
            bool IsRemoveMessage = false;
            for (int ii = 0; ii < ListMessage.Count; ii++)
            {
                SocketMessage queued = ListMessage[ii];
                if (queued.Enabled == false)
                {
                    IsRemoveMessage = true;
                    continue;
                }
                // Drop entries that have been waiting for more than 20 minutes.
                if (queued.SendTime.AddMinutes(20) < DateTime.Now)
                {
                    queued.Enabled = false;
                    IsRemoveMessage = true;
                    continue;
                }
                if (queued.DeviceNo != model.DeviceNo) continue;

                // Same monitor as UserSocket.RemoveSocketMessage.
                lock (((ICollection)model.ListMessage).SyncRoot)
                {
                    model.ListMessage.Add(queued.Copy());
                }
                queued.Enabled = false;
                IsRemoveMessage = true;
            }
            if (IsRemoveMessage == true) RemoveSocketMessage();
        }
    }

    /// <summary>Marks the client as logged out as of now and raises <see cref="CloseEvent"/>.</summary>
    void MarkDisconnected(UserSocket model)
    {
        model.PrevTime = DateTime.Now;
        model.IsLogin = false;
        RaiseClose(model.DeviceNo);
    }

    /// <summary>Raises <see cref="CloseEvent"/> using a local copy of the delegate.</summary>
    void RaiseClose(string deviceNo)
    {
        Action<string> handler = this.CloseEvent;
        if (handler != null) handler(deviceNo);
    }
    #endregion

    #region Remove handled messages
    /// <summary>
    /// Removes every disabled (<c>Enabled == false</c>) message from <see cref="ListMessage"/>.
    /// </summary>
    public void RemoveSocketMessage()
    {
        lock (((ICollection)this.ListMessage).SyncRoot)
        {
            for (int i = ListMessage.Count - 1; i >= 0; i--)
            {
                if (ListMessage[i].Enabled == false) ListMessage.RemoveAt(i);
            }
        }
    }
    #endregion

    #region Look up UserSocket
    /// <summary>
    /// Returns the most recently added client whose <c>IpAddress</c> equals <paramref name="ip"/>,
    /// or <c>null</c> when there is none.
    /// </summary>
    /// <param name="ip">Remote IP address in textual form.</param>
    public UserSocket GetUserSocket(string ip)
    {
        lock (ListUserSocket)
        {
            for (int i = ListUserSocket.Count - 1; i >= 0; i--)
            {
                if (ListUserSocket[i].IpAddress == ip) return ListUserSocket[i];
            }
        }
        return null;
    }
    #endregion

    #region Send data
    /// <summary>
    /// Sends a datagram directly to <paramref name="endPoint"/>, bypassing the message queues.
    /// </summary>
    /// <param name="bytes">Payload to send.</param>
    /// <param name="endPoint">Destination end point.</param>
    public void Send(byte[] bytes, IPEndPoint endPoint)
    {
        udp.Send(bytes, bytes.Length, endPoint);
    }
    #endregion
}
#endregion
#region 连接对象
/// <summary>
/// State kept for one remote peer: its address, login flag, receive buffer, and the list of
/// messages queued for (re)sending to it.
/// </summary>
public class UserSocket
{
    public string DeviceNo = "";
    public string IpAddress = "";
    public int Port = 0;
    public Byte[] Data = null; // Result of the last AddData call: cached bytes followed by the new bytes.
    public DateTime PrevTime = DateTime.Now;
    public List<byte> SocketBuffer = new List<byte>(); // Bytes carried over to the next AddData call.
    public List<SocketMessage> ListMessage = new List<SocketMessage>();
    public int WaitMsgNumber = 0; // Number of messages awaiting a reply; changed via Interlocked.
    public bool IsLogin = false;
    private IPEndPoint m_EndPoint;

    /// <summary>
    /// Remote end point to send to. It follows <see cref="Port"/>: when the port field has been
    /// changed since the end point was built, a new end point with the same address and the
    /// current port is created, so replies are not sent to a stale port.
    /// </summary>
    public IPEndPoint EndPoint
    {
        get
        {
            IPEndPoint current = m_EndPoint;
            int port = Port;
            if (current.Port != port)
            {
                current = new IPEndPoint(current.Address, port);
                m_EndPoint = current;
            }
            return current;
        }
    }

    private DateTime CacheDataTime = DateTime.Now;

    /// <summary>Creates the state for a peer that is not yet logged in.</summary>
    /// <param name="DeviceNo">Device number of the peer (may be empty when still unknown).</param>
    /// <param name="IpAddress">Textual IP address; parsed with <c>IPAddress.Parse</c>.</param>
    /// <param name="Port">Remote UDP port.</param>
    public UserSocket(string DeviceNo, string IpAddress, int Port)
    {
        PrevTime = DateTime.Now;
        WaitMsgNumber = 0;
        IsLogin = false;
        this.DeviceNo = DeviceNo;
        this.IpAddress = IpAddress;
        this.Port = Port;
        m_EndPoint = new IPEndPoint(IPAddress.Parse(IpAddress), Port);
    }

    #region Next message to send
    /// <summary>
    /// Returns the first enabled message whose <c>SendTime</c> is at least
    /// <paramref name="RefshSecond"/> seconds old, or <c>null</c> when none is due.
    /// The scan stops with <c>null</c> at a locked message that is not first in the list, and
    /// at a locked first message that is not yet due. When the returned message has already
    /// been sent before, the wait counter is decremented.
    /// </summary>
    /// <param name="RefshSecond">Minimum age in seconds of <c>SendTime</c> for a message to be due.</param>
    public SocketMessage GetNextSocketMessage(int RefshSecond)
    {
        // Same monitor as RemoveSocketMessage/EndWaitMsg, so entries cannot vanish mid-scan.
        lock (((ICollection)this.ListMessage).SyncRoot)
        {
            for (int i = 0; i < this.ListMessage.Count; i++)
            {
                SocketMessage candidate = this.ListMessage[i];
                if (candidate.Enabled == false) continue;
                if (candidate.IsLock == true && i > 0) return null;
                if (candidate.SendTime.AddSeconds(RefshSecond) <= DateTime.Now)
                {
                    if (candidate.SendNumber > 0) DecrementWaitMsgNumber();
                    return candidate;
                }
                if (candidate.IsLock == true && i == 0) return null;
            }
        }
        return null;
    }
    #endregion

    #region Remove handled messages
    /// <summary>Removes every disabled (<c>Enabled == false</c>) message from <see cref="ListMessage"/>.</summary>
    public void RemoveSocketMessage()
    {
        lock (((ICollection)this.ListMessage).SyncRoot)
        {
            for (int i = ListMessage.Count - 1; i >= 0; i--)
            {
                if (ListMessage[i].Enabled == false) ListMessage.RemoveAt(i);
            }
        }
    }
    #endregion

    #region Increment wait counter
    /// <summary>Atomically increments the number of messages awaiting a reply.</summary>
    public void IncrementWaitMsgNumber()
    {
        Interlocked.Increment(ref WaitMsgNumber);
    }
    #endregion

    #region Decrement wait counter
    /// <summary>Atomically decrements the number of messages awaiting a reply.</summary>
    public void DecrementWaitMsgNumber()
    {
        Interlocked.Decrement(ref WaitMsgNumber);
    }
    #endregion

    #region Complete an answered message
    /// <summary>
    /// Removes the first message whose <c>FunNo</c> equals <paramref name="FunNo"/> and
    /// decrements the wait counter.
    /// </summary>
    /// <param name="FunNo">Function number identifying the answered message.</param>
    /// <returns>The removed message's <c>MessageID</c>, or 0 when no message matched.</returns>
    public int EndWaitMsg(string FunNo)
    {
        SocketMessage answered = null;
        // Uses the list's SyncRoot (as RemoveSocketMessage does) so both removals exclude each
        // other; the lock statement also releases the monitor when an exception is thrown.
        lock (((ICollection)this.ListMessage).SyncRoot)
        {
            for (int i = 0; i < this.ListMessage.Count; i++)
            {
                SocketMessage candidate = this.ListMessage[i];
                if (candidate.FunNo == FunNo)
                {
                    answered = candidate;
                    this.ListMessage.RemoveAt(i);
                    break;
                }
            }
        }
        // NOTE(review): the counter is decremented even when no message matched (unchanged
        // behavior); confirm that duplicate replies are not expected to reach this method.
        DecrementWaitMsgNumber();
        if (answered == null) return 0;
        return answered.MessageID;
    }
    #endregion

    #region Merge cached data
    /// <summary>
    /// Builds <see cref="Data"/> as the cached bytes followed by the first <paramref name="Len"/>
    /// bytes of <paramref name="RevData"/>, then empties the cache. A cache last filled more
    /// than 10 seconds ago is discarded first.
    /// </summary>
    /// <param name="RevData">Newly received bytes.</param>
    /// <param name="Len">Number of bytes of <paramref name="RevData"/> to use.</param>
    public void AddData(byte[] RevData, int Len)
    {
        if (CacheDataTime.AddSeconds(10) < DateTime.Now)
        {
            SocketBuffer.Clear();
        }
        int cached = SocketBuffer.Count;
        byte[] merged = new byte[Len + cached];
        SocketBuffer.CopyTo(merged, 0);
        Buffer.BlockCopy(RevData, 0, merged, cached, Len);
        Data = merged;
        SocketBuffer.Clear();
    }
    #endregion

    #region Append to cache
    /// <summary>
    /// Appends the bytes of <paramref name="SrcArray"/> from <paramref name="offset"/> to the end
    /// to the cache and records the time of this update.
    /// </summary>
    /// <param name="SrcArray">Source bytes.</param>
    /// <param name="offset">Index of the first byte to cache.</param>
    public void AddBuff(byte[] SrcArray, int offset)
    {
        for (int i = offset; i < SrcArray.Length; i++)
        {
            SocketBuffer.Add(SrcArray[i]);
        }
        CacheDataTime = DateTime.Now;
    }
    #endregion

    #region Look up message
    /// <summary>
    /// Returns the first queued message whose <c>FunNo</c> equals <paramref name="FunNo"/>,
    /// or <c>null</c> when there is none. The message stays in the list.
    /// </summary>
    /// <param name="FunNo">Function number to look for.</param>
    public SocketMessage GetSocketMessage(string FunNo)
    {
        lock (((ICollection)this.ListMessage).SyncRoot)
        {
            for (int i = 0; i < this.ListMessage.Count; i++)
            {
                SocketMessage candidate = this.ListMessage[i];
                if (candidate.FunNo == FunNo) return candidate;
            }
        }
        return null;
    }
    #endregion
}
#endregion
#region 客户端
/// <summary>
/// UDP client: sends queued messages to one server with resend handling, asks the host to log
/// in while disconnected, and raises <see cref="ReceiveDataEvent"/> for received datagrams.
/// </summary>
public class UdpClientSocket
{
    public int Port = 0;
    public int RefshSecond = 30; // Resend a message when no reply arrived within this many seconds.
    public int MaxSendNumber = 3; // Once a message was sent more than this many times the link counts as down (0 disables the check).
    public string ServerIpAddress = ""; // Server IP address.
    public int ServerPort = 0; // Server port.
    public bool IsLogin = false; // Whether the client is logged in.
    public int OutTime = 60 * 5; // Seconds between LoginEvent invocations while not logged in.
    public List<byte> SocketBuffer = new List<byte>(); // Bytes carried over to the next MergeData call.
    private AutoResetEvent AutoReset = new AutoResetEvent(false);
    public event Action<byte[]> ReceiveDataEvent; // Raised when a datagram has been received.
    public event ThreadStart LoginEvent; // Raised when a login is required.
    public List<SocketMessage> ListMessage = new List<SocketMessage>();
    // Starts in the past so the first loop iteration requests a login right away.
    DateTime PrevTime = DateTime.Now.AddMinutes(-6);
    DateTime CacheDataTime = DateTime.Now;
    // volatile: written by Start()/Stop() and polled by the send thread.
    volatile bool IsRun = false;
    UdpClient udp = null;
    IPEndPoint SerEP = null;
    IPEndPoint groupEP = null;

    public UdpClientSocket()
    {
    }

    #region Start
    /// <summary>
    /// Binds the local socket, starts receiving and launches the send thread.
    /// </summary>
    /// <param name="Port">Local UDP port to bind.</param>
    /// <param name="ServerIpAddress">Textual server IP address; parsed with <c>IPAddress.Parse</c>.</param>
    /// <param name="ServerPort">Server UDP port.</param>
    public void Start(int Port, string ServerIpAddress, int ServerPort)
    {
        this.Port = Port;
        this.ServerIpAddress = ServerIpAddress;
        this.ServerPort = ServerPort;
        // Parse before opening the socket so a malformed address does not leak a bound socket.
        SerEP = new IPEndPoint(IPAddress.Parse(ServerIpAddress), ServerPort);
        groupEP = new IPEndPoint(IPAddress.Any, Port);
        udp = new UdpClient(Port);
        // Set the run flag here rather than inside the thread, so that a Stop() issued
        // right after Start() cannot be overwritten by the thread starting late.
        IsRun = true;
        // Arm the first receive; without it the receive callback is never invoked and
        // ReceiveDataEvent can never fire.
        udp.BeginReceive(ReceiveThread, udp);
        Thread t = new Thread(this.OnStart);
        t.Start();
    }
    #endregion

    #region Stop
    /// <summary>
    /// Stops the send thread and closes the socket. Safe to call even when
    /// <see cref="Start"/> was never called.
    /// </summary>
    public void Stop()
    {
        IsRun = false;
        AutoReset.Set();
        UdpClient client = udp;
        if (client != null) client.Close();
    }
    #endregion

    #region Receive callback
    /// <summary>
    /// Completion callback of <c>BeginReceive</c>: raises <see cref="ReceiveDataEvent"/> with the
    /// received bytes and re-arms the receive.
    /// </summary>
    void ReceiveThread(IAsyncResult ar)
    {
        UdpClient client = ar.AsyncState as UdpClient;
        if (client == null) client = udp;
        if (client == null) return;

        byte[] data = null;
        try
        {
            data = client.EndReceive(ar, ref groupEP);
        }
        catch (ObjectDisposedException)
        {
            // The socket was closed by Stop(); end the receive loop quietly.
            return;
        }
        catch (SocketException)
        {
            // A failed receive must not end the loop (or crash the process through an
            // unhandled exception in the callback); fall through and receive again.
        }

        if (data != null)
        {
            Action<byte[]> handler = this.ReceiveDataEvent;
            if (handler != null) handler(data);
        }

        try
        {
            client.BeginReceive(ReceiveThread, client);
        }
        catch (ObjectDisposedException)
        {
            // Closed while the datagram was being processed; nothing more to receive.
        }
    }
    #endregion

    #region Send loop
    /// <summary>
    /// Body of the send thread: runs one send cycle roughly every 100 ms until
    /// <see cref="Stop"/> is called. Any error in a cycle marks the client as logged out.
    /// </summary>
    private void OnStart()
    {
        while (IsRun)
        {
            try
            {
                RunSendCycle();
            }
            catch (Exception)
            {
                // Deliberately broad (as before): a failing cycle only drops the login state.
                IsLogin = false;
            }
            // Every path reaches this wait, so the loop neither ends when the queue is
            // momentarily empty nor spins when the head message cannot be sent.
            AutoReset.WaitOne(100, false);
        }
    }

    /// <summary>
    /// One iteration of the send loop: requests a login when logged out for longer than
    /// <see cref="OutTime"/> seconds, otherwise sends the next due message, if any.
    /// </summary>
    private void RunSendCycle()
    {
        if (this.IsLogin == false && PrevTime.AddSeconds(this.OutTime) < DateTime.Now)
        {
            // Logged out and the retry delay has elapsed: ask the host to log in again.
            ThreadStart login = this.LoginEvent;
            if (login != null)
            {
                login();
                PrevTime = DateTime.Now;
                return;
            }
        }

        SocketMessage Msg = GetNextSocketMessage(this.RefshSecond);
        if (Msg == null) return; // Nothing due right now; try again on the next cycle.

        if (this.MaxSendNumber > 0 && Msg.SendNumber > this.MaxSendNumber)
        {
            // Too many attempts without a reply: treat the link as down.
            IsLogin = false;
            return;
        }

        try
        {
            Msg.SendNumber++;
            Msg.SendTime = DateTime.Now;
            int SendLen = udp.Send(Msg.Bytes, Msg.Bytes.Length, SerEP);
            if (SendLen != Msg.Bytes.Length)
            {
                // A short send is treated as a broken link; the message stays queued for resending.
                IsLogin = false;
                return;
            }
            PrevTime = DateTime.Now;
        }
        catch (Exception)
        {
            // Any send error is treated as the remote side having closed the link.
            IsLogin = false;
        }
    }
    #endregion

    #region Send
    /// <summary>
    /// Sends a datagram directly to the server, bypassing the message queue. A failed or
    /// short send sets <see cref="IsLogin"/> to <c>false</c> instead of throwing.
    /// </summary>
    /// <param name="bytes">Payload to send.</param>
    public void Send(byte[] bytes)
    {
        try
        {
            int SendLen = udp.Send(bytes, bytes.Length, SerEP);
            if (SendLen != bytes.Length)
            {
                // A short send is treated as a broken link.
                IsLogin = false;
                return;
            }
            PrevTime = DateTime.Now;
        }
        catch (Exception)
        {
            // Any send error is treated as the remote side having closed the link.
            IsLogin = false;
        }
    }
    #endregion

    #region Next message to send
    /// <summary>
    /// Returns the first enabled message whose <c>SendTime</c> is at least
    /// <paramref name="RefshSecond"/> seconds old, or <c>null</c> when none is due.
    /// The scan stops with <c>null</c> at a locked message that is not first in the list, and
    /// at a locked first message that is not yet due.
    /// </summary>
    /// <param name="RefshSecond">Minimum age in seconds of <c>SendTime</c> for a message to be due.</param>
    public SocketMessage GetNextSocketMessage(int RefshSecond)
    {
        // Same monitor as EndWaitMsg, so an entry cannot be removed in the middle of the scan.
        lock (this)
        {
            for (int i = 0; i < this.ListMessage.Count; i++)
            {
                SocketMessage candidate = this.ListMessage[i];
                if (candidate.Enabled == false) continue;
                if (candidate.IsLock == true && i > 0) return null;
                if (candidate.SendTime.AddSeconds(RefshSecond) <= DateTime.Now)
                {
                    return candidate;
                }
                if (candidate.IsLock == true && i == 0) return null;
            }
        }
        return null;
    }
    #endregion

    #region Complete an answered message
    /// <summary>
    /// Removes the first message whose <c>FunNo</c> equals <paramref name="FunNo"/>.
    /// </summary>
    /// <param name="FunNo">Function number identifying the answered message.</param>
    /// <returns>The removed message's <c>MessageID</c>, or 0 when no message matched.</returns>
    public int EndWaitMsg(string FunNo)
    {
        SocketMessage answered = null;
        // Still synchronizes on this instance (as the Monitor.Enter(this) it replaces did), in
        // case host code relies on that monitor; the lock statement also guarantees release.
        lock (this)
        {
            for (int i = 0; i < this.ListMessage.Count; i++)
            {
                SocketMessage candidate = this.ListMessage[i];
                if (candidate.FunNo == FunNo)
                {
                    // Only a real match is remembered; otherwise the result must be 0 and
                    // not the ID of whichever message was inspected last.
                    answered = candidate;
                    this.ListMessage.RemoveAt(i);
                    break;
                }
            }
        }
        if (answered == null) return 0;
        return answered.MessageID;
    }
    #endregion

    #region Merge cached data
    /// <summary>
    /// Returns the cached bytes followed by <paramref name="RevData"/> and empties the cache.
    /// A cache last filled more than 10 seconds ago is discarded first.
    /// </summary>
    /// <param name="RevData">Newly received bytes.</param>
    /// <returns>A new array holding the cached bytes and then the new bytes.</returns>
    public byte[] MergeData(byte[] RevData)
    {
        if (CacheDataTime.AddSeconds(10) < DateTime.Now)
        {
            SocketBuffer.Clear();
        }
        int cached = SocketBuffer.Count;
        byte[] merged = new byte[RevData.Length + cached];
        SocketBuffer.CopyTo(merged, 0);
        Buffer.BlockCopy(RevData, 0, merged, cached, RevData.Length);
        SocketBuffer.Clear();
        return merged;
    }
    #endregion

    #region Append to cache
    /// <summary>
    /// Appends the bytes of <paramref name="SrcArray"/> from <paramref name="offset"/> to the end
    /// to the cache and records the time of this update.
    /// </summary>
    /// <param name="SrcArray">Source bytes.</param>
    /// <param name="offset">Index of the first byte to cache.</param>
    public void AddBuff(byte[] SrcArray, int offset)
    {
        for (int i = offset; i < SrcArray.Length; i++)
        {
            SocketBuffer.Add(SrcArray[i]);
        }
        CacheDataTime = DateTime.Now;
    }
    #endregion
}
#endregion
}