Task-based downtime detection

It is unusual to demand a limit on the interval between certain events and take action if the limit is exceeded. For example, sending a heartbeat between network peers to detect that the other end is alive.

In C # async / await style, you can implement this by replacing the timeout task every time a heart rate arrives:

var client = new TcpClient { ... }; await client.ConnectAsync(...); Task heartbeatLost = new Task.Delay(HEARTBEAT_LOST_THRESHOLD); while (...) { Task<int> readTask = client.ReadAsync(buffer, 0, buffer.Length); Task first = await Task.WhenAny(heartbeatLost, readTask); if (first == readTask) { if (ProcessData(buffer, 0, readTask.Result).HeartbeatFound) { heartbeatLost = new Task.Delay(HEARTBEAT_LOST_THRESHOLD); } } else if (first == heartbeatLost) { TellUserPeerIsDown(); break; } } 

This is convenient, but each instance of the Task delay has a Timer , and if many heartbeat packets arrive in less time than the threshold value, then many Timer objects load the threadpool. In addition, the completion of each Timer will run the code in threadpool, is there any continuation associated with it or not.

You cannot release the old Timer by calling heartbeatLost.Dispose() ; which will give an exception

InvalidOperationException : A task can only be deleted if it is in a completion state.

You can create a CancellationTokenSource and use it to cancel the old delay task, but it seems suboptimal to create even more objects to accomplish this when the timers themselves have a recalculation function.

What is the best way to integrate time redistribution so that the code can be structured more like this?

 var client = new TcpClient { ... }; await client.ConnectAsync(...); var idleTimeout = new TaskDelayedCompletionSource(HEARTBEAT_LOST_THRESHOLD); Task heartbeatLost = idleTimeout.Task; while (...) { Task<int> readTask = client.ReadAsync(buffer, 0, buffer.Length); Task first = await Task.WhenAny(heartbeatLost, readTask); if (first == readTask) { if (ProcessData(buffer, 0, readTask.Result).HeartbeatFound) { idleTimeout.ResetDelay(HEARTBEAT_LOST_THRESHOLD); } } else if (first == heartbeatLost) { TellUserPeerIsDown(); break; } } 
+5
source share
1 answer

It seems pretty simple to me, the name of your hypothetical class will bring you most of the way. All you need is a TaskCompletionSource and one timer that you save.

 public class TaskDelayedCompletionSource { private TaskCompletionSource<bool> _completionSource; private readonly System.Threading.Timer _timer; private readonly object _lockObject = new object(); public TaskDelayedCompletionSource(int interval) { _completionSource = CreateCompletionSource(); _timer = new Timer(OnTimerCallback); _timer.Change(interval, Timeout.Infinite); } private static TaskCompletionSource<bool> CreateCompletionSource() { return new TaskCompletionSource<bool>(TaskCreationOptions.DenyChildAttach | TaskCreationOptions.RunContinuationsAsynchronously | TaskCreationOptions.HideScheduler); } private void OnTimerCallback(object state) { //Cache a copy of the completion source before we entier the lock, so we don't complete the wrong source if ResetDelay is in the middle of being called. var completionSource = _completionSource; lock (_lockObject) { completionSource.TrySetResult(true); } } public void ResetDelay(int interval) { lock (_lockObject) { var oldSource = _completionSource; _timer.Change(interval, Timeout.Infinite); _completionSource = CreateCompletionSource(); oldSource.TrySetCanceled(); } } public Task Task => _completionSource.Task; } 

This will create only one timer and update it, the task will end when the timer fires.

You will need to change your code a bit, because a new TaskCompletionSource is created every time you update the final time required to call Task heartbeatLost = idleTimeout.Task; inside a while loop.

 var client = new TcpClient { ... }; await client.ConnectAsync(...); var idleTimeout = new TaskDelayedCompletionSource(HEARTBEAT_LOST_THRESHOLD); while (...) { Task heartbeatLost = idleTimeout.Task; Task<int> readTask = client.ReadAsync(buffer, 0, buffer.Length); Task first = await Task.WhenAny(heartbeatLost, readTask); if (first == readTask) { if (ProcessData(buffer, 0, readTask.Result).HeartbeatFound) { idleTimeout.ResetDelay(HEARTBEAT_LOST_THRESHOLD); } } else if (first == heartbeatLost) { TellUserPeerIsDown(); } } 

EDIT: If you saved the completion source creation object (for example, you program in Game Engine, where the GC collection is a large group), you can add additional logic to OnTimerCallback and ResetDelay to reuse the completion source if the call has not already been completed, and You know for sure that you are not in Reset Delay.

You will likely have to switch from lock to SemaphoreSlim and change the callback to

  private void OnTimerCallback(object state) { if(_semaphore.Wait(0)) { _completionSource.TrySetResult(true); } } 

I can update this answer later to include OnTimerCallback , but I don't have time right now.

+4
source

Source: https://habr.com/ru/post/1265969/


All Articles