Using Reactive Extensions to process multiple responses in the correct order

Situation

I have a system in which one request produces two responses. The request and responses have corresponding observable values:

IObservable<RequestSent> _requests;
IObservable<MainResponseReceived> _mainResponses;
IObservable<SecondResponseReceived> _secondaryResponses;

It is guaranteed that the RequestSent event occurs before MainResponseReceived and SecondaryResponseReceived, but the two responses can arrive in either order.

What I have

I originally wanted a handler that handles both responses, so I zipped up the observables:

_requests
    .SelectMany(async request =>
    {
        var main = _mainResponses.FirstAsync(m => m.Id == request.Id);
        var secondary = _secondaryResponses.FirstAsync(s => s.Id == request.Id);

        var zippedResponse = main.Zip(secondary, (m, s) => new MainAndSecondaryResponseReceived {
            Request = request,
            Main = m, 
            Secondary = s
        });
        return await zippedResponse.FirstAsync();
    })
    .Subscribe(OnMainAndSecondaryResponseReceived);

What I need

Now I also need to process MainResponseReceived on its own, without waiting for SecondaryResponseReceived, and it must be guaranteed that OnMainResponseReceived has completed before OnMainAndSecondaryResponseReceived is called.

How do I define these two subscriptions?

Test case 1:

  • RequestSent occurs
  • MainResponseReceived occurs → OnMainResponseReceived is called
  • SecondaryResponseReceived occurs → OnMainAndSecondaryResponseReceived is called

Test case 2:

  • RequestSent occurs
  • SecondaryResponseReceived occurs
  • MainResponseReceived occurs → OnMainResponseReceived is called → OnMainAndSecondaryResponseReceived is called

You were close, but the Async calls are not needed here.

Try this:

var query =
    _requests
        .SelectMany(request =>
            _mainResponses.Where(m => m.Id == request.Id).Take(1)
                .Do(m => OnMainResponseReceived(m))
                .Zip(
                    _secondaryResponses.Where(s => s.Id == request.Id).Take(1),
                    (m, s) => new MainAndSecondaryResponseReceived()
                    {
                        Request = request,
                        Main = m, 
                        Secondary = s
                    }));

var subscription =
    query.Subscribe(x => OnMainAndSecondaryResponseReceived(x));

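The .Do(...) call is the key part. It ensures that OnMainResponseReceived runs before OnMainAndSecondaryResponseReceived, regardless of which response arrives first.

I tested it with: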

Subject<RequestSent> _requestsSubject = new Subject<RequestSent>();
Subject<MainResponseReceived> _mainResponsesSubject = new Subject<MainResponseReceived>();
Subject<SecondResponseReceived> _secondaryResponsesSubject = new Subject<SecondResponseReceived>();

IObservable<RequestSent> _requests = _requestsSubject.AsObservable();
IObservable<MainResponseReceived> _mainResponses = _mainResponsesSubject.AsObservable();
IObservable<SecondResponseReceived> _secondaryResponses = _secondaryResponsesSubject.AsObservable();

_requestsSubject.OnNext(new RequestSent() { Id = 42 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 42 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 42 });

_requestsSubject.OnNext(new RequestSent() { Id = 99 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 99 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 99 });
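
The run above delivers the main response first each time, which matches test case 1. A minimal self-contained sketch that also covers test case 2, where the secondary response arrives first, might look like the following. It assumes the System.Reactive NuGet package is referenced; the bare message classes, the OrderingDemo class, and the log list that stands in for the two handlers are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical minimal message types; the real ones presumably carry more data.
public class RequestSent { public int Id { get; set; } }
public class MainResponseReceived { public int Id { get; set; } }
public class SecondResponseReceived { public int Id { get; set; } }

public static class OrderingDemo
{
    // Replays one request/response exchange and returns the handler calls in order.
    public static List<string> Run(bool secondaryFirst)
    {
        var log = new List<string>();
        var requests = new Subject<RequestSent>();
        var mainResponses = new Subject<MainResponseReceived>();
        var secondaryResponses = new Subject<SecondResponseReceived>();

        var query = requests.SelectMany(request =>
            mainResponses.Where(m => m.Id == request.Id).Take(1)
                // Stand-in for OnMainResponseReceived: runs before Zip sees the value.
                .Do(m => log.Add("main:" + m.Id))
                .Zip(
                    secondaryResponses.Where(s => s.Id == request.Id).Take(1),
                    (m, s) => request.Id));

        // Subscribe before pushing anything: subjects are hot and do not replay.
        // The subscriber is a stand-in for OnMainAndSecondaryResponseReceived.
        using (query.Subscribe(id => log.Add("both:" + id)))
        {
            requests.OnNext(new RequestSent { Id = 42 });
            if (secondaryFirst)
            {
                secondaryResponses.OnNext(new SecondResponseReceived { Id = 42 });
                mainResponses.OnNext(new MainResponseReceived { Id = 42 });
            }
            else
            {
                mainResponses.OnNext(new MainResponseReceived { Id = 42 });
                secondaryResponses.OnNext(new SecondResponseReceived { Id = 42 });
            }
        }
        return log;
    }

    public static void Main()
    {
        // Both lines print: main:42, both:42
        Console.WriteLine(string.Join(", ", Run(secondaryFirst: false)));
        Console.WriteLine(string.Join(", ", Run(secondaryFirst: true)));
    }
}
```

In the secondary-first run, Zip buffers the secondary response until the main one arrives; the main response passes through .Do(...) first and only then reaches Zip, so the handler order is the same in both cases.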

Source: https://habr.com/ru/post/1612447/

