Is this approach better than simply calling stream.Read() inside Task.Run?

Edit: Following a discussion with Stephen Cleary, I did not end up using this approach. If you are interested in how I did it differently, look at my answer below.

I am looking for a way to read asynchronously from a NetworkStream with a timeout. The problem, of course, is that there is no way to cancel ReadAsync() on a NetworkStream, as it simply ignores its CancellationToken. I read an answer that suggested closing the stream when the token is cancelled, but in my case this is not an option, since the TCP connection should remain open. So I came up with the following code, but I was wondering whether it would be better to just use

// Alternative under consideration: run the blocking, synchronous read on a thread-pool thread.
Task.Run(() => stream.Read(buffer, offset, count))

and just block the thread.

/// <summary>
/// Extension methods for reading from a <see cref="NetworkStream"/> with a timeout
/// without closing the underlying connection.
/// </summary>
public static class TcpStreamExtension
{
    // How long to wait between two checks of NetworkStream.DataAvailable.
    private const int PollIntervalMilliseconds = 300;

    /// <summary>
    /// Polls <paramref name="stream"/> until data is available and then reads it, giving up
    /// once <c>stream.ReadTimeout</c> milliseconds have elapsed.
    /// </summary>
    /// <param name="stream">The stream to read from; its <c>ReadTimeout</c> is used as the timeout.</param>
    /// <param name="buffer">Destination buffer for the bytes that are read.</param>
    /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
    /// <param name="count">Maximum number of bytes to read.</param>
    /// <returns>The number of bytes read into <paramref name="buffer"/>.</returns>
    /// <exception cref="IOException">
    /// No data became available before the timeout elapsed, or the stream is not readable
    /// and has no pending data.
    /// </exception>
    public static async Task<int> ReadAsyncWithTimeout(this NetworkStream stream, byte[] buffer, int offset, int count)
    {
        // A CancellationTokenSource with a built-in delay replaces the separate timer and the
        // shared "keep trying" flag: there is no second thread writing a flag and no callback
        // that could fire after the source has been disposed. A ReadTimeout of
        // Timeout.Infinite (-1, the NetworkStream default) is accepted here and simply never cancels.
        using (CancellationTokenSource timeoutSource = new CancellationTokenSource(stream.ReadTimeout))
        {
            try
            {
                while (stream.CanRead)
                {
                    if (stream.DataAvailable)
                    {
                        return await stream.ReadAsync(buffer, offset, count, timeoutSource.Token).ConfigureAwait(false);
                    }

                    // Nothing to read yet: wait a little, or stop waiting as soon as the timeout fires.
                    await Task.Delay(PollIntervalMilliseconds, timeoutSource.Token).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException)
            {
                // The timeout elapsed; fall through to the last-chance check below.
            }
        }

        // Data may have arrived right at the moment the timeout fired.
        if (stream.DataAvailable)
        {
            return await stream.ReadAsync(buffer, offset, count).ConfigureAwait(false);
        }

        // Mirrors the synchronous NetworkStream.Read(), which reports a timeout as an IOException.
        throw new IOException();
    }
}

, , , , , IO. , -

+4
3

, , , - , , TcpClient , - . Task.Run(() => beginReading());, , , , , , await , ,

:

// Bytes received by beginReading() that have not yet been handed to a caller;
// ReadAsyncWithTimeout() dequeues from it.
// NOTE(review): Queue<byte> is not synchronized, and the two methods appear to run on
// different threads — confirm whether access needs to be guarded.
private readonly Queue<byte> bigBuffer = new Queue<byte>();
// Released by beginReading() after it enqueues data and awaited by ReadAsyncWithTimeout()
// when the queue is empty. Starts at count 0 and can hold at most a count of 1.
private readonly SemaphoreSlim _signal = new SemaphoreSlim(0, 1);

// This is called in a Task.Run()
/// <summary>
/// Background read loop: keeps reading from <c>m_TcpStream</c> until <c>_shutdownToken</c> is
/// cancelled, appends every received byte to <c>bigBuffer</c> and signals <c>_signal</c> so a
/// waiting reader can pick the data up.
/// </summary>
/// <returns>A task that completes once shutdown has been requested.</returns>
private async Task beginReading()
{
    byte[] buffer = new byte[1024];

    // ReadAsync below is not given a token, so closing the stream on shutdown is what
    // unblocks a read that is still pending.
    using (_shutdownToken.Register(() => m_TcpStream.Close()))
    {
        while (!_shutdownToken.IsCancellationRequested)
        {
            try
            {
                int bytesReceived = 0;
                if (m_TcpStream.CanRead)
                {
                    bytesReceived = await m_TcpStream.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
                }

                if (bytesReceived > 0)
                {
                    for (int i = 0; i < bytesReceived; i++)
                    {
                        bigBuffer.Enqueue(buffer[i]);
                    }

                    // The semaphore holds at most one permit; releasing it while it is already
                    // signaled would throw SemaphoreFullException. This loop is the only
                    // releaser visible here, so the count cannot rise between check and release.
                    if (_signal.CurrentCount == 0)
                    {
                        _signal.Release();
                    }

                    Array.Clear(buffer, 0, buffer.Length);
                }
                else
                {
                    // Either the stream is not readable or the read returned 0 bytes (the
                    // remote side closed the connection). Wait a little bit instead of
                    // spinning at full speed.
                    await Task.Delay(3000, _shutdownToken).ConfigureAwait(false);
                }
            }
            catch (OperationCanceledException) when (_shutdownToken.IsCancellationRequested)
            {
                // Shutdown was requested while backing off; this is the normal way out, not an error.
                break;
            }
            catch (Exception e)
            {
                LoggingService.Log(e);
            }
        }
    }
}

/// <summary>
/// Hands out bytes that the background reader has queued in <c>bigBuffer</c>. If the queue is
/// empty, waits up to 15 seconds for the reader to signal that new data has arrived.
/// </summary>
/// <param name="buffer">Destination buffer.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which the first byte is stored.</param>
/// <param name="count">Maximum number of bytes to copy.</param>
/// <returns>The number of bytes copied into <paramref name="buffer"/>.</returns>
/// <exception cref="ObjectDisposedException"><c>m_TcpClient</c> is not connected.</exception>
/// <exception cref="IOException">No data arrived before the wait timed out.</exception>
/// <exception cref="OperationCanceledException"><c>_shutdownToken</c> was cancelled while waiting.</exception>
private async Task<int> ReadAsyncWithTimeout(byte[] buffer, int offset, int count)
{
    if (!m_TcpClient.Connected)
    {
        throw new ObjectDisposedException("Socket is not connected");
    }

    if (bigBuffer.Count > 0)
    {
        int bytesRead = DequeueInto(buffer, offset, count);

        // Clear up the semaphore in case of a race condition where the writer just wrote and then this came in and read it without waiting
        if (_signal.CurrentCount > 0)
        {
            await _signal.WaitAsync(BabelfishConst.TCPIP_READ_TIME_OUT_IN_MS, _shutdownToken).ConfigureAwait(false);
        }

        return bytesRead;
    }

    // In case there is nothing in the Q, wait up to timeout to get data from the writer
    await _signal.WaitAsync(15000, _shutdownToken).ConfigureAwait(false);

    // read again in case the semaphore was signaled by an Enqueue
    if (bigBuffer.Count > 0)
    {
        return DequeueInto(buffer, offset, count);
    }

    // This is because the synchronous NetworkStream Read() method throws this exception when it times out
    throw new IOException();
}

// Moves up to count queued bytes into buffer[offset .. offset + n) and returns n.
private int DequeueInto(byte[] buffer, int offset, int count)
{
    int bytesToBeRead = bigBuffer.Count < count ? bigBuffer.Count : count;

    // Index relative to offset: the loop must copy exactly bytesToBeRead bytes no matter
    // where in the destination buffer they are placed.
    for (int i = 0; i < bytesToBeRead; i++)
    {
        buffer[offset + i] = bigBuffer.Dequeue();
    }

    return bytesToBeRead;
}
0

-, , , . TCP/IP - , .

, - . . DataAvailable .

...

"" . TCP/IP - . , ReadAsync CancellationToken. , CancellationToken . ReadAsync CancellationToken, . " " - , , .

+3

Task.Delay() -. :

/// <summary>
/// Reads from <paramref name="stream"/>, giving up after <paramref name="timeoutMillis"/> milliseconds.
/// </summary>
/// <param name="stream">The stream to read from.</param>
/// <param name="buffer">Destination buffer for the bytes that are read.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start storing data.</param>
/// <param name="count">Maximum number of bytes to read.</param>
/// <param name="timeoutMillis">Timeout in milliseconds; 0 means "no timeout".</param>
/// <returns>The number of bytes read.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="timeoutMillis"/> is negative.</exception>
/// <exception cref="TimeoutException">The read did not complete within the timeout.</exception>
public static async Task<int> ReadAsync(
        NetworkStream stream, byte[] buffer, int offset, int count, int timeoutMillis)
{
        // ArgumentOutOfRangeException derives from ArgumentException, so existing handlers
        // still match; its single-string constructor takes the parameter name, which is
        // what nameof() supplies.
        if (timeoutMillis < 0) throw new ArgumentOutOfRangeException(nameof(timeoutMillis));

        if (timeoutMillis == 0)
        {
            // No timeout
            return await stream.ReadAsync(buffer, offset, count).ConfigureAwait(false);
        }

        using (var cts = new CancellationTokenSource())
        {
            var readTask = stream.ReadAsync(buffer, offset, count, cts.Token);
            var timerTask = Task.Delay(timeoutMillis, cts.Token);

            var finishedTask = await Task.WhenAny(readTask, timerTask).ConfigureAwait(false);

            // Stops the timer if the read won the race, and requests cancellation of the
            // read if the timer won.
            cts.Cancel();

            if (ReferenceEquals(timerTask, finishedTask))
            {
                // The read is abandoned; observe a later failure so it does not surface as
                // an unobserved task exception.
                var observer = readTask.ContinueWith(
                    t => { var ignored = t.Exception; },
                    CancellationToken.None,
                    TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
                    TaskScheduler.Default);

                throw new TimeoutException();
            }

            // No timeout occurred. Awaiting (rather than .Result) rethrows the original
            // exception instead of wrapping it in an AggregateException.
            return await readTask.ConfigureAwait(false);
        }
}
0

Source: https://habr.com/ru/post/1682869/


All Articles