Store restores IObservable subscription status in Rx

[this question is in the area of Reactive Extensions (Rx) ]

Subscription to be continued upon application restart

int nValuesBeforeOutput = 123; myStream.Buffer(nValuesBeforeOutput).Subscribe( i => Debug.WriteLine("Something Critical on Every 123rd Value")); 

Now I need to serialize and deserialize the state of this subscription, so that the next time the application is launched, the buffer count does NOT start from zero, but from what the counter had before the application exited ,

  • How could you save the state of IObservable.Subscribe () in this case and then load it?
  • Is there a general solution to keep observer state in Rx?



From the answer to the decision

Based on Paul Batts approach, here is a semi-generalizing implementation that worked in my initial testing

Using

 int nValuesBeforeOutput = 123; var myRecordableStream = myStream.Record(serializer); myRecordableStream.Buffer(nValuesBeforeOutput).ClearRecords(serializer).Subscribe( i => Debug.WriteLine("Something Critical on Every 123rd Value")); 

Extension Methods

  private static bool _alreadyRecording; public static IObservable<T> Record<T>(this IObservable<T> input, IRepositor repositor) { IObservable<T> output = input; List<T> records = null; if (repositor.Deserialize(ref records)) { ISubject<T> history = new ReplaySubject<T>(); records.ForEach(history.OnNext); output = input.Merge(history); } if (!_alreadyRecording) { _alreadyRecording = true; input.Subscribe(i => repositor.SerializeAppend(new List<T> {i})); } return output; } public static IObservable<T> ClearRecords<T>(this IObservable<T> input, IRepositor repositor) { input.Subscribe(i => repositor.Clear()); return input; } 

Notes

  • This will not work for storing states that depend on time intervals between received values.
  • You need a serializer implementation that supports T serialization
  • _alreadyRecording is required if you subscribe to myRecordableStream more than once
  • _alreadyRecording is static logical, very ugly and does not allow extension methods to be used in more than one place if parallel signatures are needed - re-implementation is required for future use
+6
source share
1 answer

There is no general solution for this, and NonTrivial ™ would be for it. The closest you can do is make myStream a kind of Observable play (i.e., Instead of serializing the state, serializing the state of myStream and re-working to get you back to where you were).

+1
source

Source: https://habr.com/ru/post/912850/


All Articles