Using the F # Event and Asynchrony in Multithreaded Code

I work a lot with asynchronous workflows and agents in F #. While I delved into events, I noticed that the type Event <_> () is not thread safe.

Here I am not talking about the general problem of raising an event. I am actually talking about subscribing and deleting / removing from an event. For testing, I wrote this short program:

let event = Event<int>() let sub = event.Publish [<EntryPoint>] let main argv = let subscribe sub x = async { let mutable disposables = [] for i=0 to x do let dis = Observable.subscribe (fun x -> printf "%d" x) sub disposables <- dis :: disposables for dis in disposables do dis.Dispose() } Async.RunSynchronously(async{ let! x = Async.StartChild (subscribe sub 1000) let! y = Async.StartChild (subscribe sub 1000) do! x do! y event.Trigger 1 do! Async.Sleep 2000 }) 0 

The program is simple. I create an event and a function that subscribes to a certain number of events, and after that I place each handler. I use another asynchronous calculation to spawn two instances of this function with Async.StartChild. After completing both functions, I fire the event to see if there are any other handlers left.

But when event.Trigger(1) as a result, there are still several handlers registered to the event. Somehow "1" will be printed on the console. Typically, this means that subscription and / or deletion are not thread safe.

And this is what I did not expect. If subscription and deletion are not thread safe, how can events be safely used?

Of course, events can also be used outside of threads, and the trigger does not generate any functions in parallel or in different threads. But for me it’s somehow normal that events are used in Async , agent code, or with threads in general. They are often used to exchange information about Backroundworker streams.

With Async.AwaitEvent, you can subscribe to an event. If subscription and deletion are not thread safe, how can events be used in such an environment? What purpose does Async.AwaitEvent have? Whereas an asynchronous workflow is executing a thread, hoping that a simple use of Async.AwaitEvent is mostly “design-disrupted” if subscribing / deleting an event is not thread safe by default.

The general question I am facing is: is it right that subscription and deletion are not thread safe? From my example, it looks like this, but maybe I missed some important details. Currently, I often use events in my design, and I usually have MailboxProcessors and use events for notifications. So the question is. If the events are not thread safe, then the entire project that I am currently using is not thread safe at all. So what to fix in this situation? Creating a whole new thread-oriented implementation of events? Are there already some implementations that face this problem? Or are there other options for using events safely in a multi-threaded environment?

+6
source share
2 answers

FYI; for Event<int> can be found here .

An interesting bit is as follows:

 member e.AddHandler(d) = x.multicast <- (System.Delegate.Combine(x.multicast, d) :?> Handler<'T>) member e.RemoveHandler(d) = x.multicast <- (System.Delegate.Remove(x.multicast, d) :?> Handler<'T>) 

Event signing combines the current event handler with the event handler passed to the subscription. This combined event handler replaces the current one.

The problem from the point of view of concurrency is that here we have a race condition that concurrent subscribers can use the processed current event handler to combine and the "last" that writes the handler callback (the latter is a complicated concept in concurrency these days, but nvm).

What can be done here is to introduce a CAS loop using Interlocked.CompareAndExchange , but this will add overhead to the performance, which disadvantages incompatible users. This is something that could turn off PR and see if it has been approved by the F # community.

WRT to your second question on what to do about it. I can just say what I will do. I would go for the option of creating a version of FSharpEvent that supports secure subscriptions / unsubscriptions. Perhaps base it on FSharpEvent if your company’s FOSS policy allows it. If this proves successful, then he will be able to form a future PR version for the F # kernel.

I do not know your requirements, but it is also possible that if you need coroutines (that is, Async) and not threads, then you can rewrite the program to use only 1 thread, and thus you will not be affected by this race condition.

+4
source

Firstly, thanks to FuleSnable for his answer. He pointed me in the right direction. Based on the information he provided, I myself implemented the ConcurrentEvent type. This type uses Interlocked.CompareExchange to add / remove its handlers, so it is not blocked and, we hope, the fastest way to do this.

I started the implementation by copying the Event type from the F # compiler. (I also leave a comment as is.) The current implementation is as follows:

 type ConcurrentEvent<'T> = val mutable multicast : Handler<'T> new() = { multicast = null } member x.Trigger(arg:'T) = match x.multicast with | null -> () | d -> d.Invoke(null,arg) |> ignore member x.Publish = // Note, we implement each interface explicitly: this works around a bug in the CLR // implementation on CompactFramework 3.7, used on Windows Phone 7 { new obj() with member x.ToString() = "<published event>" interface IEvent<'T> interface IDelegateEvent<Handler<'T>> with member e.AddHandler(d) = let mutable exchanged = false while exchanged = false do System.Threading.Thread.MemoryBarrier() let dels = x.multicast let newDels = System.Delegate.Combine(dels, d) :?> Handler<'T> let result = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels) if obj.ReferenceEquals(dels,result) then exchanged <- true member e.RemoveHandler(d) = let mutable exchanged = false while exchanged = false do System.Threading.Thread.MemoryBarrier() let dels = x.multicast let newDels = System.Delegate.Remove(dels, d) :?> Handler<'T> let result = System.Threading.Interlocked.CompareExchange(&x.multicast, newDels, dels) if obj.ReferenceEquals(dels,result) then exchanged <- true interface System.IObservable<'T> with member e.Subscribe(observer) = let h = new Handler<_>(fun sender args -> observer.OnNext(args)) (e :?> IEvent<_,_>).AddHandler(h) { new System.IDisposable with member x.Dispose() = (e :?> IEvent<_,_>).RemoveHandler(h) } } 

Some design notes:

  • I started with a recursive loop. But by doing this and looking at the compiled code, it creates an anonymous class, and a call to AddHandler or RemoveHandler creates an object of this. Thanks to the direct implementation of the while loop, this avoids instantiating the object each time a new handler is added / removed.
  • I explicitly used obj.ReferenceEquals to avoid common hash equality.

At least in my tests, adding / removing a handler now seems thread-oriented. ConcurrentEvent can simply be swapped with an Event type as needed.


Test, if people are interested in how much slower will ConcurrentEvent compare to Event :

 let stopWatch () = System.Diagnostics.Stopwatch.StartNew() let event = Event<int>() let sub = event.Publish let cevent = ConcurrentEvent<int>() let csub = cevent.Publish let subscribe sub x = async { let mutable disposables = [] for i=0 to x do let dis = Observable.subscribe (fun x -> printf "%d" x) sub disposables <- dis :: disposables for dis in disposables do dis.Dispose() } let sw = stopWatch() Async.RunSynchronously(async{ // Amount of tries let tries = 10000 // benchmarking Event subscribe/unsubscribing let sw = stopWatch() let! x = Async.StartChild (subscribe sub tries) let! y = Async.StartChild (subscribe sub tries) do! x do! y sw.Stop() printfn "Event: %O" sw.Elapsed do! Async.Sleep 1000 event.Trigger 1 do! Async.Sleep 2000 // Benchmarking ConcurrentEvent subscribe/unsubscribing let sw = stopWatch() let! x = Async.StartChild (subscribe csub tries) let! y = Async.StartChild (subscribe csub tries) do! x do! y sw.Stop() printfn "\nConcurrentEvent: %O" sw.Elapsed do! Async.Sleep 1000 cevent.Trigger 1 do! Async.Sleep 2000 }) 

On my system, subscribing / unsubscribing to 10,000 handlers using an unsafe Event takes about 1.4 seconds.

The thread-safe ConcurrentEvent takes about 1.8 seconds. So I think the overhead is pretty low.

+2
source

Source: https://habr.com/ru/post/1241910/


All Articles