Asynchronous version of Monitor.Pulse / Wait

I am trying to optimize the asynchronous version of something similar (in the basic functionality) to the Monitor.Wait and Monitor.Pulse methods. The idea is to use this async method.

Requirements: 1) I have one task that performs that it is responsible for someone to pulse my monitor. 2) This task can calculate complex (i.e., laborious) work. Meanwhile, the pulse method could be called several times without doing anything (since the main task is already doing some processing). 3) As soon as the main task is over, he will again begin to wait until another impulse appears.

In the worst case scenario - Wait> Pulse> Wait> Pulse> Wait ... but usually I have tens / hundreds of pulses for each wait.

So, I have the following class (it works, but I think it can be slightly optimized depending on my requirements)

internal sealed class Awaiter { private readonly ConcurrentQueue<TaskCompletionSource<byte>> _waiting = new ConcurrentQueue<TaskCompletionSource<byte>>(); public void Pulse() { TaskCompletionSource<byte> tcs; if (_waiting.TryDequeue(out tcs)) { tcs.TrySetResult(1); } } public Task Wait() { TaskCompletionSource<byte> tcs; if (_waiting.TryPeek(out tcs)) { return tcs.Task; } tcs = new TaskCompletionSource<byte>(); _waiting.Enqueue(tcs); return tcs.Task; } } 

The problem with the above class is the luggage, which I use only for synchronization. Since I will expect from one and only one thread, there is no need to have a ConcurrentQueue, since I always have only one element.

So, I simplified it a bit and wrote the following:

 internal sealed class Awaiter2 { private readonly object _mutex = new object(); private TaskCompletionSource<byte> _waiting; public void Pulse() { var w = _waiting; if (w == null) { return; } lock (_mutex) { w = _waiting; if (w == null) { return; } _waiting = null; w.TrySetResult(1); } } public Task Wait() { var w = _waiting; if (w != null) { return w.Task; } lock (_mutex) { w = _waiting; if (w != null) { return w.Task; } w = _waiting = new TaskCompletionSource<byte>(); return w.Task; } } } 

This new version also works fine, but I still think it can be optimized a bit more by removing locks.

I am looking for suggestions on how I can optimize the second version. Any ideas?

+5
source share
2 answers

