[this question is in the area of Reactive Extensions (Rx) ]
Subscription to be continued upon application restart
int nValuesBeforeOutput = 123; myStream.Buffer(nValuesBeforeOutput).Subscribe( i => Debug.WriteLine("Something Critical on Every 123rd Value"));
Now I need to serialize and deserialize the state of this subscription, so that the next time the application is launched, the buffer count does NOT start from zero, but from what the counter had before the application exited ,
- How could you save the state of IObservable.Subscribe () in this case and then load it?
- Is there a general solution to keep observer state in Rx?
From the answer to the decision
Based on Paul Batts approach, here is a semi-generalizing implementation that worked in my initial testing
Using
int nValuesBeforeOutput = 123; var myRecordableStream = myStream.Record(serializer); myRecordableStream.Buffer(nValuesBeforeOutput).ClearRecords(serializer).Subscribe( i => Debug.WriteLine("Something Critical on Every 123rd Value"));
Extension Methods
private static bool _alreadyRecording; public static IObservable<T> Record<T>(this IObservable<T> input, IRepositor repositor) { IObservable<T> output = input; List<T> records = null; if (repositor.Deserialize(ref records)) { ISubject<T> history = new ReplaySubject<T>(); records.ForEach(history.OnNext); output = input.Merge(history); } if (!_alreadyRecording) { _alreadyRecording = true; input.Subscribe(i => repositor.SerializeAppend(new List<T> {i})); } return output; } public static IObservable<T> ClearRecords<T>(this IObservable<T> input, IRepositor repositor) { input.Subscribe(i => repositor.Clear()); return input; }
Notes
- This will not work for storing states that depend on time intervals between received values.
- You need a serializer implementation that supports T serialization
_alreadyRecording
is required if you subscribe to myRecordableStream
more than once_alreadyRecording
is static logical, very ugly and does not allow extension methods to be used in more than one place if parallel signatures are needed - re-implementation is required for future use
source share