I am currently using the RX Framework to implement a thread-like message processing pipeline. Essentially, I have a message producer (deserializes network messages and calls OnNext () in the subject), and I have several users.
NOTE. If conversion is the extension methods that I have encoded, just return IObservable.
The consumer does something like the following:
var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x) .Where(y => y.Value > 5) .Select(y => y.ComplexObject) .If(z => z.IsPaid, respond(z)) .Do(z => SendError(z));
commerceRequest then consumed by another similar pipeline, and this continues to the top where it ends with those who call Subscribe() in the final pipeline. The problem I am facing is that messages from the database are not distributed until the subscription is called directly in the messages.
How can I push messages at the top of the stack? I know this is an unorthodox approach, but I feel that the code is very simple to understand what is happening with the message. Can someone suggest another way to do the same if you think this is an absolutely terrible idea?
source share