Scheduling Asynchronous Work in TPL Dataflow

Scheduling async tasks in TPL Dataflow does not work as I expected. In the example below, I expected the ActionBlock to process data from the TransformBlock as soon as it becomes available. Instead, it waits for the second (delayed) result before moving on to the third. What am I misunderstanding here? Is there some requirement on processing order?

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public class TestDataFlow
    {
        public Stopwatch watch = new Stopwatch();

        public async Task Flow()
        {
            watch.Start();

            var plus10 = new TransformBlock<int, int>(async input =>
            {
                // Input 2 is artificially delayed by 5 seconds.
                if (input == 2)
                {
                    await Task.Delay(5000);
                }
                Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
                return input + 10;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var printSolution = new ActionBlock<int>(input =>
            {
                Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliseconds);
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            plus10.LinkTo(printSolution, new DataflowLinkOptions { PropagateCompletion = true });

            List<int> inputs = new List<int> { 1, 2, 3 };
            foreach (var input in inputs)
            {
                await plus10.SendAsync(input);
            }

            // Signal that no more input is coming and wait for the pipeline to drain.
            plus10.Complete();
            await printSolution.Completion;
        }
    }

Output:

    Exiting plus10 for input 1 @ 115.8583
    Exiting plus10 for input 3 @ 116.6973
    Solution: 11 @ 126.0146
    Exiting plus10 for input 2 @ 5124.4074
    Solution: 12 @ 5124.9014
    Solution: 13 @ 5126.4834
2 answers

TPL Dataflow guarantees the order of the input and output queues, no matter how many items are processed in parallel. A TransformBlock therefore emits its results in the same order in which the inputs arrived: in your example, result 13 is ready almost immediately, but it is held back until result 12 has been produced.

"Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, every message must be read from the source block before the source block can process the next message."

From Dataflow (Task Parallel Library)
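A minimal sketch that makes the ordering guarantee observable, assuming System.Threading.Tasks.Dataflow is referenced (for example via the NuGet package shown in the second answer). The class name `OrderedDemo` and the shortened 500 ms delay are illustrative choices, not part of the original question; results are collected into a list instead of printed so their order can be checked.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: same pipeline shape as the question, default ordering.
public static class OrderedDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var received = new List<int>();

        var plus10 = new TransformBlock<int, int>(async input =>
        {
            if (input == 2)
                await Task.Delay(500); // shorter than the question's 5000 ms
            return input + 10;
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        // Default MaxDegreeOfParallelism is 1, so the List is touched by one thread at a time.
        var collect = new ActionBlock<int>(value => received.Add(value));

        plus10.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var input in new[] { 1, 2, 3 })
            await plus10.SendAsync(input);

        plus10.Complete();
        await collect.Completion; // completion flows through the link
        return received;
    }
}
```

Calling `OrderedDemo.RunAsync()` should yield 11, 12, 13 in that order, even though 13 is computed long before 12.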

If you want items to reach the next block as soon as they finish processing, you have to forward them explicitly yourself, which turns your TransformBlock into an ActionBlock:

    var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };

    var printSolution = new ActionBlock<int>(input =>
    {
        Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliseconds);
    }, executionDataflowBlockOptions);

    var plus10 = new ActionBlock<int>(async input =>
    {
        if (input == 2)
        {
            await Task.Delay(5000);
        }
        Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
        // Forward the result to the next block as soon as it is ready.
        await printSolution.SendAsync(input + 10);
    }, executionDataflowBlockOptions);
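Because the two blocks are no longer linked, completing `plus10` does not complete `printSolution`. A minimal sketch of one way to drain this pipeline, assuming you can simply wait for `plus10` before completing the downstream block; `ForwardingDemo` and the 500 ms delay are hypothetical, and results go into a list instead of the console.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class for the manual-forwarding approach.
public static class ForwardingDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var received = new List<int>();
        var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };

        // Default options (MaxDegreeOfParallelism = 1) keep List access single-threaded.
        var printSolution = new ActionBlock<int>(value => received.Add(value));

        var plus10 = new ActionBlock<int>(async input =>
        {
            if (input == 2)
                await Task.Delay(500);
            // Hand the result on as soon as it is ready.
            await printSolution.SendAsync(input + 10);
        }, options);

        foreach (var input in new[] { 1, 2, 3 })
            await plus10.SendAsync(input);

        plus10.Complete();
        await plus10.Completion;   // every SendAsync to printSolution has finished
        printSolution.Complete();  // so nothing else will be posted
        await printSolution.Completion;
        return received;
    }
}
```

Here 12 should arrive last, while 11 and 13 may arrive in either order. If `plus10` faults, `await plus10.Completion` throws and `printSolution` is left uncompleted; a production version would need to handle that case.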

As of (at least) System.Threading.Tasks.Dataflow 4.6.0, ExecutionDataflowBlockOptions has an EnsureOrdered property, which can be set to false.

To upgrade:

 Install-Package System.Threading.Tasks.Dataflow 

Then, in code:

    var options = new ExecutionDataflowBlockOptions { EnsureOrdered = false };
    var transform = new TransformBlock<int, int>(i => Transform(i), options);
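A fuller sketch of the question's pipeline with `EnsureOrdered = false`, assuming package version 4.6.0 or later so the property exists. `UnorderedDemo` and the 500 ms delay are hypothetical; only the options object differs from the ordered version of the same pipeline.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class: the question's pipeline with ordering turned off.
public static class UnorderedDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var received = new List<int>();

        var plus10 = new TransformBlock<int, int>(async input =>
        {
            if (input == 2)
                await Task.Delay(500);
            return input + 10;
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            EnsureOrdered = false // emit each result as soon as it is ready
        });

        var collect = new ActionBlock<int>(value => received.Add(value));
        plus10.LinkTo(collect, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var input in new[] { 1, 2, 3 })
            await plus10.SendAsync(input);

        plus10.Complete();
        await collect.Completion;
        return received;
    }
}
```

With ordering disabled, 13 no longer waits behind 12: the delayed result should arrive last, and 11 and 13 may arrive in either order.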

A few more examples: fooobar.com/questions/1207804/...

The development history of this feature, which I found interesting:
https://github.com/dotnet/corefx/issues/536
https://github.com/dotnet/corefx/pull/5191


Source: https://habr.com/ru/post/1207803/

