Why is TcpClient so slow and the processor hungry?

Related to my other question , but now I'm trying async to hope this fixes the problems. This is not true.

I am trying to create a simple SOCKS5 server. I installed a browser (firefox) to use this program as SOCKS5. The idea is that the program connects to the proxy server, provides the necessary information, and the server simply simply reads / writes data from one connection to another. It just does this and does not register or filter anything. It's dead simple, but due to the CPU problem and the fact that it takes a few seconds to connect to the site after you click a few pages, it makes it completely unusable. How, in fact, is this amount of processor? And why does it take a long time to connect to the site? Both asynchronous and synchronous suffer from this

using System; using System.Collections.Concurrent; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Net.Sockets; using System.Timers; using System.IO; using System.Net; using System.Threading; namespace ProxyTest { class Program { static ManualResetEvent tcpClientConnected =new ManualResetEvent(false); static void Main(string[] args) { var s2 = new TcpListener(9998); s2.Start(); Task.Run(() => { while (true) { tcpClientConnected.Reset(); s2.BeginAcceptTcpClient(Blah, s2); tcpClientConnected.WaitOne(); } }); while (true) System.Threading.Thread.Sleep(10000000); } static void Blah(IAsyncResult ar) { try { Console.WriteLine("Connection"); TcpListener listener = (TcpListener)ar.AsyncState; using (var socketin = listener.EndAcceptTcpClient(ar)) { tcpClientConnected.Set(); var ns1 = socketin.GetStream(); var r1 = new BinaryReader(ns1); var w1 = new BinaryWriter(ns1); if (false) { var s3 = new TcpClient(); s3.Connect("127.0.0.1", 9150); var ns3 = s3.GetStream(); var r3 = new BinaryReader(ns3); var w3 = new BinaryWriter(ns3); while (true) { while (ns1.DataAvailable) { var b = ns1.ReadByte(); w3.Write((byte)b); //Console.WriteLine("1: {0}", b); } while (ns3.DataAvailable) { var b = ns3.ReadByte(); w1.Write((byte)b); Console.WriteLine("2: {0}", b); } } } { if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) return; var c = r1.ReadByte(); for (int i = 0; i < c; ++i) r1.ReadByte(); w1.Write((byte)5); w1.Write((byte)0); } { if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) return; if (r1.ReadByte() != 0) return; } byte[] ipAddr = null; string hostname = null; var type = r1.ReadByte(); switch (type) { case 1: ipAddr = r1.ReadBytes(4); break; case 3: hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte())); break; case 4: throw new Exception(); } var nhport = r1.ReadInt16(); var port = IPAddress.NetworkToHostOrder(nhport); var socketout = new TcpClient(); if (hostname != null) socketout.Connect(hostname, port); else socketout.Connect(new IPAddress(ipAddr), port); w1.Write((byte)5); w1.Write((byte)0); w1.Write((byte)0); w1.Write(type); switch (type) { case 1: w1.Write(ipAddr); break; case 2: w1.Write((byte)hostname.Length); w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length); break; } w1.Write(nhport); var buf1 = new byte[4096]; var buf2 = new byte[4096]; var ns2 = socketout.GetStream(); var r2 = new BinaryReader(ns2); var w2 = new BinaryWriter(ns2); Task.Run(() => { var re = new ManualResetEvent(false); while (true) { re.Reset(); ns1.BeginRead(buf1, 0, buf1.Length, ReadCallback, new A() { buf = buf1, thisSocket = socketin, otherSocket = socketout, thisStream = ns1, otherStream = ns2, re=re }); re.WaitOne(); } }); Task.Run(() => { var re = new ManualResetEvent(false); while (true) { re.Reset(); ns2.BeginRead(buf2, 0, buf2.Length, ReadCallback, new A() { buf = buf2, thisSocket = socketout, otherSocket = socketin, thisStream = ns2, otherStream = ns1, re = re }); re.WaitOne(); } }); while (true) { if (socketin.Connected == false) return; Thread.Sleep(100); } } } catch { } } class A { public byte[] buf; public TcpClient thisSocket, otherSocket; public NetworkStream thisStream, otherStream; public ManualResetEvent re;}; static void ReadCallback(IAsyncResult ar) { try { var a = (A)ar.AsyncState; var ns1 = a.thisStream; var len = ns1.EndRead(ar); a.otherStream.Write(a.buf, 0, len); a.re.Set(); } catch { } } } } 
+4
source share
5 answers

Caution: I had to adjust a little, since I do not use 4.5.

Task.Run () β†’ new Thread (). Start ()

You are using too many threads. Just an attempt to load this question into stackoverflow caused the appearance of 30+ threads that reproduce the behavior observed with Task.Run ().

When your code is reduced to one thread per connection, my processor consumes about 0%. Everything loads quickly.

 using System; using System.Collections.Generic; using System.Collections.Concurrent; using System.ComponentModel; using System.Data; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Net.Sockets; using System.Timers; using System.IO; using System.Net; using System.Threading; namespace SOCKS5 { static class Program { static void Main() { var s2 = new TcpListener(9998); s2.Start(); while (true) { if (s2.Pending()) { Thread test = new Thread(() => { using (TcpClient client = s2.AcceptTcpClient()) { Blah(client); } }); test.Start(); } Thread.Sleep(10); } } static void Blah(TcpClient listener) { try { Console.WriteLine("Connection"); //TcpListener listener = (TcpListener)ar.AsyncState; //tcpClientConnected.Set(); var ns1 = listener.GetStream(); var r1 = new BinaryReader(ns1); var w1 = new BinaryWriter(ns1); if (false) { var s3 = new TcpClient(); s3.Connect("127.0.0.1", 9150); var ns3 = s3.GetStream(); var r3 = new BinaryReader(ns3); var w3 = new BinaryWriter(ns3); while (true) { while (ns1.DataAvailable) { var b = ns1.ReadByte(); w3.Write((byte)b); //Console.WriteLine("1: {0}", b); } while (ns3.DataAvailable) { var b = ns3.ReadByte(); w1.Write((byte)b); Console.WriteLine("2: {0}", b); } } } { if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) return; var c = r1.ReadByte(); for (int i = 0; i < c; ++i) r1.ReadByte(); w1.Write((byte)5); w1.Write((byte)0); } { if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) return; if (r1.ReadByte() != 0) return; } byte[] ipAddr = null; string hostname = null; var type = r1.ReadByte(); switch (type) { case 1: ipAddr = r1.ReadBytes(4); break; case 3: hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte())); break; case 4: throw new Exception(); } var nhport = r1.ReadInt16(); var port = IPAddress.NetworkToHostOrder(nhport); var socketout = new TcpClient(); if (hostname != null) socketout.Connect(hostname, port); else socketout.Connect(new IPAddress(ipAddr), port); w1.Write((byte)5); w1.Write((byte)0); w1.Write((byte)0); w1.Write(type); switch (type) { case 1: w1.Write(ipAddr); break; case 2: w1.Write((byte)hostname.Length); w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length); break; } w1.Write(nhport); var buf1 = new byte[4096]; var buf2 = new byte[4096]; var ns2 = socketout.GetStream(); DateTime last = DateTime.Now; while ((DateTime.Now - last).TotalMinutes < 5.0) { if (ns1.DataAvailable) { int size = ns1.Read(buf1, 0, buf1.Length); ns2.Write(buf1, 0, size); last = DateTime.Now; } if (ns2.DataAvailable) { int size = ns2.Read(buf2, 0, buf2.Length); ns1.Write(buf2, 0, size); last = DateTime.Now; } Thread.Sleep(10); } } catch { } finally { try { listener.Close(); } catch (Exception) { } } } } } 

Edit:

This turned out to be a curious entertainment.

After routing Firefox traffic through this for several hours, some observations.

I never noticed a regular pattern to determine when to close connections. If threads terminate after they have been idle for 5 minutes (no rx / tx), the number of threads remains fairly low. This is a fairly secure binding that allows services, such as gmail chat, to function.

For some reason, the program sometimes will not receive requests from the browser that will report a timeout. There is no notification of a missed request in the program, nothing. Visible only when viewing stackoverflow. Still do not understand that one of them.

+5
source

Here are a few things going on!

Asynchronous calls are called synchronous style . Like in, the thread that starts the operation calls WaitOne - this is basically just the equivalent of a synchronous call, no different.

Sleeping cycles are bad. The sleep (1) loop will respond quickly, but use some processor, the sleep (1000) loop will respond slowly, but use less CPU. Having a dozen threads in the sleep loop does not use much CPU, but if the number of threads increases, CPU usage will become significant. It is best to use asynchronous calls instead of polling.

Many tasks that perform loops . Without guaranteed exit paths, this will lead to an increase in the number of threads.

If you are transferring data from socket A to socket B, you need to act when one of the sockets is closed: stop forwarding, make sure that pending writes complete and close sockets.

The current implementation does not adequately ensure that the forwarding tasks are closed upon closing, and the method of starting the task and then blocking the manual reset event may fail if the task receives an exception before the event is set. Both cases leave the task unlimited.

Checking Socket.Connected seems obvious, but in practice it’s just a cache related to disabling the last I / O operation. I prefer to act with "zero recv", which is your first shutdown notice.

I knocked out a fast asynchronous version of your original synchronous procedure using PowerThreading via NuGet (this is the way to execute asynchronous routines before the 4.5 framework). This works using a TcpListener with zero CPU usage and a very low thread count.

This can be done in vanilla C # using async / await ... I don't know yet :)

 using System; using System.Collections.Generic; using System.Text; namespace AeProxy { using System.IO; using System.Net; using System.Net.Sockets; using System.Threading; // Need to install Wintellect.Threading via NuGet for this: using Wintellect.Threading.AsyncProgModel; class Program { static void Main(string[] args) { var ae = new AsyncEnumerator() {SyncContext = null}; var mainOp = ae.BeginExecute(ListenerFiber(ae), null, null); // block until main server is finished ae.EndExecute(mainOp); } static IEnumerator<int> ListenerFiber(AsyncEnumerator ae) { var listeningServer = new TcpListener(IPAddress.Loopback, 9998); listeningServer.Start(); while (!ae.IsCanceled()) { listeningServer.BeginAcceptTcpClient(ae.End(0, listeningServer.EndAcceptTcpClient), null); yield return 1; if (ae.IsCanceled()) yield break; var clientSocket = listeningServer.EndAcceptTcpClient(ae.DequeueAsyncResult()); var clientAe = new AsyncEnumerator() { SyncContext = null }; clientAe.BeginExecute( ClientFiber(clientAe, clientSocket), ar => { try { clientAe.EndExecute(ar); } catch { } }, null); } } static long clients = 0; static IEnumerator<int> ClientFiber(AsyncEnumerator ae, TcpClient clientSocket) { Console.WriteLine("ClientFibers ++{0}", Interlocked.Increment(ref clients)); try { // original code to do handshaking and connect to remote host var ns1 = clientSocket.GetStream(); var r1 = new BinaryReader(ns1); var w1 = new BinaryWriter(ns1); if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) yield break; var c = r1.ReadByte(); for (int i = 0; i < c; ++i) r1.ReadByte(); w1.Write((byte)5); w1.Write((byte)0); if (!(r1.ReadByte() == 5 && r1.ReadByte() == 1)) yield break; if (r1.ReadByte() != 0) yield break; byte[] ipAddr = null; string hostname = null; var type = r1.ReadByte(); switch (type) { case 1: ipAddr = r1.ReadBytes(4); break; case 3: hostname = Encoding.ASCII.GetString(r1.ReadBytes(r1.ReadByte())); break; case 4: throw new Exception(); } var nhport = r1.ReadInt16(); var port = IPAddress.NetworkToHostOrder(nhport); var socketout = new TcpClient(); if (hostname != null) socketout.Connect(hostname, port); else socketout.Connect(new IPAddress(ipAddr), port); w1.Write((byte)5); w1.Write((byte)0); w1.Write((byte)0); w1.Write(type); switch (type) { case 1: w1.Write(ipAddr); break; case 3: w1.Write((byte)hostname.Length); w1.Write(Encoding.ASCII.GetBytes(hostname), 0, hostname.Length); break; } w1.Write(nhport); using (var ns2 = socketout.GetStream()) { var forwardAe = new AsyncEnumerator() { SyncContext = null }; forwardAe.BeginExecute( ForwardingFiber(forwardAe, ns1, ns2), ae.EndVoid(0, forwardAe.EndExecute), null); yield return 1; if (ae.IsCanceled()) yield break; forwardAe.EndExecute(ae.DequeueAsyncResult()); } } finally { Console.WriteLine("ClientFibers --{0}", Interlocked.Decrement(ref clients)); } } private enum Operation { OutboundWrite, OutboundRead, InboundRead, InboundWrite } const int bufsize = 4096; static IEnumerator<int> ForwardingFiber(AsyncEnumerator ae, NetworkStream inputStream, NetworkStream outputStream) { while (!ae.IsCanceled()) { byte[] outputRead = new byte[bufsize], outputWrite = new byte[bufsize]; byte[] inputRead = new byte[bufsize], inputWrite = new byte[bufsize]; // start off output and input reads. // NB ObjectDisposedExceptions can be raised here when a socket is closed while an async read is in progress. outputStream.BeginRead(outputRead, 0, bufsize, ae.End(1, ar => outputStream.EndRead(ar)), Operation.OutboundRead); inputStream.BeginRead(inputRead, 0, bufsize, ae.End(1, ar => inputStream.EndRead(ar)), Operation.InboundRead); var pendingops = 2; while (!ae.IsCanceled()) { // wait for the next operation to complete, the state object passed to each async // call can be used to find out what completed. if (pendingops == 0) yield break; yield return 1; if (!ae.IsCanceled()) { int byteCount; var latestEvent = ae.DequeueAsyncResult(); var currentOp = (Operation)latestEvent.AsyncState; if (currentOp == Operation.InboundRead) { byteCount = inputStream.EndRead(latestEvent); if (byteCount == 0) { pendingops--; outputStream.Close(); continue; } Array.Copy(inputRead, outputWrite, byteCount); outputStream.BeginWrite(outputWrite, 0, byteCount, ae.EndVoid(1, outputStream.EndWrite), Operation.OutboundWrite); inputStream.BeginRead(inputRead, 0, bufsize, ae.End(1, ar => inputStream.EndRead(ar)), Operation.InboundRead); } else if (currentOp == Operation.OutboundRead) { byteCount = outputStream.EndRead(latestEvent); if (byteCount == 0) { pendingops--; inputStream.Close(); continue; } Array.Copy(outputRead, inputWrite, byteCount); inputStream.BeginWrite(inputWrite, 0, byteCount, ae.EndVoid(1, inputStream.EndWrite), Operation.InboundWrite); outputStream.BeginRead(outputRead, 0, bufsize, ae.End(1, ar => outputStream.EndRead(ar)), Operation.OutboundRead); } else if (currentOp == Operation.InboundWrite) { inputStream.EndWrite(latestEvent); } else if (currentOp == Operation.OutboundWrite) { outputStream.EndWrite(latestEvent); } } } } } } } 
+5
source

On this line ...

  while (true) System.Threading.Thread.Sleep(10000000); 

It is not better to replace it with a simple one:

 Console.ReadKey(); 

- the only thing I see in the processor.

In addition, as a suggestion, you should limit the number of incoming connections and use the thread pool template (in the queue or something else).

0
source

You should take a look at Overlapped I / O. One thread per connection may work fine, but overall it's bad.

0
source

You should use asynchronous versions of the TcpClient methods instead of spawning threads.

0
source

Source: https://habr.com/ru/post/1492553/


All Articles