RX IObservable as a pipeline

I am currently using the RX Framework to implement a thread-like message processing pipeline. Essentially, I have a message producer (deserializes network messages and calls OnNext () in the subject), and I have several users.

NOTE. If conversion is the extension methods that I have encoded, just return IObservable.

The consumer does something like the following:

var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x) .Where(y => y.Value > 5) .Select(y => y.ComplexObject) .If(z => z.IsPaid, respond(z)) .Do(z => SendError(z)); 

commerceRequest then consumed by another similar pipeline, and this continues to the top where it ends with those who call Subscribe() in the final pipeline. The problem I am facing is that messages from the database are not distributed until the subscription is called directly in the messages.

How can I push messages at the top of the stack? I know this is an unorthodox approach, but I feel that the code is very simple to understand what is happening with the message. Can someone suggest another way to do the same if you think this is an absolutely terrible idea?

+4
source share
1 answer

Why would they go through the pipeline if there are no subscribers? If one of your intermediate steps is useful for their side effects (you want them to run even if there are no other subscribers), you should rewrite the side effect operation as a subscriber.

You can also take a step with a side effect as an end-to-end operation (or tee if you want) if you want to continue the chain.

+1
source

Source: https://habr.com/ru/post/1300966/


All Articles