Write to open FileStream using reactive programming

I am writing a small logger, and I want to open the log file once, continue to write reactively when log messages arrive, and dispose of everything at the end of the program.

I'm not sure how I can leave FileStream open and write messages reactively as they arrive.

I would like to update the design from my previous solution, where I had a ConcurrentQueue acting as a buffer, and a loop inside the statements usingthat consumed the queue.

In particular, I want to use the operator construct at the same time using, so I don’t need to explicitly close the stream and record, as well as the reactive, loopback programming style. Currently, I only know how to use one of these constructs at once: either a combination using/ loop or a combination of an explicit thread-close / reaction.

Here is my code:

    BufferBlock<LogEntry> _buffer = new BufferBlock<LogEntry>();


    // CONSTRUCTOR
    public DefaultLogger(string folder)
    {
        var filePath = Path.Combine(folder, $"{DateTime.Now.ToString("yyyy.MM.dd")}.log");

        _cancellation = new CancellationTokenSource();

        var observable = _buffer.AsObservable();

        using (var stream = File.Create(_filePath))
        using (var writer = new StreamWriter(stream))
        using (var subscription = observable.Subscribe(entry =>
                                    writer.Write(GetFormattedString(entry))))
        {
            while (!_cancellation.IsCancellationRequested)
            {
                // what do I do here?
            }
        }
    }
+4
source share
1 answer

You need to use Observable.Using. It is intended to create a resource IDisposblethat is deleted when the sequence ends.

Try something like this:

IDisposable subscription = 
    Observable.Using(() => File.Create(_filePath),
        stream => Observable.Using(() => new StreamWriter(stream),
            writer => _buffer.AsObservable().Select(entry => new { entry, writer })))
        .Subscribe(x => x.writer.Write(GetFormattedString(x.entry)));
+3
source

Source: https://habr.com/ru/post/1685043/


All Articles