问题描述:在用TcpClient建立的网络连接中,当某个环节(大多是收数据时)出了问题,进行网络重连并再次收发数据,然后会报错。 public void Read() { netstream = tcpclient.GetStream(); //实例化网络数据流 //int PackageBufferLength = tcpclient.ReceiveBufferSize; //从网络流中读取的字节数 int PackageBufferLength = PackageBuffer.Length; //从网络流中读取的字节数 netstream.BeginRead(PackageBuffer, 0, PackageBufferLength, new AsyncCallback(this.ReadCallBack1), netstream); //异步读取数据 netstream.Flush(); //刷新数据流,保存方法供下一次使用 } private void ReadCallBack1(IAsyncResult ar)//从网络流中读取数据的回调函数 { try { while (true) { if (tcpclient.Connected && netstream.CanRead) //在通讯正常且数据流中有数据时读取 { USIParse usiParse = new USIParse(); if (isDone) //如果数据操作完成,将数据流中的数据存到队列中 { int DataCount = netstream.EndRead(ar); if (DataCount <= 0) { logger.Info("此次网络流中没有数据,只好先返回。"); Thread.Sleep(1); //休眠一下,等待数据的到来 return; } for (int i = 0; i < DataCount; i++) //将缓冲包中的数据存放于队列中 { TaskBuffer.Enqueue(PackageBuffer[i]); } isDone = false; //队列中有了新报文等待处理,所以把标志位置为false. Read(); //再次读取数据,并跳出循环 break; } } } } catch (Exception ex) { logger.Error("读数据这里出错" + ex.ToString()); IsConnection = false; } } |
|
#1 |
路过看看,等SP他们回复了
|
#2 |
|
#3 |
回复2楼: 在接收完数据后等待数据的处理,处理完成后会给一个isDone 的标志位,只有这个标志位为true才能再去接收数据等待处理,防止前一次的没处理完,后面的数据又上来了,用异步是不是显得没有必要? 重连代码 private void checkState() { while (true) { Thread.Sleep(500); if ( IsConnection == false )//IsConnection 网络连接标志位 { try { tcpclient.Close(); //在连接前先关闭tcp连接 if (Readthread.IsAlive) { Readthread.Abort(); logger.Warn("读线程关闭"); } if (Handlethread.IsAlive) { Handlethread.Abort(); logger.Warn("处理线程关闭"); } logger.Warn("网络连接断开..."); tcpclient = new TcpClient(); //重新创建TcpClient实例 tcpclient.Connect(IP, int.Parse(Port.Trim())); if (tcpclient.Connected) { IsConnection = true; logger.Warn("网络重连成功"); Write(StartMsg); //重连成功后发送启动命令 HandleStart(); //发送完毕后开启数据处理线程,其中包括Readthread和Handlethread } } catch { logger.Error("网络重连未成功!"); IsConnection = false; } } } } |
#4 |
回复2楼: 用了这个死循环,是不是也有可能使重连后接收数据陷入死循环? |
5分
#5 |
你用了俩死循环??
重连是一个死循环 接收数据是另一个死循环? 放弃死循环的思路 重连直接写进抛出错误的catch里去,不要没事就一直判断网络是否在连接 |
#6 |
回复5楼: 其实接收数据不是死循环呢,那个死循环只是想随时监测数据是否处理完成,即标志位isDone的变化,当然您可能会建议我用事件触发机制,但那个我不太熟悉,用不好,能不能采取个折中的方式?谢谢您! |
#7 |
回复5楼:
好的,重连我就按照您的建议做了试一下
|
#8 |
理清思路,重连之前先断开
|
#9 |
回复8楼: 重连之后是断了呀 tcpclient.Close(); //在连接前先关闭tcp连接 |
#10 |
回复9楼:
错了,是重连之前后断了
|
#11 |
我算看懂了
你在netstream.BeginRead里,指定回调函数是ReadCallBack1 而在ReadCallBack1里用死循环来判断读取数据是否完成 ReadCallBack1本来就是异步读取的回调事件,既然它执行了,那么说明读取已经完成 |
5分
#12 |
还有,你需要设置netstream.ReadTimeout
否则一直没有接收到数据,它就会一直阻塞,不再继续执行下去 |
#13 |
回复11楼: 那个死循环不是为了判断是否读完成哦,就像您说的,有了回调肯定是读完成了的,其实是在读完之后存到一个队列,另一个数据处理线程也要操作队列,这样我防止线程同时操作队列造成错误,就设置一个标志位isDone==true指示操作线程是否完成,完成了读数据线程开始,那个死循环就是为了及时的检测isDone是否为true,是的话马上读数据,开始下一轮的处理。 |
#14 |
回复12楼:
在这里来总是有收获,您这句提醒了我另一个需要解决问题,谢谢!
|
#15 |
回复13楼: 多线程操作队列,加个lock(对象)就行了,不要写那么复杂的代码 |
#16 |
回复12楼: 有了这句是不是就不用超时那个设置了啊? if (DataCount <= 0) { logger.Info("此次网络流中没有数据,只好先返回。"); Thread.Sleep(1); //休眠一下,等待数据的到来 return; } |
5分
#17 |
你需要定义一个静态变量
static object 线程锁=new object(); 然后 lock(线程锁) 这样一次就只能允许1个线程访问这个代码段,其他线程都要阻塞等待 |
#18 |
回复16楼: 你这一句只是让主线程返回了,异步接收是在线程池里执行的,并不是结束异步接收了啊 |
5分
#19 |
lock的是代码段,但是也要传入对象,是根据对象来判断哪些代码段不允许多线程同时执行的
比如A线程里可以这样 lock(线程锁) { //代码A } B线程里可以这样 lock(线程锁) { //代码B } 即使要锁住的并不是同一个代码块,只要是线程锁对象是同一个,多线程同时访问的时候就只能按顺序来执行 |
#20 |
回复15楼:
恩,好的,那上面那个问题是什么原因导致啊,涉及到的代码我全贴上了
|
#21 |
回复18楼:
哦,这样,那上面这个问题很有可能与这个有关
|
#22 |
回复17楼:
好的好的,您这几句就把用法解释的很到位了,我一定要用一下
|
#23 |
因为你重连之后,tcpclient这个应该是个全局变量吧,你把它指向了新的连接(new了)
那么之前获取的netstream对象指向的对象就被回收了 对象被回收了,绑定的异步方法当然也就无法返回正确的数据了 你要知道,通信过程中,很多情况会引起抛出错误 |
#24 |
回复23楼:
“对象被回收了,绑定的异步方法当然也就无法返回正确的数据了”,那这个问题我该怎么办呢?
“不要不加try而让程序产生致命错误,也不要乱加try”——获益匪浅 |
#25 |
TcpClient 本身问题就很多。不建议使用。
|
5分
#26 |
我以前贴过一段底层代码,你可以从那个修改一下嫁接你的应用层解析机制。http://bbs.csdn.net/topics/390987992
关于 try…catch,上面楼层其实已经说么,我再补充明确分析说明一下: 测试与调试这个阶段,原则是尽量让bug“跳出来”,所以应该以“条件编译”或者System.Diagnostics.ConditionalAttribute来暂时去掉try。因此Debug可执行代码只有很少的try….catch,个别地方你实在找不到用程序去“正规判断”的时候才去可能偶尔看到try语句。 因为这个阶段出现bug崩溃(vs自动进入调试环境)其实是很“好”的事情,测试驱动的节奏告诉你下一步该干什么,而不是把精力都放在欣赏自己的代码上。如果你为你现在伤脑筋这个功能写上5个测试用例(50行代码),然后再写上300行代码来让测试能够通过,然后(以随机产生测试数据方式)跑上顺序10遍再并发20个线程(分别)跑上100遍,这时候你就可以去喝咖啡去了。如果测试跑过了,并且你每一次修改了代码之后都能回归跑过,你的这个通讯程序也就“非常稳定了”,也就才可以考虑Release了。 但是一旦到达最后阶段——Release阶段,产品在表现层(或者是你的框架的最外层)总是有try…catch的。因为有些人为灾难总是超过了测试环境的,例如拔掉网线、断电,这时候就必须写上try…catch。有些人看到别人的代码中有这个,就以为这个不影响测试和调试,实际上是不了解开发的流程,把发布当作开发过程。 |
#27 |
另外,我们要说一个.net上的常识。.net framework的 TcpListener/TcpClient 类是对windows 的IOCP的封装,而 SOCKET类并不是。因此虽然前者中调用了后者,但是不以为着二者的实用性是等价的。
在应用中应该尽量使用前者,尽量使用IOCP。 |
#28 |
回复27楼:
您的意思是尽量使用TcpClient吗?我是觉得它被封装得挺好用的。
|
#29 |
回复26楼:
我要好好领会一下
|
#30 |
回复25楼:
您觉得直接还是用socket靠谱?
|
5分
#31 |
回复30楼: TcpClient 的代码 public class TcpClient : IDisposable { // Fields private bool m_Active; private bool m_CleanedUp; private Socket m_ClientSocket; private NetworkStream m_DataStream; private AddressFamily m_Family; // Methods public TcpClient() : this(AddressFamily.InterNetwork) { if (Logging.On) { Logging.Enter(Logging.Sockets, this, "TcpClient", (string) null); } if (Logging.On) { Logging.Exit(Logging.Sockets, this, "TcpClient", (string) null); } } public TcpClient(IPEndPoint localEP) { this.m_Family = AddressFamily.InterNetwork; if (Logging.On) { Logging.Enter(Logging.Sockets, this, "TcpClient", localEP); } if (localEP == null) { throw new ArgumentNullException("localEP"); } this.m_Family = localEP.AddressFamily; this.initialize(); this.Client.Bind(localEP); if (Logging.On) { Logging.Exit(Logging.Sockets, this, "TcpClient", ""); } } public TcpClient(AddressFamily family) { this.m_Family = AddressFamily.InterNetwork; if (Logging.On) { Logging.Enter(Logging.Sockets, this, "TcpClient", family); } if ((family != AddressFamily.InterNetwork) && (family != AddressFamily.InterNetworkV6)) { throw new ArgumentException(SR.GetString("net_protocol_invalid_family", new object[] { "TCP" }), "family"); } this.m_Family = family; this.initialize(); if (Logging.On) { Logging.Exit(Logging.Sockets, this, "TcpClient", (string) null); } } internal TcpClient(Socket acceptedSocket) { this.m_Family = AddressFamily.InterNetwork; if (Logging.On) { Logging.Enter(Logging.Sockets, this, "TcpClient", acceptedSocket); } this.Client = acceptedSocket; this.m_Active = true; if (Logging.On) { Logging.Exit(Logging.Sockets, this, "TcpClient", (string) null); } } public TcpClient(string hostname, int port) { this.m_Family = AddressFamily.InterNetwork; if (Logging.On) { Logging.Enter(Logging.Sockets, this, "TcpClient", hostname); } if (hostname == null) { throw new ArgumentNullException("hostname"); } if (!ValidationHelper.ValidateTcpPort(port)) { throw new ArgumentOutOfRangeException("port"); } try { this.Connect(hostname, port); } catch (Exception exception) { if (((exception is ThreadAbortException) || (exception is StackOverflowException)) || (exception is OutOfMemoryException)) { throw; } if (this.m_ClientSocket != null) { this.m_ClientSocket.Close(); } throw exception; } if (Logging.On) { Logging.Exit(Logging.Sockets, this, "TcpClient", (string) null); } } [HostProtection(SecurityAction.LinkDemand, ExternalThreading=true)] public IAsyncResult BeginConnect(IPAddress address, int port, AsyncCallback requestCallback, object state) { if (Logging.On) { Logging.Enter(Logging.Sockets, this, "BeginConnect", address); } IAsyncResult result = this.Client.BeginConnect(address, port, requestCallback, state); if (Logging.On) { Logging.Exit(Logging.Sockets, this, "BeginConnect", (string) null); } return result; } [HostProtection(SecurityAction.LinkDemand, ExternalThreading=true)] public IAsyncResult BeginConnect(string host, int port, AsyncCallback requestCallback, object state) { if (Logging.On) { Logging.Enter(Logging.Sockets, this, "BeginConnect", host); } IAsyncResult result = this.Client.BeginConnect(host, port, requestCallback, state); if (Logging.On) { Logging.Exit(Logging.Sockets, this, "BeginConnect", (string) null); } return result; } [HostProtection(SecurityAction.LinkDemand, ExternalThreading=true)] public IAsyncResult BeginConnect(IPAddress[] addresses, int port, AsyncCallback requestCallback, object state) { if (Logging.On) { Logging.Enter(Logging.Sockets, this, "BeginConnect", addresses); } IAsyncResult result = this.Client.BeginConnect(addresses, port, requestCallback, state); if (Logging.On) { Logging.Exit(Logging.Sockets, this, "BeginConnect", (string) null); } return result; } public void Close() { if (Logging.On) { Logging.Enter(Logging.Sockets, this, "Close", ""); } ((IDisposable) this).Dispose(); if (Logging.On) { Logging.Exit(Logging.Sockets, this, "Close", ""); } } public void Connect(IPEndPoint remoteEP) { if (Logging.On) { Logging.Enter(Logging.Sockets, this, "Connect", remoteEP); } if (this.m_CleanedUp) { throw new ObjectDisposedException(base.GetType().FullName); } if (remoteEP == null) { throw new ArgumentNullException("remoteEP"); } this.Client.Connect(remoteEP); this.m_Active = true; if (Logging.On) { Logging.Exit(Logging.Sockets, this, "Connect", (string) null); } } public void Connect(IPAddress address, int port) { if (Logging.On) { Logging.Enter(Logging.Sockets, this, "Connect", address); } if (this.m_CleanedUp) { throw new ObjectDisposedException(base.GetType().FullName); } if (address == null) { throw new ArgumentNullException("address"); } if (!ValidationHelper.ValidateTcpPort(port)) { throw new ArgumentOutOfRangeException("port"); } IPEndPoint remoteEP = new IPEndPoint(address, port); this.Connect(remoteEP); if (Logging.On) { Logging.Exit(Logging.Sockets, this, "Connect", (string) null); } } 别听某些人瞎扯。自己去研究代码。某些人的水平,还只是停留在初级水平。 |
#32 |
public void Connect(string hostname, int port) { if (Logging.On) { Logging.Enter(Logging.Sockets, this, "Connect", hostname); } if (this.m_CleanedUp) { throw new ObjectDisposedException(base.GetType().FullName); } if (hostname == null) { throw new ArgumentNullException("hostname"); } if (!ValidationHelper.ValidateTcpPort(port)) { throw new ArgumentOutOfRangeException("port"); } if (this.m_Active) { throw new SocketException(SocketError.IsConnected); } IPAddress[] hostAddresses = Dns.GetHostAddresses(hostname); Exception exception = null; Socket socket = null; Socket socket2 = null; try { if (this.m_ClientSocket == null) { if (Socket.OSSupportsIPv4) { socket2 = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); } if (Socket.OSSupportsIPv6) { socket = new Socket(AddressFamily.InterNetworkV6, SocketType.Stream, ProtocolType.Tcp); } } foreach (IPAddress address in hostAddresses) { try { if (this.m_ClientSocket == null) { if ((address.AddressFamily == AddressFamily.InterNetwork) && (socket2 != null)) { socket2.Connect(address, port); this.m_ClientSocket = socket2; if (socket != null) { socket.Close(); } } else if (socket != null) { socket.Connect(address, port); this.m_ClientSocket = socket; if (socket2 != null) { socket2.Close(); } } this.m_Family = address.AddressFamily; this.m_Active = true; goto Label_01BF; } if (address.AddressFamily == this.m_Family) { this.Connect(new IPEndPoint(address, port)); this.m_Active = true; goto Label_01BF; } } catch (Exception exception2) { if (((exception2 is ThreadAbortException) || (exception2 is StackOverflowException)) || (exception2 is OutOfMemoryException)) { throw; } exception = exception2; } } } catch (Exception exception3) { if (((exception3 is ThreadAbortException) || (exception3 is StackOverflowException)) || (exception3 is OutOfMemoryException)) { throw; } exception = exception3; } finally { if (!this.m_Active) { if (socket != null) { socket.Close(); } if (socket2 != null) { socket2.Close(); } if (exception != null) { throw exception; } throw new SocketException(SocketError.NotConnected); } } Label_01BF: if (Logging.On) { Logging.Exit(Logging.Sockets, this, "Connect", (string) null); } } public void Connect(IPAddress[] ipAddresses, int port) { if (Logging.On) { Logging.Enter(Logging.Sockets, this, "Connect", ipAddresses); } this.Client.Connect(ipAddresses, port); this.m_Active = true; if (Logging.On) { Logging.Exit(Logging.Sockets, this, "Connect", (string) null); } } |
#33 |
protected virtual void Dispose(bool disposing) { if (Logging.On) { Logging.Enter(Logging.Sockets, this, "Dispose", ""); } if (this.m_CleanedUp) { if (Logging.On) { Logging.Exit(Logging.Sockets, this, "Dispose", ""); } } else { if (disposing) { IDisposable dataStream = this.m_DataStream; if (dataStream != null) { dataStream.Dispose(); } else { Socket client = this.Client; if (client != null) { try { client.InternalShutdown(SocketShutdown.Both); } finally { client.Close(); this.Client = null; } } } GC.SuppressFinalize(this); } this.m_CleanedUp = true; if (Logging.On) { Logging.Exit(Logging.Sockets, this, "Dispose", ""); } } } public void EndConnect(IAsyncResult asyncResult) { if (Logging.On) { Logging.Enter(Logging.Sockets, this, "EndConnect", asyncResult); } this.Client.EndConnect(asyncResult); this.m_Active = true; if (Logging.On) { Logging.Exit(Logging.Sockets, this, "EndConnect", (string) null); } } ~TcpClient() { this.Dispose(false); } public NetworkStream GetStream() { if (Logging.On) { Logging.Enter(Logging.Sockets, this, "GetStream", ""); } if (this.m_CleanedUp) { throw new ObjectDisposedException(base.GetType().FullName); } if (!this.Client.Connected) { throw new InvalidOperationException(SR.GetString("net_notconnected")); } if (this.m_DataStream == null) { this.m_DataStream = new NetworkStream(this.Client, true); } if (Logging.On) { Logging.Exit(Logging.Sockets, this, "GetStream", this.m_DataStream); } return this.m_DataStream; } private void initialize() { this.Client = new Socket(this.m_Family, SocketType.Stream, ProtocolType.Tcp); this.m_Active = false; } private int numericOption(SocketOptionLevel optionLevel, SocketOptionName optionName) { return (int) this.Client.GetSocketOption(optionLevel, optionName); } void IDisposable.Dispose() { this.Dispose(true); } // Properties protected bool Active { get { return this.m_Active; } set { this.m_Active = value; } } public int Available { get { return this.m_ClientSocket.Available; } } public Socket Client { get { return this.m_ClientSocket; } set { this.m_ClientSocket = value; } } public bool Connected { get { return this.m_ClientSocket.Connected; } } public bool ExclusiveAddressUse { get { return this.m_ClientSocket.ExclusiveAddressUse; } set { this.m_ClientSocket.ExclusiveAddressUse = value; } } public LingerOption LingerState { get { return (LingerOption) this.Client.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger); } set { this.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger, value); } } public bool NoDelay { get { if (this.numericOption(SocketOptionLevel.Tcp, SocketOptionName.Debug) == 0) { return false; } return true; } set { this.Client.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.Debug, value ? 1 : 0); } } public int ReceiveBufferSize { get { return this.numericOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer); } set { this.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, value); } } public int ReceiveTimeout { get { return this.numericOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout); } set { this.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, value); } } public int SendBufferSize { get { return this.numericOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer); } set { this.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, value); } } public int SendTimeout { get { return this.numericOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout); } set { this.Client.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, value); } } } |
#34 |
这个贴真舍不得结啊,还希望有更多的人参与讨论就好了。
|