Observable stream from a StackExchange Redis Pub/Sub subscription

GOAL:

I am using the StackExchange Redis client. My goal is to create an Observable stream from the Pub/Sub subscriber exposed by the client, which can then support 1-n subscriptions by Observables, each with its own filter via LINQ. (Publishing works as planned; the problem relates only to subscribing to the event stream on a particular channel.)

BACKGROUND:

I am using Redis Pub/Sub as part of an event-based CQRS application. The specific use case is to publish events to several subscribers, which then update various read models, send emails, and so on.

Each of these subscribers needs to filter the types of events it processes. For this I am looking to use Rx.NET (Reactive Extensions) with LINQ to apply filter criteria to the event stream, so that each subscriber reacts only to the events it is interested in. This approach removes the need to register handlers with an event bus implementation, and lets me add new projections to the system by deploying 1-n microservices, each of which has 1-n Observables subscribed to the event stream with their own specific filters.

WHAT I HAVE TRIED:

1) I created a class that inherits from ObservableBase and overrides the SubscribeCore method. It receives subscription requests from observers, stores them in a ConcurrentDictionary, and, as each Redis notification arrives on the channel, loops through the registered observers and calls their OnNext method with the RedisValue.

2) I created a Subject that also accepts subscriptions from observers and calls their OnNext method. Again, the use of Subjects seems to be frowned upon by many.

ISSUE:

The approaches I tried work (at least superficially), with varying levels of performance, but they feel like a hack, as if I am not using Rx the way it was intended.

I see a lot of comments saying that the built-in Observable methods should be used where possible, for example Observable.FromEvent, but that appears to be impossible with the StackExchange Redis client's subscription API, at least as far as I can see.

I also understand that the preferred way to receive a stream and forward it to multiple observers is to use a ConnectableObservable, which seems designed for exactly the scenario I am facing (each microservice will internally have 1-n observers subscribed). At the moment, I cannot work out how to connect a ConnectableObservable to the notifications from StackExchange Redis, or whether it offers any real benefit over a plain Observable.

UPDATE

Although disposal is not a concern in my scenario, error handling is important; for example, isolating errors raised by one subscriber so that they do not terminate all subscriptions.

1 answer

Here is an extension method that you can use to create an IObservable<RedisValue> from an ISubscriber and a RedisChannel:

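    // Requires: System, System.Reactive.Disposables, System.Reactive.Linq, StackExchange.Redis
    // (must be declared inside a static class)
    public static IObservable<RedisValue> WhenMessageReceived(this ISubscriber subscriber, RedisChannel channel)
    {
        return Observable.Create<RedisValue>(async (obs, ct) =>
        {
            // Keep a reference to the handler so that disposal removes only this
            // handler; Unsubscribe(channel) alone removes every handler on the channel.
            Action<RedisChannel, RedisValue> handler = (_, message) => obs.OnNext(message);
            await subscriber.SubscribeAsync(channel, handler).ConfigureAwait(false);
            return Disposable.Create(() => subscriber.Unsubscribe(channel, handler));
        });
    }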

Because Redis channels never complete, the resulting IObservable will never complete either. However, you can dispose of the IDisposable returned by Subscribe to unsubscribe from the Redis channel (many Rx operators do this for you automatically).
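To illustrate the automatic disposal mentioned above, here is a minimal sketch that runs without a Redis server. It assumes a Subject<string> as a stand-in for the channel, and the names DisposalSketch, Run and unsubscribed are hypothetical, introduced only for this example. It shows that Take(1) disposes the upstream subscription as soon as the first message arrives.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class DisposalSketch
{
    // Returns the state of the "unsubscribed" flag before and after the first message.
    public static (bool Before, bool After) Run()
    {
        var unsubscribed = false;

        // Assumption: this Subject stands in for a Redis channel that never completes.
        var channel = new Subject<string>();

        // Same shape as WhenMessageReceived: subscribe on creation, unsubscribe on disposal.
        var messages = Observable.Create<string>(obs =>
        {
            var inner = channel.Subscribe(obs);
            return Disposable.Create(() =>
            {
                inner.Dispose();
                unsubscribed = true; // where subscriber.Unsubscribe(...) would be called
            });
        });

        // Take(1) completes after one element and disposes the upstream subscription.
        messages.Take(1).Subscribe(msg => Console.WriteLine($"received: {msg}"));

        var before = unsubscribed;   // still subscribed, no message has arrived yet
        channel.OnNext("first");     // delivered, then Take(1) tears the subscription down
        channel.OnNext("second");    // nobody is listening any more
        return (before, unsubscribed);
    }
}
```

Operators without a natural end, such as Where or Select, do not dispose on their own; in that case the IDisposable returned by Subscribe has to be kept and disposed explicitly.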

Usage might look like this:

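    // Requires: System.Reactive.Linq, System.Reactive.Threading.Tasks
    var subscriber = connectionMultiplexer.GetSubscriber();
    var gotMessage = await subscriber.WhenMessageReceived("my_channel")
        .Any(msg => msg == "expected_message") // Rx.NET's operator is Any; it yields true on the first match
        .ToTask()
        .ConfigureAwait(false);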

Or, following your example:

    var subscriber = connectionMultiplexer.GetSubscriber();
    var sendEmailEvents = subscriber.WhenMessageReceived("my_channel")
        .Select(msg => ParseEventFromMessage(msg))
        .Where(evt => evt.Type == EventType.SendEmails);

    await sendEmailEvents.ForEachAsync(evt =>
    {
        SendEmails(evt);
    }).ConfigureAwait(false);

Other microservices can apply different filters.
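For several filtered observers inside one microservice, one possible arrangement is to share a single upstream subscription with Publish().RefCount(), which is the ConnectableObservable route the question asks about, and to guard each handler so that one failing observer does not affect the others. The sketch below is an illustration under assumptions rather than a definitive design: a Subject<string> stands in for subscriber.WhenMessageReceived("my_channel"), and SharedStreamSketch, Run, Guarded and the "email:" / "order:" prefixes are hypothetical names invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedStreamSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Assumption: stand-in for subscriber.WhenMessageReceived("my_channel").
        var channel = new Subject<string>();

        // Publish() makes the stream connectable; RefCount() connects on the first
        // observer and disconnects when the last one is disposed.
        var shared = channel.Publish().RefCount();

        // Each observer applies its own LINQ filter to the shared stream.
        var emailEvents = shared.Where(m => m.StartsWith("email:"));
        var orderEvents = shared.Where(m => m.StartsWith("order:"));

        // An exception thrown inside an OnNext handler is not routed to OnError;
        // it propagates up to the producer, so each handler is wrapped in try/catch.
        using var emailSub = emailEvents.Subscribe(
            Guarded(m => throw new InvalidOperationException("mail server down"), log));
        using var orderSub = orderEvents.Subscribe(
            Guarded(m => log.Add("order handled: " + m), log));

        channel.OnNext("email:welcome"); // the email handler fails, the failure is logged
        channel.OnNext("order:42");      // the order handler still receives its event
        return log;
    }

    // Wraps a handler so that its exceptions are logged instead of propagating.
    private static Action<string> Guarded(Action<string> handler, List<string> log) => m =>
    {
        try { handler(m); }
        catch (Exception ex) { log.Add("handler failed: " + ex.Message); }
    };
}
```

With the extension method above, RefCount would mean a single SubscribeAsync call per channel regardless of how many filtered observers are attached.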


Source: https://habr.com/ru/post/1260288/

