Summary
The first thing to do is not to use Observable.FromEvent to avoid referencing the string literal. This version of FromEventPattern will work:
var groupedKeyPresses = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>( h => KeyPress += h, h => KeyPress -= h) .Select(k => k.EventArgs.KeyChar) .GroupBy(k => k);
If you want to make FromEvent work, you can do it like this:
var groupedKeyPresses = Observable.FromEvent<KeyPressEventHandler, KeyPressEventArgs>( handler => { KeyPressEventHandler kpeHandler = (sender, e) => handler(e); return kpeHandler; }, h => KeyPress += h, h => KeyPress -= h) .Select(k => k.KeyChar) .GroupBy(k => k);
Why? This is because there is a FromEvent to work with any type of event delegation.
The first parameter here is the conversion function that connects the event to the Rx subscriber. It accepts the OnNext handler of the observer (a Action<T> ) and returns a handler compatible with the main delegate of the event that this OnNext handler will call. This handler can then be subscribed to the event.
I never liked the official MSDN documentation for this feature , so here is an extended explanation that goes through using this feature in parts.
Suspend on Observable.FromEvent
The following describes why FromEvent exists and how it works:
An overview of how .NET subscriptions work.
Consider how .NET events work. They are implemented as chains of delegates. Standard event delegates follow the delegate void FooHandler(object sender, EventArgs eventArgs) , but in action events can work with any type of delegate (even with return type!). We subscribe to the event by passing the corresponding delegate to a special function that adds it to the delegate chain (usually through the + = operator), or if no handler is signed, the delegate becomes the root of this chain. That is why we should do a zero check when creating an event.
When an event occurs (usually), the delegate chain is called so that each delegate in the chain is called one at a time. To unsubscribe from a .NET event, the delegate is passed to a special function (usually using the - = operator) so that it can be removed from the delegate chain (the chain continues until a matching link is found, and this link is removed from chains).
Let me create a simple but custom .NET implementation. Here I use the less common add / remove syntax to reveal the underlying chain of delegates and allow us to register and unsubscribe. In our custom event, there is a delegate with integer and string parameters, and not with the usual subclass of object sender and EventArgs :
public delegate void BarHandler(int x, string y); public class Foo { private BarHandler delegateChain; public event BarHandler BarEvent { add { delegateChain += value; Console.WriteLine("Event handler added"); } remove { delegateChain -= value; Console.WriteLine("Event handler removed"); } } public void RaiseBar(int x, string y) { var temp = delegateChain; if(temp != null) { delegateChain(x, y); } } }
Overview of Rx Subscriptions
Now let's look at how observed flows work. Subscribing to an observable is formed by calling the Subscribe method and passing an object that implements the IObserver<T> interface, which has the OnNext , OnCompleted and OnError called by the observable to process events. In addition, the Subscribe method returns an IDisposable handle that can be removed to unsubscribe.
More typically, we use convenience extension methods that overload Subscribe . These extensions accept delegate handlers matching OnXXX signatures and transparently create AnonymousObservable<T> , OnXXX methods will call these handlers.
Overcoming .NET and Rx Events
So, how can we create a bridge to extend .NET events in Rx observable threads? The result of calling Observable.FromEvent is to create an IObservable, the Subscribe method acts like a factory that will create this bridge.
The .NET event pattern has no idea of ββcompleted or failed events. Only about the event. In other words, we only need to connect the three aspects of the event, which are displayed in Rx as follows:
- A subscription , for example, a call to
IObservable<T>.Subscribe(SomeIObserver<T>) corresponds to fooInstance.BarEvent += barHandlerInstance . - A call , for example. a call to
barHandlerInstance(int x, string y) maps to SomeObserver.OnNext(T arg) - Unsubscribe , for example. that we save the returned
IDisposable handler from our Subscribe call to a variable called subscription , then the call to subscription.Dispose() displayed in fooInstance.BarEvent -= barHandlerInstance .
Note that this is only the Subscribe call act that creates the subscription. So calling Observable.FromEvent returns factory subscription support, calling and unsubscribing from the main event. Currently there is no subscription to events. Only at the Subscribe dial peer will the Observer be available with the OnNext handler. Therefore, the FromEvent call must accept factory methods that it can use to implement three bridge actions at the appropriate time.
Arguments of type FromEvent
So, now let's look at the correct FromEvent implementation for the above event.
Recall that OnNext handlers accept only one argument. .NET event handlers can have any number of parameters. Therefore, our first decision is to choose one type to represent event calls in the target observable stream.
In fact, it can be any type that you want to display in the target observable stream. This is a conversion function (briefly discussed) to provide the logic for converting an event call into an OnNext call - and there is a lot of freedom to decide how this happens.
Here we map the arguments int x, string y call to BarEvent to a formatted string that describes both values. In other words, we call fooInstance.RaiseBar(1, "a") to call someObserver.OnNext("X:1 Y:a") .
In this example, a very common source of confusion should be mentioned: what are parameters of type FromEvent ? Here, the first type of BarHandler is the source type of the .NET.NET delegate, the second type is the type of the target OnNext argument. Because this second type is often a subclass of EventArgs , it is often suggested that it should be some necessary part of the .NET event delegate. Many people overlook that its relevance is really related to the OnNext handler. So, the first part of our FromEvent call looks like this:
var observableBar = Observable.FromEvent<BarHandler, string>(
Conversion function
Now consider the first FromEvent argument, the so-called conversion function. (Note that some FromEvent overloads omit the conversion function β more on this later.)
The lambda syntax can be truncated quite a bit thanks to type inference, so here you can find a long version:
(Action<string> onNextHandler) => { BarHandler barHandler = (int x, string y) => { onNextHandler("X:" + x + " Y:" + y); }; return barHandler; }
Thus, this conversion function is a factory function that, when called, creates a handler compatible with the main .NET event. The factory function accepts an OnNext delegate. This delegate must be called by the returned handler in response to the handler function called with the basic .NET arguments. The delegate will be called with the result of converting the arguments of the .NET event to an instance of the OnNext parameter OnNext . Therefore, it can be seen from the above example that the factory function will be called with onNextHandler type Action<string> - it should be called with a string value in response to each call to the .NET event. The factory function creates a delegate handler of type BarHandler for the .NET event, which processes event calls by calling onNextHandler with a formatted string created from the arguments of the corresponding event call.
With a small type of output, we can collapse the above code to the following equivalent code:
onNextHandler => (int x, string y) => onNextHandler("X:" + x + " Y:" + y)
Thus, the transform function executes some Subscription logic to provide a function to create the appropriate event handler, and also does the work of binding the .NET event call to the Rx OnNext handler call.
As mentioned earlier, FromEvent overloads FromEvent that omit the conversion function. This is because it is not required if the event delegate is already compatible with the method signature required by OnNext .
Add / Remove Handlers
The other two arguments are addHandler and removeHandler, which are responsible for subscribing and unsubscribing the created delegate handler with a real .NET event. Assuming we have an instance of Foo called Foo , the completed FromEvent call looks like this:
var observableBar = Observable.FromEvent<BarHandler, string>( onNextHandler => (int x, string y) => onNextHandler("X:" + x + " Y:" + y), h => foo.BarEvent += h, h => foo.BarEvent -= h);
We need to decide how the event that we are going to hold will occur - therefore, we provide functions for adding and removing handlers that are waiting for the created conversion handler to be provided. An event is usually captured using a closure, as in the example above, where we close an instance of Foo .
Now we have all the parts for the observable FromEvent to fully implement the subscription, call and unsubscribe.
One more thing ...
There is one last piece of glue. Rx optimizes .NET event subscriptions. In fact, for any given number of monitored subscribers, only one subscription is made to the main .NET event. This is then a multicast connection with Rx subscribers through the Publish mechanism. It is as if Publish().RefCount() was added to the observable.
Consider the following example using the delegate and class defined above:
public static void Main() { var foo = new Foo(); var observableBar = Observable.FromEvent<BarHandler, string>( onNextHandler => (int x, string y) => onNextHandler("X:" + x + " Y:" + y), h => foo.BarEvent += h, h => foo.BarEvent -= h); var xs = observableBar.Subscribe(x => Console.WriteLine("xs: " + x)); foo.RaiseBar(1, "First"); var ys = observableBar.Subscribe(x => Console.WriteLine("ys: " + x)); foo.RaiseBar(1, "Second"); xs.Dispose(); foo.RaiseBar(1, "Third"); ys.Dispose(); }
This produces the following output, showing only one subscription:
Event handler added xs: X:1 Y:First xs: X:1 Y:Second ys: X:1 Y:Second ys: X:1 Y:Third Event handler removed
I help this helps eliminate any lingering confusion about how this complex function works!