Goal:
I am using the StackExchange Redis client. My goal is to create an Observable stream from a Sub Pub Subscriber exposed by the Client, which can then support 1-n Observables subscriptions, each of which has its own filter through LINQ. (The publication works as planned, the problem is only associated with subscribing to the event stream on a particular channel.)
BACKGROUND:
I am using Redis Pub Sub as part of an event-related CQRS application. A specific use case is to publish events for several subscribers, who then update various reading models, send emails, etc.
Each of these subscribers needs to filter the types of events that they process, and for this I am looking to use Rx.Net (Reactive Extensions) with LINQ to provide filtering criteria in the event stream, so that it can only respond effectively to events of interest. Using this approach eliminates the need to register handlers with the implementation of the event bus and allows you to add new projections to the system by deploying 1-n Microservices, each of which has 1-n Observables, subscribed to the event stream, with their own specific filters.
WHAT I EXCLUDE:
1) I created a class that inherits from ObservableBase, overriding the SubscribeCore method, which receives subscription requests from Observables, stores them in ConcurrentDictionary, and when each Redis notification arrives from the channel, it passes through the registered Observable subscribers and calls their OnNext method to RedisValue.
2) I created a theme that also accepts subscriptions from Observables and calls their OnNext method. Again, the use of Subjects seems to be underestimated by many.
RELEASE:
The approaches I tried perform a function (at least superficially) with different levels of performance, but feel like a hack , and that I do not use Rx as intended.
I see a lot of comments about the fact that built-in methods of the Observable should be used where possible, for example, Observable.FromEvent, but this, apparently, cannot be done with the StackExchange Redis client subscription API, at least before my eyes.
I also understand that the preferred method of receiving a stream and forwarding to multiple observers is to use ConnectableObservable , which would seem to be for the very scenario that I am facing (each microservice will internally have 1-n Observed subscribers). At the moment, I cannot figure out how to connect ConnectableObservable to notifications from StackExchange Redis, or if it offers real benefits to Observable .
UPDATE
Although my script is not a problem (deletion removed), error handling is important; for example, isolating errors detected by one subscriber to prevent the completion of all signatures.