How to aggregate data from an async producer and write it to a file?

I am learning the async/await patterns in C#. I am currently trying to solve a problem like this:

  • There is a producer (a hardware device) that generates 1000 packets per second. I need to log this data to a file.

  • The device exposes only a ReadAsync() method, which returns one packet at a time.

  • I need to buffer the packets and write them to the file in the order they were generated, only once per second.

  • The write operation should fail if the previous write has not completed by the time the next batch of packets is ready to be written.

So far I have written something like the code below. It works, but I am not sure it is the best way to solve the problem. Any comments or suggestions? What is the best practice for this kind of producer/consumer problem, where the consumer needs to aggregate the data received from the producer?

    static async Task TestLogger(Device device, int seconds)
    {
        const int bufLength = 1000;
        bool firstIteration = true;
        Task writerTask = null;

        using (var writer = new StreamWriter("test.log"))
        {
            do
            {
                // Collect one batch of packets, in arrival order.
                var buffer = new byte[bufLength][];
                for (int i = 0; i < bufLength; i++)
                {
                    buffer[i] = await device.ReadAsync();
                }

                // Fail if the previous batch has not been written yet.
                if (!firstIteration)
                {
                    if (!writerTask.IsCompleted)
                        throw new Exception("Write Time Out!");
                }

                // Write the batch in the background while the next one is read.
                writerTask = Task.Run(() =>
                {
                    foreach (var b in buffer)
                        writer.WriteLine(ToHexString(b));
                });
                firstIteration = false;
            } while (--seconds > 0);
        }
    }
3 answers

You could use the following idea, provided the criterion for a flush is the number of packets (up to 1000). It is untested. It uses Stephen Cleary's AsyncProducerConsumerQueue<T>, which is featured in this question.

    AsyncProducerConsumerQueue<byte[]> _queue;
    Stream _stream;
    Device _device; // the OP's device; assumed here to offer ReadAsync(CancellationToken)

    // producer
    async Task ReceiveAsync(CancellationToken token)
    {
        while (true)
        {
            var list = new List<byte>();
            while (true)
            {
                token.ThrowIfCancellationRequested();
                var packet = await _device.ReadAsync(token);
                list.Add(packet);
                if (list.Count == 1000)
                    break;
            }
            // push next batch
            await _queue.EnqueueAsync(list.ToArray(), token);
        }
    }

    // consumer
    async Task LogAsync(CancellationToken token)
    {
        Task previousFlush = Task.FromResult(0);
        CancellationTokenSource cts = null;
        while (true)
        {
            token.ThrowIfCancellationRequested();
            // get next batch
            var nextBatch = await _queue.DequeueAsync(token);
            if (!previousFlush.IsCompleted)
            {
                cts.Cancel(); // cancel the previous flush if not ready
                throw new Exception("failed to flush on time.");
            }
            await previousFlush; // it completed, observe for any errors
            // start flushing
            cts = CancellationTokenSource.CreateLinkedTokenSource(token);
            previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Length, cts.Token);
        }
    }

If you do not want the logger to fail, but would rather cancel the flush and move on to the next batch, you can do so with a minimal change to this code.
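One possible reading of that minimal change is sketched below; it is an illustration under stated assumptions, not the answer author's code. The delegates nextBatchAsync and writeAsync are hypothetical stand-ins for _queue.DequeueAsync and _stream.WriteAsync, and the loop is bounded by a hypothetical batchCount so the demo terminates. A late flush is cancelled and counted as dropped instead of throwing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SkipLateFlushSketch
{
    // nextBatchAsync and writeAsync are hypothetical stand-ins for
    // _queue.DequeueAsync and _stream.WriteAsync from the answer above.
    public static async Task<int> LogAsync(
        Func<CancellationToken, Task<byte[]>> nextBatchAsync,
        Func<byte[], CancellationToken, Task> writeAsync,
        int batchCount,
        CancellationToken token)
    {
        int dropped = 0;
        Task previousFlush = Task.CompletedTask;
        CancellationTokenSource cts = null;

        for (int i = 0; i < batchCount; i++)
        {
            token.ThrowIfCancellationRequested();
            byte[] nextBatch = await nextBatchAsync(token);

            if (!previousFlush.IsCompleted)
            {
                cts.Cancel(); // the previous flush is late: cancel it instead of throwing
                dropped++;
            }
            await ObserveAsync(previousFlush);

            // Start flushing the new batch with a fresh cancellation source.
            cts?.Dispose();
            cts = CancellationTokenSource.CreateLinkedTokenSource(token);
            previousFlush = writeAsync(nextBatch, cts.Token);
        }

        await ObserveAsync(previousFlush); // let the last flush finish
        cts?.Dispose();
        return dropped;
    }

    // Awaits the flush so real write errors surface; a cancellation is expected and ignored.
    static async Task ObserveAsync(Task flush)
    {
        try { await flush; }
        catch (OperationCanceledException) { }
    }

    public static Task<int> RunDemoAsync()
    {
        var batches = new Queue<byte[]>(new[] { new byte[] { 0 }, new byte[] { 0 }, new byte[] { 1 } });

        // Fake writer: batches starting with 0 never finish unless cancelled.
        Func<byte[], CancellationToken, Task> slowWriter = (batch, ct) =>
        {
            if (batch[0] != 0) return Task.CompletedTask;
            var tcs = new TaskCompletionSource<bool>();
            ct.Register(() => tcs.TrySetCanceled());
            return tcs.Task;
        };

        return LogAsync(ct => Task.FromResult(batches.Dequeue()), slowWriter, 3, CancellationToken.None);
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunDemoAsync()); // prints 2: the first two flushes were cancelled
    }
}
```

Note that awaiting the cancelled flush waits until the write acknowledges the cancellation, so this only helps if the underlying write actually honours its token.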

In response to @l3arnon's comment:

  "1. A packet is not a byte, it is a byte[]. 2. You have not used the OP's ToHexString. 3. AsyncProducerConsumerQueue is much less robust and tested than .NET's TPL Dataflow. 4. You await previousFlush for errors right after you throw an exception, which makes that line redundant. Etc. In short: I think the possible added value does not justify this very complicated solution."

  • "A packet is not a byte, it is a byte[]" - The packet is a byte; this is obvious from the OP's code: buffer[i] = await device.ReadAsync(). A batch of packets is then a byte[].
  • "You have not used the OP's ToHexString." - The goal was to show how to use Stream.WriteAsync, which natively accepts a cancellation token, instead of WriteLineAsync, which does not allow cancellation. It is trivial to use ToHexString with Stream.WriteAsync and still take advantage of cancellation support:

      var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + Environment.NewLine);
      _stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
  • "AsyncProducerConsumerQueue is much less robust and tested than .NET's TPL Dataflow" - I do not think this is an established fact. However, if the OP is concerned about it, they can use a regular BlockingCollection, which does not block the producer thread. It is fine to block the consumer thread while it waits for the next batch, because the writing is done in parallel. By contrast, your TPL Dataflow version has one redundant CPU- and lock-intensive operation: moving data from the producer pipeline to the writer pipeline with logAction.Post(packet), byte by byte. My code does not do that.

  • "You await previousFlush for errors right after you throw an exception, which makes that line redundant." - This line is not redundant. Perhaps you are missing this point: previousFlush.IsCompleted can be true when previousFlush.IsFaulted or previousFlush.IsCanceled is also true. So await previousFlush is needed to observe any errors on the completed task (for example, a write failure), which would otherwise be lost.
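The last point can be checked in isolation. The following is a small sketch, not part of the answer's code: Task.FromException stands in for a flush that failed, and the class and helper names are made up for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletedButFaultedDemo
{
    // Returns true when awaiting the task rethrows its stored exception.
    public static async Task<bool> AwaitSurfacesErrorAsync(Task task)
    {
        try { await task; return false; }
        catch (InvalidOperationException) { return true; }
    }

    public static async Task Main()
    {
        // Stand-in for a flush that failed.
        Task previousFlush = Task.FromException(new InvalidOperationException("write failed"));

        Console.WriteLine(previousFlush.IsCompleted); // True
        Console.WriteLine(previousFlush.IsFaulted);   // True
        Console.WriteLine(await AwaitSurfacesErrorAsync(previousFlush)); // True
    }
}
```

A check on IsCompleted alone would treat this flush as finished and silently move on; the await is what turns the stored exception into a visible failure.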


The best approach, IMHO, is to have two "workers": a producer and a consumer. The producer reads from the device and simply fills a list. The consumer wakes up every second and writes the accumulated batch to the file.

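    // Requires: System, System.Collections.Generic, System.Diagnostics,
    // System.IO, System.Threading.Tasks.

    // Shared buffer. List<T> is not thread-safe, so this assumes Producer and
    // Consumer run on the same single-threaded context.
    List<byte[]> _data = new List<byte[]>();

    async Task Producer(Device device)
    {
        while (true)
        {
            _data.Add(await device.ReadAsync());
        }
    }

    async Task Consumer(Device device)
    {
        using (var writer = new StreamWriter("test.log"))
        {
            while (true)
            {
                Stopwatch watch = Stopwatch.StartNew();

                // Swap in a fresh list so the producer keeps filling while we write.
                var batch = _data;
                _data = new List<byte[]>();

                foreach (var packet in batch)
                {
                    writer.WriteLine(ToHexString(packet));
                    if (watch.Elapsed >= TimeSpan.FromSeconds(1))
                    {
                        throw new Exception("Write Time Out!");
                    }
                }

                // Sleep for the rest of the second; skip the delay if none is left,
                // since Task.Delay rejects negative durations.
                var remaining = TimeSpan.FromSeconds(1) - watch.Elapsed;
                if (remaining > TimeSpan.Zero)
                    await Task.Delay(remaining);
            }
        }
    }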

The while (true) loops should probably be replaced with a check against a cancellation token.
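As a rough sketch of that suggestion, the producer loop could poll a CancellationToken instead of looping forever. FakeDevice below is a hypothetical stand-in for the OP's device, and the 200 ms timeout is an arbitrary value chosen for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the OP's hardware device.
public class FakeDevice
{
    public async Task<byte[]> ReadAsync()
    {
        await Task.Delay(1); // simulate roughly one packet per millisecond
        return new byte[] { 0x01, 0x02 };
    }
}

public static class CancellationSketch
{
    // Reads packets into sink until cancellation is requested, then returns
    // normally with the number of packets collected.
    public static async Task<int> ProduceAsync(FakeDevice device, List<byte[]> sink, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            sink.Add(await device.ReadAsync());
        }
        return sink.Count;
    }

    public static async Task Main()
    {
        // Request cancellation automatically after 200 ms.
        using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)))
        {
            var sink = new List<byte[]>();
            int count = await ProduceAsync(new FakeDevice(), sink, cts.Token);
            Console.WriteLine($"Stopped cleanly after {count} packets.");
        }
    }
}
```

Polling IsCancellationRequested lets the loop exit normally; calling token.ThrowIfCancellationRequested() instead would end it with an OperationCanceledException, which suits callers that treat cancellation as an exceptional exit.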


Assuming you can batch by count (1000) instead of by time (1 second), the simplest solution is probably to use TPL Dataflow's BatchBlock, which automatically groups a stream of items into batches of a given size:

    // Requires the System.Threading.Tasks.Dataflow package.
    async Task TestLogger(Device device, int seconds)
    {
        var writer = new StreamWriter("test.log");
        var batch = new BatchBlock<byte[]>(1000);

        // Writes one packet per invocation.
        var logAction = new ActionBlock<byte[]>(
            packet =>
            {
                return writer.WriteLineAsync(ToHexString(packet));
            });

        // Forwards each batch to the writer, abandoning it if the next batch is already waiting.
        ActionBlock<byte[][]> transferAction = null;
        transferAction = new ActionBlock<byte[][]>(
            bytes =>
            {
                foreach (var packet in bytes)
                {
                    if (transferAction.InputCount > 0)
                    {
                        return; // or throw new Exception("Write Time Out!");
                    }
                    logAction.Post(packet);
                }
            });

        batch.LinkTo(transferAction);
        logAction.Completion.ContinueWith(_ => writer.Dispose());

        while (true)
        {
            batch.Post(await device.ReadAsync());
        }
    }

Source: https://habr.com/ru/post/970378/

