Why do Dataflow blocks emit messages in this order?

Here is a short code example that shows what my question is about:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, int>(x => x,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var secondBlock = new TransformBlock<int, string>(async x =>
            {
                if (x == 12)
                {
                    await Task.Delay(5000);
                    return $"{DateTime.Now}: Message is {x} (This is delayed message!) ";
                }
                return $"{DateTime.Now}: Message is {x}";
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var thirdBlock = new ActionBlock<string>(s => Console.WriteLine(s),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            firstBlock.LinkTo(secondBlock);
            secondBlock.LinkTo(thirdBlock);

            var populateTask = Task.Run(async () =>
            {
                foreach (var x in Enumerable.Range(1, 15))
                {
                    await firstBlock.SendAsync(x);
                }
            });

            populateTask.Wait();
            secondBlock.Completion.Wait();
        }
    }
}
```

Output:

 09.08.2016 15:03:08: Message is 1
 09.08.2016 15:03:08: Message is 5
 09.08.2016 15:03:08: Message is 6
 09.08.2016 15:03:08: Message is 7
 09.08.2016 15:03:08: Message is 8
 09.08.2016 15:03:08: Message is 9
 09.08.2016 15:03:08: Message is 10
 09.08.2016 15:03:08: Message is 11
 09.08.2016 15:03:08: Message is 3
 09.08.2016 15:03:08: Message is 2
 09.08.2016 15:03:08: Message is 4
 09.08.2016 15:03:13: Message is 12 (This is delayed message!)
 09.08.2016 15:03:08: Message is 15
 09.08.2016 15:03:08: Message is 13
 09.08.2016 15:03:08: Message is 14

Why is the output in this order, and how can I change the network to get the result below?

 09.08.2016 15:03:08: Message is 1
 09.08.2016 15:03:08: Message is 5
 09.08.2016 15:03:08: Message is 6
 09.08.2016 15:03:08: Message is 7
 09.08.2016 15:03:08: Message is 8
 09.08.2016 15:03:08: Message is 9
 09.08.2016 15:03:08: Message is 10
 09.08.2016 15:03:08: Message is 11
 09.08.2016 15:03:08: Message is 3
 09.08.2016 15:03:08: Message is 2
 09.08.2016 15:03:08: Message is 4
 09.08.2016 15:03:08: Message is 15
 09.08.2016 15:03:08: Message is 13
 09.08.2016 15:03:08: Message is 14
 09.08.2016 15:03:13: Message is 12 (This is delayed message!)

So why are all the other messages (tasks, here) waiting for the one that is still pending?


UPDATE

Since you guys asked me to explain my problem in more detail, I made this sample, which is closer to the real pipeline I'm working on. Let's say an application loads some data and computes a hash of each returned response.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var firstBlock = new TransformBlock<int, string>(x => x.ToString(),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            // Downloads the page for each element; element "4" is artificially slow.
            var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
            {
                using (var httpClient = new HttpClient())
                {
                    if (x == "4") await Task.Delay(5000);
                    var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                    return new Tuple<string, string>(x, result);
                }
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            // Computes the SHA-256 hash of each downloaded page.
            var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
            {
                using (var algorithm = SHA256.Create())
                {
                    var bytes = Encoding.UTF8.GetBytes(x.Item2);
                    var hash = algorithm.ComputeHash(bytes);
                    return new Tuple<string, byte[]>(x.Item1, hash);
                }
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
            {
                var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";
                Console.WriteLine(output);
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

            // PropagateCompletion lets firstBlock.Complete() reach fourthBlock,
            // otherwise fourthBlock.Completion.Wait() never returns.
            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            firstBlock.LinkTo(secondBlock, linkOptions);
            secondBlock.LinkTo(thirdBlock, linkOptions);
            thirdBlock.LinkTo(fourthBlock, linkOptions);

            var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
            Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();

            fourthBlock.Completion.Wait();
        }

        private static string GetHashAsString(byte[] bytes)
        {
            var sb = new StringBuilder();
            for (int i = 0; i < bytes.Length; i++)
            {
                sb.AppendFormat("{0:X2}", bytes[i]);
                if (i % 4 == 3) sb.Append(" ");
            }
            return sb.ToString();
        }
    }
}
```

Let's look at the order in which the requests are sent:

[Image: the order in which the requests were sent]

That makes sense: all requests are sent as soon as possible, and the slow fourth request is at the end of the list.

Now let's look at the output:

 09.08.2016 20:44:53: Hash for element #3: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
 09.08.2016 20:44:53: Hash for element #2: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
 09.08.2016 20:44:53: Hash for element #1: 4D0AB933 EE521204 CA784F3E 248EC698 F9E4D5F3 8F23A78F 3A00E069 29E73E32
 09.08.2016 20:44:58: Hash for element #6: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
 09.08.2016 20:44:58: Hash for element #8: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
 09.08.2016 20:44:58: Hash for element #9: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
 09.08.2016 20:44:58: Hash for element #10: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
 09.08.2016 20:44:58: Hash for element #4: 44A63CBF 8E27D0DD AFE5A761 AADA4E49 AA52FE8E E3D7DC82 AFEAAF1D 72A9BC7F
 09.08.2016 20:44:58: Hash for element #5: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3
 09.08.2016 20:44:58: Hash for element #7: FC86E4F8 A83036BA 365BC7EE F9371778 59A11186 ED12A43C 3885D686 5004E6B3

You can see that all the hashes after the third one were only calculated once the fourth response arrived.

So, based on these two observations, all the pages that had already been loaded were waiting for the slow fourth request. It would be better not to wait for it and to calculate each hash as soon as its data is loaded. Is there any way to achieve this?

3 answers

Answer 1

Following the link from @SirRufo, I started thinking about implementing my own TransformBlock that would fit my needs and process incoming elements without regard to their order. That way the network would stay intact (no gap between the blocks around the download step), and it would be an elegant solution.

So I started looking into how to do that. The TransformBlock source seemed like a good starting point, so I opened the TransformBlock source on GitHub and began analyzing it.

Right at the beginning of the class I found this interesting piece:

```csharp
// If parallelism is used, we will need to support reordering messages that end out of order.
// However, a developer can override this with EnsureOrdered == false.
if (dataflowBlockOptions.SupportsParallelExecution && dataflowBlockOptions.EnsureOrdered)
{
    _reorderingBuffer = new ReorderingBuffer<TOutput>(this, (owningSource, message) =>
        ((TransformBlock<TInput, TOutput>)owningSource)._source.AddMessage(message));
}
```

That looks like what we want! Let's look at this EnsureOrdered option in the DataflowBlockOptions class on GitHub:

```csharp
/// <summary>Gets or sets whether ordered processing should be enforced on a block handling of messages.</summary>
/// <remarks>
/// By default, dataflow blocks enforce ordering on the processing of messages. This means that a
/// block like <see cref="TransformBlock{TInput, TOutput}"/> will ensure that messages are output in the same
/// order they were input, even if parallelism is employed by the block and the processing of a message N finishes
/// after the processing of a subsequent message N+1 (the block will reorder the results to maintain the input
/// ordering prior to making those results available to a consumer). Some blocks may allow this to be relaxed,
/// however. Setting <see cref="EnsureOrdered"/> to false tells a block that it may relax this ordering if
/// it able to do so. This can be beneficial if the immediacy of a processed result being made available
/// is more important than the input-to-output ordering being maintained.
/// </remarks>
public bool EnsureOrdered
{
    get { return _ensureOrdered; }
    set { _ensureOrdered = value; }
}
```
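To make the reordering idea concrete, here is a minimal, single-threaded sketch of what such a buffer does. It is not the library's internal ReorderingBuffer; the class name ReorderingBufferSketch and its members are hypothetical, and unlike the real thing it does no locking. Each finished result is tagged with its input sequence number and is only released once every earlier result has been released.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of a reordering buffer (NOT the library's internal class).
// Not thread-safe: a real implementation would need locking around AddCompleted.
public sealed class ReorderingBufferSketch<T>
{
    // Results that finished early and are waiting for an earlier one.
    private readonly Dictionary<long, T> _pending = new Dictionary<long, T>();
    private readonly Action<T> _release;
    private long _nextToRelease; // sequence number of the next result to hand out

    public ReorderingBufferSketch(Action<T> release)
    {
        _release = release;
    }

    // Number of finished results currently held back.
    public int HeldBack => _pending.Count;

    // Called when processing of the message with this input sequence number finishes.
    public void AddCompleted(long sequenceNumber, T item)
    {
        _pending[sequenceNumber] = item;

        // Release the contiguous run that starts at the next expected number.
        while (_pending.TryGetValue(_nextToRelease, out var next))
        {
            _pending.Remove(_nextToRelease);
            _release(next);
            _nextToRelease++;
        }
    }
}
```

If results 0, 2 and 3 finish first, only 0 is released; 2 and 3 stay in the buffer until 1 arrives. That is the same effect as elements 5 to 10 in the question waiting for element 4.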

The option looked very promising, so I immediately switched to the IDE to set it. Unfortunately, there was no such option:

[Screenshot: no EnsureOrdered option in the IDE]

I kept searching and found this note:

4.5.25-beta-23019

The package has been renamed to System.Threading.Tasks.Dataflow

I googled and found the package under its new name, System.Threading.Tasks.Dataflow. So I uninstalled the Microsoft.Tpl.Dataflow package and installed System.Threading.Tasks.Dataflow by running:

 Install-Package System.Threading.Tasks.Dataflow 

And there was the EnsureOrdered option. I updated the code to set EnsureOrdered to false:

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Net.Http;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // EnsureOrdered = false: each block may emit results as soon as they are ready.
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, EnsureOrdered = false };

            var firstBlock = new TransformBlock<int, string>(x => x.ToString(), options);

            var secondBlock = new TransformBlock<string, Tuple<string, string>>(async x =>
            {
                using (var httpClient = new HttpClient())
                {
                    if (x == "4") await Task.Delay(5000);
                    var result = await httpClient.GetStringAsync($"http://scooterlabs.com/echo/{x}");
                    return new Tuple<string, string>(x, result);
                }
            }, options);

            var thirdBlock = new TransformBlock<Tuple<string, string>, Tuple<string, byte[]>>(x =>
            {
                using (var algorithm = SHA256.Create())
                {
                    var bytes = Encoding.UTF8.GetBytes(x.Item2);
                    var hash = algorithm.ComputeHash(bytes);
                    return new Tuple<string, byte[]>(x.Item1, hash);
                }
            }, options);

            var fourthBlock = new ActionBlock<Tuple<string, byte[]>>(x =>
            {
                var output = $"{DateTime.Now}: Hash for element #{x.Item1}: {GetHashAsString(x.Item2)}";
                Console.WriteLine(output);
            }, options);

            // PropagateCompletion so that fourthBlock.Completion eventually completes.
            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            firstBlock.LinkTo(secondBlock, linkOptions);
            secondBlock.LinkTo(thirdBlock, linkOptions);
            thirdBlock.LinkTo(fourthBlock, linkOptions);

            var populateTasks = Enumerable.Range(1, 10).Select(x => firstBlock.SendAsync(x));
            Task.WhenAll(populateTasks).ContinueWith(x => firstBlock.Complete()).Wait();

            fourthBlock.Completion.Wait();
        }

        private static string GetHashAsString(byte[] bytes)
        {
            var sb = new StringBuilder();
            for (int i = 0; i < bytes.Length; i++)
            {
                sb.AppendFormat("{0:X2}", bytes[i]);
                if (i % 4 == 3) sb.Append(" ");
            }
            return sb.ToString();
        }
    }
}
```

And the output was exactly what I wanted:

 10.08.2016 11:03:23: Hash for element #3: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
 10.08.2016 11:03:23: Hash for element #1: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
 10.08.2016 11:03:23: Hash for element #2: 8BA8A86D F25E058E 180F7AA9 1EE996B0 8D721C84 AEE8AA19 0A3F7C44 9FFED481
 10.08.2016 11:03:23: Hash for element #10: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
 10.08.2016 11:03:23: Hash for element #8: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
 10.08.2016 11:03:23: Hash for element #9: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
 10.08.2016 11:03:23: Hash for element #5: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
 10.08.2016 11:03:23: Hash for element #7: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
 10.08.2016 11:03:23: Hash for element #6: C8C87B26 B17A6329 3F6213CD 4F9AFE0D 4F90127B AAE49EB0 7D8C15DF 74F020B1
 10.08.2016 11:03:27: Hash for element #4: FD25E52B FCD8DE81 BD38E11B 13C20B96 09473283 F25346B2 04593B70 E4357BDA
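To reproduce the effect without a network, here is a stripped-down sketch: a single parallel TransformBlock in which element 4 is artificially slow, run once with EnsureOrdered = false and once with the default. It assumes a System.Threading.Tasks.Dataflow version that exposes EnsureOrdered (as described above); UnorderedDemo is a hypothetical name and Task.Delay stands in for the HTTP call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnorderedDemo
{
    // Sends 1..10 through a parallel TransformBlock in which item 4 is slow and
    // returns the order in which the results reached the consumer.
    public static int[] Run(bool ensureOrdered)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
            EnsureOrdered = ensureOrdered
        };

        var transform = new TransformBlock<int, int>(async x =>
        {
            // Simulate the slow download of element #4.
            await Task.Delay(x == 4 ? 1000 : 10);
            return x;
        }, options);

        // Default MaxDegreeOfParallelism = 1, so items are recorded in the order they are offered.
        var received = new ConcurrentQueue<int>();
        var sink = new ActionBlock<int>(x => received.Enqueue(x));

        transform.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var x in Enumerable.Range(1, 10))
            transform.Post(x);

        transform.Complete();
        sink.Completion.Wait();
        return received.ToArray();
    }
}
```

With Run(false), element 4 should arrive last, while Run(true) should return 1 to 10 in input order, mirroring the before/after outputs above. The exact order of the fast elements depends on timing.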
Answer 2

This is by design, and it is documented:

Because each predefined source dataflow block type guarantees that messages are propagated out in the order in which they are received, ...

Evidence:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main()
    {
        var ts = Environment.TickCount;

        var firstBlock = new TransformBlock<int, int>(
            x => x,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, });

        var secondBlock = new TransformBlock<int, string>(
            x =>
            {
                var start = Environment.TickCount;
                if (x == 3)
                {
                    Thread.Sleep(5000);
                    return $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x} (This is delayed message!) ";
                }
                return $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x}";
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                // limit the internal queue to 10 items
                BoundedCapacity = 10,
            });

        var thirdBlock = new ActionBlock<string>(
            s => { Console.WriteLine(s); },
            new ExecutionDataflowBlockOptions
            {
                // limit to a single task to watch the order
                MaxDegreeOfParallelism = 1,
            });

        firstBlock.LinkTo(secondBlock, new DataflowLinkOptions { PropagateCompletion = true, });
        secondBlock.LinkTo(thirdBlock, new DataflowLinkOptions { PropagateCompletion = true, });

        foreach (var x in Enumerable.Range(1, 15))
        {
            // to ensure order of items
            firstBlock.Post(x);
        }

        firstBlock.Complete();
        thirdBlock.Completion.Wait();
    }
}
```

Output:

  Start 31 Finished 31: Message is 1
 Start 31 Finished 31: Message is 2
 Start 31 Finished 5031: Message is 3 (This is delayed message!)
 Start 31 Finished 31: Message is 4
 Start 31 Finished 31: Message is 5
 Start 31 Finished 31: Message is 6
 Start 31 Finished 31: Message is 7
 Start 31 Finished 31: Message is 8
 Start 31 Finished 31: Message is 9
 Start 31 Finished 31: Message is 10
 Start 31 Finished 31: Message is 11
 Start 31 Finished 31: Message is 12
 Start 5031 Finished 5031: Message is 13
 Start 5031 Finished 5031: Message is 14
 Start 5031 Finished 5031: Message is 15

Solution 1

Do not use Dataflow for the download part, because the ordering guarantee will block the rest of your processing.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main()
    {
        var ts = Environment.TickCount;

        var thirdBlock = new ActionBlock<string>(
            s => { Console.WriteLine(s); },
            new ExecutionDataflowBlockOptions
            {
                // up to 4 concurrent writes, so the printed order is not guaranteed
                MaxDegreeOfParallelism = 4,
            });

        Parallel.ForEach(
            Enumerable.Range(1, 15),
            new ParallelOptions { MaxDegreeOfParallelism = 4, },
            x =>
            {
                var start = Environment.TickCount;
                string result;
                if (x == 12)
                {
                    Thread.Sleep(5000);
                    result = $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x} (This is delayed message!) ";
                }
                else
                    result = $"Start {start - ts} Finished {Environment.TickCount - ts}: Message is {x}";

                // each result is handed to the block as soon as it is ready
                thirdBlock.Post(result);
            });

        thirdBlock.Complete();
        thirdBlock.Completion.Wait();
    }
}
```

Output:

  Start 32 Finished 32: Message is 2
 Start 32 Finished 32: Message is 6
 Start 32 Finished 32: Message is 5
 Start 32 Finished 32: Message is 8
 Start 32 Finished 32: Message is 9
 Start 32 Finished 32: Message is 10
 Start 32 Finished 32: Message is 11
 Start 32 Finished 32: Message is 7
 Start 32 Finished 32: Message is 13
 Start 32 Finished 32: Message is 14
 Start 32 Finished 32: Message is 15
 Start 32 Finished 32: Message is 3
 Start 32 Finished 32: Message is 4
 Start 32 Finished 32: Message is 1
 Start 32 Finished 5032: Message is 12 (This is delayed message!)
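Solution 1 uses Parallel.ForEach with Thread.Sleep. Real downloads are I/O-bound, so a variant of the same idea might swap Parallel.ForEach for a throttled async loop that does not block thread-pool threads. This is a sketch of that swapped-in technique, not the answerer's code: SemaphoreSlim caps the number of loads in flight at 4, and LoadAsync is a hypothetical stand-in for the HTTP call. Each result is posted to the ActionBlock as soon as its own load finishes.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ThrottledFeedDemo
{
    // Hypothetical stand-in for the slow HTTP call; element 12 is the slow one.
    private static async Task<string> LoadAsync(int x)
    {
        await Task.Delay(x == 12 ? 1000 : 10);
        return $"Message is {x}";
    }

    public static async Task<string[]> RunAsync()
    {
        var printed = new ConcurrentQueue<string>();
        var sink = new ActionBlock<string>(s => printed.Enqueue(s));

        // At most 4 loads in flight, similar to MaxDegreeOfParallelism = 4.
        using (var throttle = new SemaphoreSlim(4))
        {
            var loads = Enumerable.Range(1, 15).Select(async x =>
            {
                await throttle.WaitAsync();
                try
                {
                    // Post as soon as this item's load finishes, so nothing waits for the slow one.
                    sink.Post(await LoadAsync(x));
                }
                finally
                {
                    throttle.Release();
                }
            });

            await Task.WhenAll(loads);
        }

        sink.Complete();
        await sink.Completion;
        return printed.ToArray();
    }
}
```

With these delays, "Message is 12" should be the last entry, while the other elements appear roughly in the order their loads finish.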

Solution 2

Of course, you could also implement IPropagatorBlock<TInput,TOutput> in a custom class that does not guarantee the order of the elements.
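One lightweight way to get such a block, without writing a full IPropagatorBlock<TInput,TOutput> class, might be to wrap an ActionBlock and a BufferBlock with DataflowBlock.Encapsulate. This is a swapped-in technique, not the answerer's code, and CreateUnorderedTransform is a hypothetical helper name. Because an ActionBlock has no output side, it has nothing to reorder: each result goes into the BufferBlock the moment it is computed.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class UnorderedBlocks
{
    // Hypothetical helper: acts like a TransformBlock<TInput, TOutput>, but emits each
    // result as soon as it is ready instead of in input order.
    public static IPropagatorBlock<TInput, TOutput> CreateUnorderedTransform<TInput, TOutput>(
        Func<TInput, Task<TOutput>> transform, int maxDegreeOfParallelism)
    {
        var output = new BufferBlock<TOutput>();

        var worker = new ActionBlock<TInput>(async item =>
        {
            var result = await transform(item);
            await output.SendAsync(result); // no reordering step in between
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        // Forward the worker's completion (or failure) to the output buffer.
        worker.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)output).Fault(t.Exception);
            else output.Complete();
        }, TaskScheduler.Default);

        // Complete() on the returned block goes to the worker; LinkTo goes to the buffer.
        return DataflowBlock.Encapsulate<TInput, TOutput>(worker, output);
    }
}
```

In the question's network this could stand in for secondBlock, e.g. `UnorderedBlocks.CreateUnorderedTransform<string, Tuple<string, string>>(async x => { ... }, 4)`. One caveat of this sketch: the BufferBlock is unbounded, so a slow consumer does not apply back-pressure to the downloads.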

Answer 3

Looking at the timestamps, the second block works as you expect: the delayed message is timestamped after all the other messages. It seems to be the Console.WriteLine in the ActionBlock that is not being called in the expected order.

Is your secondBlock.Completion.Wait(); wrong? Should it be thirdBlock.Completion.Wait(); to get the expected results?
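For reference, here is a minimal sketch of the usual complete-and-wait pattern for a three-block chain like the one in the question. The block bodies are simplified, and CompletionDemo is a hypothetical name. The chain is linked with PropagateCompletion, Complete() is called on the head block, and the program waits on the last block. This only makes the program finish cleanly; it does not change the ordering behavior discussed in the other answers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks.Dataflow;

public static class CompletionDemo
{
    public static string[] Run()
    {
        var lines = new ConcurrentQueue<string>();
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        var firstBlock = new TransformBlock<int, int>(x => x);
        var secondBlock = new TransformBlock<int, string>(x => $"Message is {x}");
        var thirdBlock = new ActionBlock<string>(s => lines.Enqueue(s));

        // PropagateCompletion lets Complete() on the head flow down the chain.
        firstBlock.LinkTo(secondBlock, link);
        secondBlock.LinkTo(thirdBlock, link);

        foreach (var x in Enumerable.Range(1, 15))
            firstBlock.Post(x);

        // Without this call no block ever completes, and any Completion.Wait() blocks forever.
        firstBlock.Complete();

        // Wait on the last block: secondBlock can complete as soon as thirdBlock has
        // accepted its messages, while thirdBlock may still be processing them.
        thirdBlock.Completion.Wait();
        return lines.ToArray();
    }
}
```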


Source: https://habr.com/ru/post/1207804/