Since you have only one task ever to wait, your function can be simplified to

 internal sealed class Awaiter3 { private volatile TaskCompletionSource<byte> _waiting; public void Pulse() { var w = _waiting; if (w == null) { return; } _waiting = null; #if NET_46_OR_GREATER w.TrySetResult(1); #else Task.Run(() => w.TrySetResult(1)); #endif } //This method is not thread safe and can only be called by one thread at a time. // To make it thread safe put a lock around the null check and the assignment, // you do not need to have a lock on Pulse, "volatile" takes care of that side. public Task Wait() { if(_waiting != null) throw new InvalidOperationException("Only one waiter is allowed to exist at a time!"); #if NET_46_OR_GREATER _waiting = new TaskCompletionSource<byte>(TaskCreationOptions.RunContinuationsAsynchronously); #else _waiting = new TaskCompletionSource<byte>(); #endif return _waiting.Task; } } 

One of my actions has changed. If you are using .NET 4.6 or later, use the code in #if NET_46_OR_GREATER blocks if it uses else blocks. When you call TrySetResult , you can continue the synchronous continuation, this can cause Pulse() take a lot of time. Using TaskCreationOptions.RunContinuationsAsynchronously in .NET 4.6 or by wrapping TrySetResult in Task.Run for pre 4.6, make sure Puse() not blocked by the continuation of the task.

See the SO question Detecting the version of the target environment at compile time on how to make the NET_46_OR_GREATER definition that works in your code.

+1
source

If you don't need a Wait() call to return Task , but content with await Wait() , you can implement custom awaiter / awaitable.

See this link for an overview of the wait pattern used by the compiler.

When implementing custom expectations, you will simply be dealing with delegates, and the actual โ€œwaitingโ€ is yours. When you want to โ€œwaitโ€ for a condition, you can often keep a list of pending continuations, and whenever the condition is met, you can call these continuations. You just need to deal with synchronization, based on the fact that await can be called from arbitrary threads. If you know that you only ever await from one thread (say, a UI thread), then you don't need synchronization at all!

I will try to give you the ability to block, but does not guarantee that this is correct. If you do not understand why all the conditions of the race are safe, you should not use it and implement the async / await protocol using lock statements or other methods that you know how to debug.

 public sealed class AsyncMonitor { private PulseAwaitable _currentWaiter; public AsyncMonitor() { _currentWaiter = new PulseAwaitable(); } public void Pulse() { // Optimize for the case when calling Pulse() when nobody is waiting. // // This has an inherent race condition when calling Pulse() and Wait() // at the same time. The question this was written for did not specify // how to resolve this, so it is a valid answer to tolerate either // result and just allow the race condition. // if (_currentWaiter.HasWaitingContinuations) Interlocked.Exchange(ref _currentWaiter, new PulseAwaitable()).Complete(); } public PulseAwaitable Wait() { return _currentWaiter; } } // This class maintains a list of waiting continuations to be executed when // the owning AsyncMonitor is pulsed. public sealed class PulseAwaitable : INotifyCompletion { // List of pending 'await' delegates. private Action _pendingContinuations; // Flag whether we have been pulsed. This is the primary variable // around which we build the lock free synchronization. private int _pulsed; // AsyncMonitor creates instances as required. internal PulseAwaitable() { } // This check has a race condition which is tolerated. // It is used to optimize for cases when the PulseAwaitable has no waiters. internal bool HasWaitingContinuations { get { return Volatile.Read(ref _pendingContinuations) != null; } } // Called by the AsyncMonitor when it is pulsed. internal void Complete() { // Set pulsed flag first because that is the variable around which // we build the lock free protocol. Everything else this method does // is free to have race conditions. Interlocked.Exchange(ref _pulsed, 1); // Execute pending continuations. This is free to race with calls // of OnCompleted seeing the pulsed flag first. Interlocked.Exchange(ref _pendingContinuations, null)?.Invoke(); } #region Awaitable // There is no need to separate the awaiter from the awaitable // so we use one class to implement both parts of the protocol. public PulseAwaitable GetAwaiter() { return this; } #endregion #region Awaiter public bool IsCompleted { // The return value of this property does not need to be up to date so we could omit the 'Volatile.Read' if we wanted to. // What is not allowed is returning "true" even if we are not completed, but this cannot happen since we never transist back to incompleted. get { return Volatile.Read(ref _pulsed) == 1; } } public void OnCompleted(Action continuation) { // Protected against manual invocations. The compiler-generated code never passes null so you can remove this check in release builds if you want to. if (continuation == null) throw new ArgumentNullException(nameof(continuation)); // Standard pattern of maintaining a lock free immutable variable: read-modify-write cycle. // See for example here: https://blogs.msdn.microsoft.com/oldnewthing/20140516-00/?p=973 // Again the 'Volatile.Read' is not really needed since outdated values will be detected at the first iteration. var oldContinuations = Volatile.Read(ref _pendingContinuations); for (;;) { var newContinuations = (oldContinuations + continuation); var actualContinuations = Interlocked.CompareExchange(ref _pendingContinuations, newContinuations, oldContinuations); if (actualContinuations == oldContinuations) break; oldContinuations = actualContinuations; } // Now comes the interesting part where the actual lock free synchronization happens. // If we are completed then somebody needs to clean up remaining continuations. // This happens last so the first part of the method can race with pulsing us. if (IsCompleted) Interlocked.Exchange(ref _pendingContinuations, null)?.Invoke(); } public void GetResult() { // This is just to check against manual calls. The compiler will never call this when IsCompleted is false. // (Assuming your OnCompleted implementation is bug-free and you don't execute continuations before IsCompleted becomes true.) if (!IsCompleted) throw new NotSupportedException("Synchronous waits are not supported. Use 'await' or OnCompleted to wait asynchronously"); } #endregion } 

Usually, you donโ€™t worry about which thread executions continue in, because if they are asynchronous methods, the compiler has already inserted the code (to continue) in order to switch back to the desired thread, you do not need to do this manually in each expected implementation.

[edit]

As a starting point for how you can implement a lock implementation, I will describe it using the lock statement. It should be easily replaced with a spin lock or some other locking technique. Using the structure as expected, it even has the advantage that it does not make additional selection except the original object. (Of course, in the compiler layout on the call side there are allocations in the async / await structure, but you cannot get rid of them.)

Please note that the iteration counter will only increase for each Wait + Pulse pair and will eventually go negative, but this is normal. We just need to reduce the time it takes to continue beeing until it can call GetResult. 4 billion pairs of waiting + pulses should be enough time for any waiting continuations to call the GetResult method. If you do not want this risk, you can use long or Guid for a more unique iteration counter, but IMHO int is good for almost all scenarios.

 public sealed class AsyncMonitor { public struct Awaitable : INotifyCompletion { // We use a struct to avoid allocations. Note that this means the compiler will copy // the struct around in the calling code when doing 'await', so for your own debugging // sanity make all variables readonly. private readonly AsyncMonitor _monitor; private readonly int _iteration; public Awaitable(AsyncMonitor monitor) { lock (monitor) { _monitor = monitor; _iteration = monitor._iteration; } } public Awaitable GetAwaiter() { return this; } public bool IsCompleted { get { // We use the iteration counter as an indicator when we should be complete. lock (_monitor) { return _monitor._iteration != _iteration; } } } public void OnCompleted(Action continuation) { // The compiler never passes null, but someone may call it manually. if (continuation == null) throw new ArgumentNullException(nameof(continuation)); lock (_monitor) { // Not calling IsCompleted since we already have a lock. if (_monitor._iteration == _iteration) { _monitor._waiting += continuation; // null the continuation to indicate the following code // that we completed and don't want it executed. continuation = null; } } // If we were already completed then we didn't null the continuation. // (We should invoke the continuation outside of the lock because it // may want to Wait/Pulse again and we want to avoid reentrancy issues.) continuation?.Invoke(); } public void GetResult() { lock (_monitor) { // Not calling IsCompleted since we already have a lock. if (_monitor._iteration == _iteration) throw new NotSupportedException("Synchronous wait is not supported. Use await or OnCompleted."); } } } private Action _waiting; private int _iteration; public AsyncMonitor() { } public void Pulse(bool executeAsync) { Action execute = null; lock (this) { // If nobody is waiting we don't need to increment the iteration counter. if (_waiting != null) { _iteration++; execute = _waiting; _waiting = null; } } // Important: execute the callbacks outside the lock because they might Pulse or Wait again. if (execute != null) { // If the caller doesn't want inlined execution (maybe he holds a lock) // then execute it on the thread pool. if (executeAsync) Task.Run(execute); else execute(); } } public Awaitable Wait() { return new Awaitable(this); } } 
+2
source

Source: https://habr.com/ru/post/1240624/


All Articles