|
问题描述:在用TcpClient建立的网络连接中,当某个环节(大多是收数据时)出了问题,进行网络重连并再次收发数据,然后会报错。 public void Read()
{
netstream = tcpclient.GetStream(); //实例化网络数据流
//int PackageBufferLength = tcpclient.ReceiveBufferSize; //从网络流中读取的字节数
int PackageBufferLength = PackageBuffer.Length; //从网络流中读取的字节数
netstream.BeginRead(PackageBuffer, 0, PackageBufferLength, new AsyncCallback(this.ReadCallBack1), netstream); //异步读取数据
netstream.Flush(); //刷新数据流,保存方法供下一次使用
}
private void ReadCallBack1(IAsyncResult ar)//从网络流中读取数据的回调函数
{
try
{
while (true)
{
if (tcpclient.Connected && netstream.CanRead) //在通讯正常且数据流中有数据时读取
{
USIParse usiParse = new USIParse();
if (isDone) //如果数据操作完成,将数据流中的数据存到队列中
{
int DataCount = netstream.EndRead(ar);
if (DataCount <= 0) {
logger.Info("此次网络流中没有数据,只好先返回。");
Thread.Sleep(1); //休眠一下,等待数据的到来
return;
}
for (int i = 0; i < DataCount; i++) //将缓冲包中的数据存放于队列中
{
TaskBuffer.Enqueue(PackageBuffer[i]);
}
isDone = false; //队列中有了新报文等待处理,所以把标志位置为false.
Read(); //再次读取数据,并跳出循环
break;
}
}
}
}
catch (Exception ex)
{
logger.Error("读数据这里出错" + ex.ToString());
IsConnection = false;
}
}
|
|
|
#1 |
路过看看,等SP他们回复了
|
|
#2 |
|
|
#3 |
回复2楼: 在接收完数据后等待数据的处理,处理完成后会给一个isDone 的标志位,只有这个标志位为true才能再去接收数据等待处理,防止前一次的没处理完,后面的数据又上来了,用异步是不是显得没有必要? 重连代码
private void checkState()
{
while (true)
{
Thread.Sleep(500);
if ( IsConnection == false )//IsConnection 网络连接标志位
{
try
{
tcpclient.Close(); //在连接前先关闭tcp连接
if (Readthread.IsAlive)
{
Readthread.Abort();
logger.Warn("读线程关闭");
}
if (Handlethread.IsAlive)
{
Handlethread.Abort();
logger.Warn("处理线程关闭");
}
logger.Warn("网络连接断开...");
tcpclient = new TcpClient(); //重新创建TcpClient实例
tcpclient.Connect(IP, int.Parse(Port.Trim()));
if (tcpclient.Connected)
{
IsConnection = true;
logger.Warn("网络重连成功");
Write(StartMsg); //重连成功后发送启动命令
HandleStart(); //发送完毕后开启数据处理线程,其中包括Readthread和Handlethread
}
}
catch
{
logger.Error("网络重连未成功!");
IsConnection = false;
}
}
}
}
|
|
#4 |
回复2楼: 用了这个死循环,是不是也有可能使重连后接收数据陷入死循环? |
|
5分
#5 |
你用了俩死循环??
重连是一个死循环 接收数据是另一个死循环? 放弃死循环的思路 重连直接写进抛出错误的catch里去,不要没事就一直判断网络是否在连接 |
|
#6 |
回复5楼: 其实接收数据不是死循环呢,那个死循环只是想随时监测数据是否处理完成,即标志位isDone的变化,当然您可能会建议我用事件触发机制,但那个我不太熟悉,用不好,能不能采取个折中的方式?谢谢您! |
|
#7 |
回复5楼:
好的,重连我就按照您的建议做了试一下
|
|
#8 |
理清思路,重连之前先断开
|
|
#9 |
回复8楼: 重连之后是断了呀 tcpclient.Close(); //在连接前先关闭tcp连接 |
|
#10 |
回复9楼:
错了,是重连之前后断了
|
|
#11 |
我算看懂了
你在netstream.BeginRead里,指定回调函数是ReadCallBack1 而在ReadCallBack1里用死循环来判断读取数据是否完成 ReadCallBack1本来就是异步读取的回调事件,既然它执行了,那么说明读取已经完成 |
|
5分
#12 |
还有,你需要设置netstream.ReadTimeout
否则一直没有接收到数据,它就会一直阻塞,不再继续执行下去 |
|
#13 |
回复11楼: 那个死循环不是为了判断是否读完成哦,就像您说的,有了回调肯定是读完成了的,其实是在读完之后存到一个队列,另一个数据处理线程也要操作队列,这样我防止线程同时操作队列造成错误,就设置一个标志位isDone==true指示操作线程是否完成,完成了读数据线程开始,那个死循环就是为了及时的检测isDone是否为true,是的话马上读数据,开始下一轮的处理。 |
|
#14 |
回复12楼:
在这里来总是有收获,您这句提醒了我另一个需要解决问题,谢谢!
|
|
#15 |
回复13楼: 多线程操作队列,加个lock(对象)就行了,不要写那么复杂的代码 |
|
#16 |
回复12楼: 有了这句是不是就不用超时那个设置了啊? if (DataCount <= 0) {
logger.Info("此次网络流中没有数据,只好先返回。");
Thread.Sleep(1); //休眠一下,等待数据的到来
return;
}
|
|
5分
#17 |
你需要定义一个静态变量
static object 线程锁=new object(); 然后 lock(线程锁) 这样一次就只能允许1个线程访问这个代码段,其他线程都要阻塞等待 |
|
#18 |
回复16楼: 你这一句只是让主线程返回了,异步接收是在线程池里执行的,并不是结束异步接收了啊 |
|
5分
#19 |
lock的是代码段,但是也要传入对象,是根据对象来判断哪些代码段不允许多线程同时执行的
比如A线程里可以这样 lock(线程锁) { //代码A } B线程里可以这样 lock(线程锁) { //代码B } 即使要锁住的并不是同一个代码块,只要是线程锁对象是同一个,多线程同时访问的时候就只能按顺序来执行 |
|
#20 |
回复15楼:
恩,好的,那上面那个问题是什么原因导致啊,涉及到的代码我全贴上了
|
|
#21 |
回复18楼:
哦,这样,那上面这个问题很有可能与这个有关
|
|
#22 |
回复17楼:
好的好的,您这几句就把用法解释的很到位了,我一定要用一下
|
|
#23 |
因为你重连之后,tcpclient这个应该是个全局变量吧,你把它指向了新的连接(new了)
那么之前获取的netstream对象指向的对象就被回收了 对象被回收了,绑定的异步方法当然也就无法返回正确的数据了 你要知道,通信过程中,很多情况会引起抛出错误 |
|
#24 |
回复23楼:
“对象被回收了,绑定的异步方法当然也就无法返回正确的数据了”,那这个问题我该怎么办呢?
“不要不加try而让程序产生致命错误,也不要乱加try”——获益匪浅 |
|
#25 |
TcpClient 本身问题就很多。不建议使用。
|
|
5分
#26 |
我以前贴过一段底层代码,你可以从那个修改一下嫁接你的应用层解析机制。http://bbs.csdn.net/topics/390987992
关于 try…catch,上面楼层其实已经说么,我再补充明确分析说明一下: 测试与调试这个阶段,原则是尽量让bug“跳出来”,所以应该以“条件编译”或者System.Diagnostics.ConditionalAttribute来暂时去掉try。因此Debug可执行代码只有很少的try….catch,个别地方你实在找不到用程序去“正规判断”的时候才去可能偶尔看到try语句。 因为这个阶段出现bug崩溃(vs自动进入调试环境)其实是很“好”的事情,测试驱动的节奏告诉你下一步该干什么,而不是把精力都放在欣赏自己的代码上。如果你为你现在伤脑筋这个功能写上5个测试用例(50行代码),然后再写上300行代码来让测试能够通过,然后(以随机产生测试数据方式)跑上顺序10遍再并发20个线程(分别)跑上100遍,这时候你就可以去喝咖啡去了。如果测试跑过了,并且你每一次修改了代码之后都能回归跑过,你的这个通讯程序也就“非常稳定了”,也就才可以考虑Release了。 但是一旦到达最后阶段——Release阶段,产品在表现层(或者是你的框架的最外层)总是有try…catch的。因为有些人为灾难总是超过了测试环境的,例如拔掉网线、断电,这时候就必须写上try…catch。有些人看到别人的代码中有这个,就以为这个不影响测试和调试,实际上是不了解开发的流程,把发布当作开发过程。 |
|
#27 |
另外,我们要说一个.net上的常识。.net framework的 TcpListener/TcpClient 类是对windows 的IOCP的封装,而 SOCKET类并不是。因此虽然前者中调用了后者,但是不以为着二者的实用性是等价的。
在应用中应该尽量使用前者,尽量使用IOCP。 |
|
#28 |
回复27楼:
您的意思是尽量使用TcpClient吗?我是觉得它被封装得挺好用的。
|
|
#29 |
回复26楼:
我要好好领会一下
|
|
#30 |
回复25楼:
您觉得直接还是用socket靠谱?
|
|
5分
#31 |
回复30楼: TcpClient 的代码
public class TcpClient : IDisposable
{
// Fields
private bool m_Active;
private bool m_CleanedUp;
private Socket m_ClientSocket;
private NetworkStream m_DataStream;
private AddressFamily m_Family;
// Methods
public TcpClient() : this(AddressFamily.InterNetwork)
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "TcpClient", (string) null);
}
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "TcpClient", (string) null);
}
}
public TcpClient(IPEndPoint localEP)
{
this.m_Family = AddressFamily.InterNetwork;
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "TcpClient", localEP);
}
if (localEP == null)
{
throw new ArgumentNullException("localEP");
}
this.m_Family = localEP.AddressFamily;
this.initialize();
this.Client.Bind(localEP);
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "TcpClient", "");
}
}
public TcpClient(AddressFamily family)
{
this.m_Family = AddressFamily.InterNetwork;
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "TcpClient", family);
}
if ((family != AddressFamily.InterNetwork) && (family != AddressFamily.InterNetworkV6))
{
throw new ArgumentException(SR.GetString("net_protocol_invalid_family", new object[] { "TCP" }), "family");
}
this.m_Family = family;
this.initialize();
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "TcpClient", (string) null);
}
}
internal TcpClient(Socket acceptedSocket)
{
this.m_Family = AddressFamily.InterNetwork;
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "TcpClient", acceptedSocket);
}
this.Client = acceptedSocket;
this.m_Active = true;
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "TcpClient", (string) null);
}
}
public TcpClient(string hostname, int port)
{
this.m_Family = AddressFamily.InterNetwork;
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "TcpClient", hostname);
}
if (hostname == null)
{
throw new ArgumentNullException("hostname");
}
if (!ValidationHelper.ValidateTcpPort(port))
{
throw new ArgumentOutOfRangeException("port");
}
try
{
this.Connect(hostname, port);
}
catch (Exception exception)
{
if (((exception is ThreadAbortException) || (exception is StackOverflowException)) || (exception is OutOfMemoryException))
{
throw;
}
if (this.m_ClientSocket != null)
{
this.m_ClientSocket.Close();
}
throw exception;
}
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "TcpClient", (string) null);
}
}
[HostProtection(SecurityAction.LinkDemand, ExternalThreading=true)]
public IAsyncResult BeginConnect(IPAddress address, int port, AsyncCallback requestCallback, object state)
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "BeginConnect", address);
}
IAsyncResult result = this.Client.BeginConnect(address, port, requestCallback, state);
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "BeginConnect", (string) null);
}
return result;
}
[HostProtection(SecurityAction.LinkDemand, ExternalThreading=true)]
public IAsyncResult BeginConnect(string host, int port, AsyncCallback requestCallback, object state)
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "BeginConnect", host);
}
IAsyncResult result = this.Client.BeginConnect(host, port, requestCallback, state);
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "BeginConnect", (string) null);
}
return result;
}
[HostProtection(SecurityAction.LinkDemand, ExternalThreading=true)]
public IAsyncResult BeginConnect(IPAddress[] addresses, int port, AsyncCallback requestCallback, object state)
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "BeginConnect", addresses);
}
IAsyncResult result = this.Client.BeginConnect(addresses, port, requestCallback, state);
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "BeginConnect", (string) null);
}
return result;
}
public void Close()
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "Close", "");
}
((IDisposable) this).Dispose();
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "Close", "");
}
}
public void Connect(IPEndPoint remoteEP)
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "Connect", remoteEP);
}
if (this.m_CleanedUp)
{
throw new ObjectDisposedException(base.GetType().FullName);
}
if (remoteEP == null)
{
throw new ArgumentNullException("remoteEP");
}
this.Client.Connect(remoteEP);
this.m_Active = true;
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "Connect", (string) null);
}
}
public void Connect(IPAddress address, int port)
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "Connect", address);
}
if (this.m_CleanedUp)
{
throw new ObjectDisposedException(base.GetType().FullName);
}
if (address == null)
{
throw new ArgumentNullException("address");
}
if (!ValidationHelper.ValidateTcpPort(port))
{
throw new ArgumentOutOfRangeException("port");
}
IPEndPoint remoteEP = new IPEndPoint(address, port);
this.Connect(remoteEP);
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "Connect", (string) null);
}
}
别听某些人瞎扯。自己去研究代码。某些人的水平,还只是停留在初级水平。 |
|
#32 |
public void Connect(string hostname, int port)
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "Connect", hostname);
}
if (this.m_CleanedUp)
{
throw new ObjectDisposedException(base.GetType().FullName);
}
if (hostname == null)
{
throw new ArgumentNullException("hostname");
}
if (!ValidationHelper.ValidateTcpPort(port))
{
throw new ArgumentOutOfRangeException("port");
}
if (this.m_Active)
{
throw new SocketException(SocketError.IsConnected);
}
IPAddress[] hostAddresses = Dns.GetHostAddresses(hostname);
Exception exception = null;
Socket socket = null;
Socket socket2 = null;
try
{
if (this.m_ClientSocket == null)
{
if (Socket.OSSupportsIPv4)
{
socket2 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
}
if (Socket.OSSupportsIPv6)
{
socket = new Socket(AddressFamily.InterNetworkV6, SocketType.Stream, ProtocolType.Tcp);
}
}
foreach (IPAddress address in hostAddresses)
{
try
{
if (this.m_ClientSocket == null)
{
if ((address.AddressFamily == AddressFamily.InterNetwork) && (socket2 != null))
{
socket2.Connect(address, port);
this.m_ClientSocket = socket2;
if (socket != null)
{
socket.Close();
}
}
else if (socket != null)
{
socket.Connect(address, port);
this.m_ClientSocket = socket;
if (socket2 != null)
{
socket2.Close();
}
}
this.m_Family = address.AddressFamily;
this.m_Active = true;
goto Label_01BF;
}
if (address.AddressFamily == this.m_Family)
{
this.Connect(new IPEndPoint(address, port));
this.m_Active = true;
goto Label_01BF;
}
}
catch (Exception exception2)
{
if (((exception2 is ThreadAbortException) || (exception2 is StackOverflowException)) || (exception2 is OutOfMemoryException))
{
throw;
}
exception = exception2;
}
}
}
catch (Exception exception3)
{
if (((exception3 is ThreadAbortException) || (exception3 is StackOverflowException)) || (exception3 is OutOfMemoryException))
{
throw;
}
exception = exception3;
}
finally
{
if (!this.m_Active)
{
if (socket != null)
{
socket.Close();
}
if (socket2 != null)
{
socket2.Close();
}
if (exception != null)
{
throw exception;
}
throw new SocketException(SocketError.NotConnected);
}
}
Label_01BF:
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "Connect", (string) null);
}
}
public void Connect(IPAddress[] ipAddresses, int port)
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "Connect", ipAddresses);
}
this.Client.Connect(ipAddresses, port);
this.m_Active = true;
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "Connect", (string) null);
}
}
|
|
#33 |
protected virtual void Dispose(bool disposing)
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "Dispose", "");
}
if (this.m_CleanedUp)
{
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "Dispose", "");
}
}
else
{
if (disposing)
{
IDisposable dataStream = this.m_DataStream;
if (dataStream != null)
{
dataStream.Dispose();
}
else
{
Socket client = this.Client;
if (client != null)
{
try
{
client.InternalShutdown(SocketShutdown.Both);
}
finally
{
client.Close();
this.Client = null;
}
}
}
GC.SuppressFinalize(this);
}
this.m_CleanedUp = true;
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "Dispose", "");
}
}
}
public void EndConnect(IAsyncResult asyncResult)
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "EndConnect", asyncResult);
}
this.Client.EndConnect(asyncResult);
this.m_Active = true;
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "EndConnect", (string) null);
}
}
~TcpClient()
{
this.Dispose(false);
}
public NetworkStream GetStream()
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "GetStream", "");
}
if (this.m_CleanedUp)
{
throw new ObjectDisposedException(base.GetType().FullName);
}
if (!this.Client.Connected)
{
throw new InvalidOperationException(SR.GetString("net_notconnected"));
}
if (this.m_DataStream == null)
{
this.m_DataStream = new NetworkStream(this.Client, true);
}
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "GetStream", this.m_DataStream);
}
return this.m_DataStream;
}
private void initialize()
{
this.Client = new Socket(this.m_Family, SocketType.Stream, ProtocolType.Tcp);
this.m_Active = false;
}
private int numericOption(SocketOptionLevel optionLevel, SocketOptionName optionName)
{
return (int) this.Client.GetSocketOption(optionLevel, optionName);
}
void IDisposable.Dispose()
{
this.Dispose(true);
}
// Properties
protected bool Active
{
get
{
return this.m_Active;
}
set
{
this.m_Active = value;
}
}
public int Available
{
get
{
return this.m_ClientSocket.Available;
}
}
public Socket Client
{
get
{
return this.m_ClientSocket;
}
set
{
this.m_ClientSocket = value;
}
}
public bool Connected
{
get
{
return this.m_ClientSocket.Connected;
}
}
public bool ExclusiveAddressUse
{
get
{
return this.m_ClientSocket.ExclusiveAddressUse;
}
set
{
this.m_ClientSocket.ExclusiveAddressUse = value;
}
}
public LingerOption LingerState
{
get
{
return (LingerOption) this.Client.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger);
}
set
{
this.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger, value);
}
}
public bool NoDelay
{
get
{
if (this.numericOption(SocketOptionLevel.Tcp, SocketOptionName.Debug) == 0)
{
return false;
}
return true;
}
set
{
this.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.Debug, value ? 1 : 0);
}
}
public int ReceiveBufferSize
{
get
{
return this.numericOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer);
}
set
{
this.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, value);
}
}
public int ReceiveTimeout
{
get
{
return this.numericOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout);
}
set
{
this.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, value);
}
}
public int SendBufferSize
{
get
{
return this.numericOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer);
}
set
{
this.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, value);
}
}
public int SendTimeout
{
get
{
return this.numericOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout);
}
set
{
this.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, value);
}
}
}
|
|
#34 |
这个贴真舍不得结啊,还希望有更多的人参与讨论就好了。
|