You can use the following idea if the criterion for a flush is the number of packets (up to 1000). I did not check. He uses the Stephen Cleary AsyncProducerConsumerQueue<T> provided in this question .
AsyncProducerConsumerQueue<byte[]> _queue; Stream _stream; // producer async Task ReceiveAsync(CancellationToken token) { while (true) { var list = new List<byte>(); while (true) { token.ThrowIfCancellationRequested(token); var packet = await _device.ReadAsync(token); list.Add(packet); if (list.Count == 1000) break; } // push next batch await _queue.EnqueueAsync(list.ToArray(), token); } } // consumer async Task LogAsync(CancellationToken token) { Task previousFlush = Task.FromResult(0); CancellationTokenSource cts = null; while (true) { token.ThrowIfCancellationRequested(token); // get next batch var nextBatch = await _queue.DequeueAsync(token); if (!previousFlush.IsCompleted) { cts.Cancel(); // cancel the previous flush if not ready throw new Exception("failed to flush on time."); } await previousFlush; // it completed, observe for any errors // start flushing cts = CancellationTokenSource.CreateLinkedTokenSource(token); previousFlush = _stream.WriteAsync(nextBatch, 0, nextBatch.Count, cts.Token); } }
If you do not want the registrar to crash, but prefer to cancel the flash and go to the next installment, you can do this with a minimal change to this code.
In response to @ l3arnon's comment:
- The packet is not a byte, but a byte []. 2. You have not used OP ToHexString. 3. AsyncProducerConsumerQueue is much less reliable and tested than the .NET TPL Dataflow. 4. You expect the previousFlush for errors immediately after you throw an exception that makes this line redundant. etc. In short: I think that the possible added value does not justify this very difficult decision.
- "The packet is not bytes, its byte []" - the packet is a byte, this is obvious from the OP code:
buffer[i] = await device.ReadAsync() . Then the packet package byte[] . "You have not used OP ToHexString." - The goal was to show how to use Stream.WriteAsync , which initially accepts a cancellation token, instead of WriteLineAsync , which does not allow cancellation. It is trivial to use ToHexString with Stream.WriteAsync and still use undo support:
var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + Environment.NewLine); _stream.WriteAsync(hexBytes, 0, hexBytes.Length, token);
"AsyncProducerConsumerQueue is much less reliable and proven than .Net TPL Dataflow" - I do not think this is a definite fact. However, if the OP is concerned about this, it can use the usual BlockingCollection , which does not block the producer thread. It is normal to block the consumer flow, waiting for the next batch, because the recording is performed in parallel. In contrast, your version of TPL Dataflow contains one redundant processor and intensive work with locking: moving data from the producer pipeline to pipleline using logAction.Post(packet) , byte by byte. My code does not do this.
"You expect a previousFlush for errors right after you throw an exception that makes this line redundant." - This line is not redundant. Perhaps you are missing this point: previousFlush.IsCompleted can be true when previousFlush.IsFaulted or previousFlush.IsCancelled also true . Thus, await previousFlush is important for observing any errors in completed tasks (for example, write failure) that would otherwise be lost.
source share