Reactive Extensions: Deserializing a Stream from IObservable<byte[]> into Separate Newline-Delimited Messages without Using a Subject
I receive messages through an IObservable<byte[]> and deserialize them into strings, which are then pumped through an IObservable<string>. A Socket populates the IObservable<byte[]> via a FromEventPattern conversion. The deserialized messages from the Socket are line-delimited strings. However, a single message received from the Socket is not necessarily one delimited line: it can be any part of any number of messages, and partial messages are possible. The first way to solve this that came to mind was a Subject and a closure, like this:
```csharp
private IObservable<string> GetCompleteMessage(IObservable<byte[]> bytes)
{
    const byte byteLineFeed = 10;
    var subject = new Subject<string>();
    byte[] leftovers = null;

    // Slice(start, length) is a helper extension (not shown) returning a byte[] copy of that range.
    // Concat keeps every byte; a set operation such as Union would drop repeated bytes.
    bytes.Subscribe(current =>
    {
        var lastPositionOfLineFeed = -1;
        for (var i = 0; i < current.Length; i++)
        {
            if (current[i] == byteLineFeed)
            {
                if (leftovers != null)
                {
                    subject.OnNext(
                        Encoding.ASCII.GetString(
                            leftovers.Concat(current.Slice(lastPositionOfLineFeed + 1, i - lastPositionOfLineFeed))
                                     .ToArray()));
                    leftovers = null;
                }
                else
                {
                    subject.OnNext(
                        Encoding.ASCII.GetString(
                            current.Slice(lastPositionOfLineFeed + 1, i - lastPositionOfLineFeed)));
                }
                lastPositionOfLineFeed = i;
            }
        }

        // Keep any bytes after the last line feed for the next chunk.
        if (lastPositionOfLineFeed != current.Length - 1)
        {
            if (leftovers != null)
            {
                leftovers = leftovers.Concat(current.Slice(lastPositionOfLineFeed + 1, current.Length - lastPositionOfLineFeed - 1))
                                     .ToArray();
            }
            else
            {
                leftovers = current.Slice(lastPositionOfLineFeed + 1, current.Length - lastPositionOfLineFeed - 1);
            }
        }
    });

    return subject.AsObservable();
}
```

This works well, but I know that using Subjects is frowned upon for many reasons, some of which are evident in this block of code. I suspect I may be reinventing the wheel here, since I am not completely familiar with all the operators in Rx. Can I do this without the closure and the Subject? If so, how? Or does using a Subject make sense here?
I would use SelectMany with a selector that returns an IEnumerable<string>.
For example:
```csharp
public static IObservable<string> GetCompleteMessage(this IObservable<byte[]> source)
{
    const byte byteLineFeed = 10;
    IEnumerable<byte> remainder = Enumerable.Empty<byte>();

    Func<byte[], IEnumerable<string>> selector = data =>
    {
        var result = new List<string>();
        var current = new ArraySegment<byte>(data);

        while (true)
        {
            // IList<byte>.IndexOf on an ArraySegment returns an index relative to the segment.
            var dividerOffset = ((IList<byte>)current).IndexOf(byteLineFeed);

            if (dividerOffset == -1) // No newline found
            {
                // Copy now, in case the producer reuses its buffer for the next chunk.
                remainder = remainder.Concat(current).ToArray();
                break;
            }

            var segment = new ArraySegment<byte>(current.Array, current.Offset, dividerOffset);
            var lineBytes = remainder.Concat(segment).ToArray();
            result.Add(Encoding.ASCII.GetString(lineBytes));

            remainder = Enumerable.Empty<byte>();
            current = new ArraySegment<byte>(current.Array, current.Offset + dividerOffset + 1, current.Count - 1 - dividerOffset);
        }

        return result;
    };

    return source.SelectMany(selector);
}
```

Alternatively, you can use a NetworkStream and a StreamReader to achieve the same result:
```csharp
public static IObservable<string> ReadLineObservable(this TextReader reader)
{
    return Observable.FromAsync(() => reader.ReadLineAsync())
        .Repeat()
        .TakeWhile(x => x != null); // ReadLineAsync returns null at the end of the stream
}
```

Setting aside the possibly excellent option of relying on the BCL via a TextReader, if that can be made to fit your scenario, I wondered how I would do this in idiomatic Rx, and came up with the following short query that avoids custom operators, subjects and all that:
```csharp
var messages = arrayStream.Select(bytes => bytes.ToObservable()).Concat()
                          .Publish(ps => ps.Where(p => p != 10)
                                           .Buffer(() => ps.Where(p => p == 10)))
                          .Select(ls => Encoding.ASCII.GetString(ls.ToArray()));
```

Breakdown
I assume ASCII encoding (as in your question), so that a byte value of 10 is a line separator. With a multibyte encoding scheme this would be naive and would require a more complex framing algorithm (which is one of the reasons why relying on the BCL stream infrastructure is probably better).
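To make the encoding assumption concrete, here is a small BCL-only check (the class and method names are hypothetical, for illustration only) that counts how many bytes equal to 10 appear when a single character that contains no line feed is encoded.

```csharp
using System.Linq;
using System.Text;

public static class LineFeedByteCheck
{
    // Counts the bytes equal to 10 (0x0A, the ASCII line feed) in the encoded form of 'text'.
    public static int CountLineFeedBytes(string text, Encoding encoding)
    {
        return encoding.GetBytes(text).Count(b => b == 10);
    }
}
```

For the character U+010A ("\u010A"), CountLineFeedBytes returns 1 with Encoding.Unicode, because UTF-16 little-endian encodes it as 0A 01, so a naive split on byte 10 would cut the character in half. With Encoding.UTF8 it returns 0 (the bytes are C4 8A), since every byte of a multibyte UTF-8 sequence has its high bit set.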
So, given a stream of byte arrays, IObservable<byte[]> arrayStream, we can flatten it into an IObservable<byte> as follows:
```csharp
arrayStream.Select(bytes => bytes.ToObservable()).Concat()
```

Here I use Select + Concat rather than SelectMany to guarantee that the bytes are emitted in strict order. I made this change on a hunch: I have not analyzed the code closely enough to be comfortable that, without this extra protection, there is no possibility of a subsequent array overlapping the previous one, and I think it would come down to the scheduler in use. If you're interested, check out the Rx source. In any case, it is better to be safe. With that done, we can buffer the flattened bytes into lists delimited by line feeds as follows:
```csharp
.Publish(ps => ps.Where(p => p != 10)
                 .Buffer(() => ps.Where(p => p == 10)))
```

We publish here because we subscribe to the byte stream twice, so it must be multicast. We buffer the byte stream with the line feeds filtered out, and supply a buffer-closing selector that watches for line feeds. This emits lists of the bytes that were delimited by line feeds.
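For readers less familiar with the closing-selector overload of Buffer, the following plain C# iterator is a rough, synchronous emulation of what the published Where/Buffer pair computes. BufferEmulation and SplitOnLineFeed are hypothetical names, not Rx APIs. The sketch assumes the bytes have already been flattened and are in order, which is what Select + Concat provides.

```csharp
using System.Collections.Generic;

public static class BufferEmulation
{
    // Synchronous approximation of
    //   ps.Where(p => p != 10).Buffer(() => ps.Where(p => p == 10))
    // over an already flattened, ordered byte sequence.
    public static IEnumerable<List<byte>> SplitOnLineFeed(IEnumerable<byte> bytes)
    {
        var buffer = new List<byte>();
        foreach (var b in bytes)
        {
            if (b == 10)
            {
                // The closing sequence fired: emit the current buffer and open a new one.
                yield return buffer;
                buffer = new List<byte>();
            }
            else
            {
                // The Where(p => p != 10) branch: only non-delimiter bytes are buffered.
                buffer.Add(b);
            }
        }
        // Bytes after the last line feed are not emitted; completion is not modeled here.
    }
}
```

Feeding it the ASCII bytes of "This\nis\na\ntest!\n" yields the byte lists for "This", "is", "a" and "test!". The sketch simply drops trailing bytes after the last line feed. It does not try to model how Buffer behaves when the source completes.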
Finally, we decode messages with a simple projection:
```csharp
.Select(ls => Encoding.ASCII.GetString(ls.ToArray()));
```

Here's a complete working example demonstrating some messages whose framing is split across multiple packets:
```csharp
var rawMessages = new byte[][]
{
    Encoding.ASCII.GetBytes("This\ni"),
    Encoding.ASCII.GetBytes("s\na"),
    Encoding.ASCII.GetBytes("\ntest!\n")
};

var arrayStream = new Subject<byte[]>();

var messages = arrayStream.Select(bytes => bytes.ToObservable()).Concat()
                          .Publish(ps => ps.Where(p => p != 10)
                                           .Buffer(() => ps.Where(p => p == 10)))
                          .Select(ls => Encoding.ASCII.GetString(ls.ToArray()));

messages.Subscribe(b => Console.WriteLine(b));

foreach (var array in rawMessages)
{
    arrayStream.OnNext(array);
}
```

Output:
```
This
is
a
test!
```

Caveats
The Rx may be idiomatic, but there is plenty to think about. I have ignored the performance cost of flattening the byte arrays and rebuilding them (though the network is much slower than the CPU, so this may not be a problem). I have assumed that error handling is dealt with upstream, where the IObservable<byte[]> is produced, and I have not considered decoding errors; there is no attention to timeouts, network issues, etc. I have also assumed that the last message ends with a line feed.
If you want multiple observers to subscribe to this stream, be sure to multicast it via Publish if you need to avoid multiple subscriptions to the underlying IObservable<byte[]>.
Here's a complete lab showing how to write a simple test client/host scenario using Rxx.
You are probably most interested in the ReceiveData method, because it addresses your question directly using Rxx Parsers. Note that I flatten the byte[] sequence by first using SelectMany to create an IObservable<byte>, which is then passed into a simple binary parser grammar that splits it on newlines and projects each line's bytes as a decoded string.
```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Rxx.Parsers.Reactive.Linq;

namespace Rxx.Labs.Forums
{
    public sealed class BinaryParserLab : BaseConsoleLab
    {
        private const int port = 30253;
        private const int bufferSize = 4;

        private static Encoding encoding = Encoding.ASCII;

        protected override void Main()
        {
            using (new CompositeDisposable(StartHost(), StartClient()))
            {
                WaitForKey();
            }
        }

        private IDisposable StartClient()
        {
            return Observable.Using(
                () => new TcpClient(),
                client => (from _ in client.ConnectObservable(IPAddress.Loopback, port)
                           from line in Observable.Using(() => client.GetStream(), ReceiveData)
                           select line))
                .Subscribe(line => TraceSuccess("Received: {0}", line),
                           () => TraceStatus("Client Completed."));
        }

        private IDisposable StartHost()
        {
            return (from client in ObservableTcpListener.Start(IPAddress.Loopback, port, maxConcurrent: 1).Take(1)
                    from _ in Observable.Using(
                        () => client.GetStream(),
                        stream => Observable.Create<Unit>((observer, cancel) => SendData(stream, observer, cancel)))
                    select Unit.Default)
                .Subscribe(_ => { }, () => TraceStatus("Host Completed."));
        }

        private async Task SendData(NetworkStream stream, IObserver<Unit> observer, CancellationToken cancel)
        {
            var data = encoding.GetBytes("Line 1\nLine 2\nLine 3\nLine 4\n");

            for (var i = 0; i < data.Length; i += bufferSize)
            {
                TraceLine("Sending: {0}", encoding.GetString(data, i, bufferSize).Replace('\n', ' '));

                await stream.WriteAsync(data, i, bufferSize, cancel).ConfigureAwait(false);
                await Task.Delay(TimeSpan.FromSeconds(1), cancel).ConfigureAwait(false);
            }
        }

        private IObservable<string> ReceiveData(NetworkStream stream)
        {
            return (from bytes in stream.ReadToEndObservable(bufferSize)
                    from b in bytes
                    select b)
                .ParseBinary(parser =>
                    from next in parser
                    let newLine = parser.Byte.Of((byte)'\n')
                    let line = next.Not(newLine).NoneOrMore()
                    select from bytes in line
                           from _ in newLine
                           from array in bytes.ToArray()
                           select encoding.GetString(array));
        }
    }
}
```

Output:
```
Starting BinaryParser Lab...
Sending: Line
Sending:  1 L
Received: Line 1
Sending: ine 
Sending: 2 Li
Received: Line 2
Sending: ne 3
Sending:  Lin
Received: Line 3
Sending: e 4 
Received: Line 4
Host Completed.
Client Completed.
BinaryParser Lab Completed.
```

You are right to look for an alternative to using a Subject; although there are definitely scenarios where subjects are extremely useful, this can be done without one. But first, notice that the anonymous method passed to Subscribe closes over the local variables subject and leftovers, which may lead to poor results when you subscribe to the resulting IObservable<string> twice (for example, both subscriptions would share the same subject and leftovers state, which is probably not what you want).

Whenever I need to carry some state across multiple notifications, I use the Scan operator. The first argument is the initial state (which should be immutable, since it is also shared between subscriptions); the second is an accumulator function that receives the current state and the next value and returns a new state, which is then passed along with the next value, and so on. You get an observable of these states, which you then flatten with SelectMany. An implementation might look like this:
```csharp
private IObservable<string> GetCompleteMessage(IObservable<byte[]> bytes)
{
    const byte byteLineFeed = 10;

    return bytes.Scan(
        new { Leftovers = (byte[])null, Lines = new List<string>() },
        (tuple, current) =>
        {
            var lastPositionOfLineFeed = -1;
            var newLeftovers = tuple.Leftovers; // consumed by the first line feed in this chunk
            var lines = new List<string>();

            for (var i = 0; i < current.Length; i++)
            {
                if (current[i] == byteLineFeed)
                {
                    if (newLeftovers != null)
                    {
                        lines.Add(Encoding.ASCII.GetString(
                            newLeftovers.Concat(current.Slice(lastPositionOfLineFeed + 1, i - lastPositionOfLineFeed))
                                        .ToArray()));
                        newLeftovers = null;
                    }
                    else
                    {
                        lines.Add(Encoding.ASCII.GetString(
                            current.Slice(lastPositionOfLineFeed + 1, i - lastPositionOfLineFeed)));
                    }
                    lastPositionOfLineFeed = i;
                }
            }

            if (lastPositionOfLineFeed != current.Length - 1)
            {
                if (newLeftovers != null)
                {
                    newLeftovers = newLeftovers.Concat(current.Slice(lastPositionOfLineFeed + 1, current.Length - lastPositionOfLineFeed - 1))
                                               .ToArray();
                }
                else
                {
                    newLeftovers = current.Slice(lastPositionOfLineFeed + 1, current.Length - lastPositionOfLineFeed - 1);
                }
            }

            return new { Leftovers = newLeftovers, Lines = lines };
        })
        .SelectMany(tuple => tuple.Lines);
}
```

I have not tested this!
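The Scan idea can also be packaged as an explicitly immutable state type, which makes the "seed is shared between subscriptions" point hard to get wrong. This is a sketch under the same ASCII/line-feed assumptions. FramingState and Next are hypothetical names, and the sketch uses only the BCL, so the accumulator can be exercised without Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical state for the Scan approach: bytes of an unfinished line,
// plus the complete lines found in the most recent chunk.
public sealed class FramingState
{
    // Seed for Scan; Next never mutates it, so it can be shared between subscriptions.
    public static readonly FramingState Empty =
        new FramingState(Array.Empty<byte>(), Array.Empty<string>());

    public FramingState(IReadOnlyList<byte> leftovers, IReadOnlyList<string> lines)
    {
        Leftovers = leftovers;
        Lines = lines;
    }

    public IReadOnlyList<byte> Leftovers { get; }
    public IReadOnlyList<string> Lines { get; }

    // Accumulator: returns a new state and leaves this one untouched.
    public FramingState Next(byte[] chunk)
    {
        var lines = new List<string>();
        var pending = new List<byte>(Leftovers); // copy of the carried-over bytes

        foreach (var b in chunk)
        {
            if (b == 10) // an ASCII line feed ends the current line
            {
                lines.Add(Encoding.ASCII.GetString(pending.ToArray()));
                pending.Clear();
            }
            else
            {
                pending.Add(b);
            }
        }

        return new FramingState(pending.ToArray(), lines);
    }
}
```

In Rx this would presumably be wired up as `bytes.Scan(FramingState.Empty, (state, chunk) => state.Next(chunk)).SelectMany(state => state.Lines)`. Feeding the chunks "This\ni", "s\na" and "\ntest!\n" through Next in order yields the lines "This", "is", "a" and "test!", with no leftovers at the end.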
This seemed like an easy approach to me:
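```csharp
private IObservable<string> GetCompleteMessage(IObservable<byte[]> bytes)
{
    return Observable.Create<string>(o =>
    {
        var subject = new Subject<string>();

        return new CompositeDisposable(
            subject.Subscribe(o),
            bytes
                .Aggregate(
                    "",
                    (s, bs) =>
                    {
                        // Append the decoded chunk to the carried-over text and split on line feeds.
                        var parts = (s + Encoding.ASCII.GetString(bs))
                            .Split(new[] { (char)10 });

                        // Every part except the last is a complete line.
                        foreach (var part in parts.Take(parts.Length - 1))
                        {
                            subject.OnNext(part);
                        }

                        // The last part is an incomplete line; carry it into the next chunk.
                        return parts.Last();
                    })
                // Aggregate emits the final carried-over text when the source completes.
                .Subscribe(subject));
    });
}
```

I tested it with some dummy data and it worked fine for me.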
One of the key things to do when writing this kind of operator is to wrap it in an Observable.Create(...) call, so that the returned observable can be subscribed to by any number of observers.
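The Aggregate-based version decodes each chunk independently. That is fine for ASCII, but it would garble a multibyte character whose bytes are split across two chunks. One possible refinement is sketched below with a hypothetical ChunkLineDecoder class. It keeps the same split-and-carry idea but decodes through a stateful System.Text.Decoder, which holds on to incomplete byte sequences between calls.

```csharp
using System.Collections.Generic;
using System.Text;

public sealed class ChunkLineDecoder
{
    private readonly Decoder decoder;
    private readonly StringBuilder carry = new StringBuilder(); // text after the last '\n'

    public ChunkLineDecoder(Encoding encoding)
    {
        decoder = encoding.GetDecoder();
    }

    // Decodes one chunk and returns the lines it completes.
    public IReadOnlyList<string> Push(byte[] chunk)
    {
        // GetCharCount does not change the decoder's state; GetChars (without flush)
        // keeps an incomplete multibyte sequence buffered for the next call.
        var chars = new char[decoder.GetCharCount(chunk, 0, chunk.Length)];
        var charCount = decoder.GetChars(chunk, 0, chunk.Length, chars, 0);

        var lines = new List<string>();
        for (var i = 0; i < charCount; i++)
        {
            if (chars[i] == '\n')
            {
                lines.Add(carry.ToString());
                carry.Clear();
            }
            else
            {
                carry.Append(chars[i]);
            }
        }
        return lines;
    }
}
```

Because the helper is stateful, one instance would need to be created per subscription, for example inside the Observable.Create call, so that each observer gets its own decoder and carry buffer. With Encoding.UTF8, pushing the bytes 68 0A C3 and then A9 0A produces the line "h" from the first push and "é" from the second, even though the two bytes of "é" arrive in different chunks.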