Making serial processing as easy as parallel processing

I have two .net task objects that I can run in parellel or in sequence. In any case, I do not want to block the thread to wait for them. As it turned out, Reactive Extensions make the parallel story just beautiful. But when I try to arrange tasks in a sequence, the code works, but just feels awkward.

I would like to know if anyone can show how to make the serial version more concise or to encode as easily as the parallel version. There is no need to use reactive extensions to answer this question.

For reference, here are my two solutions for parallel and serial processing.

Parallel Processing Version

This is pure joy:

public Task<string> DoWorkInParallel() { var result = new TaskCompletionSource<string>(); Task<int> AlphaTask = Task.Factory.StartNew(() => 4); Task<bool> BravoTask = Task.Factory.StartNew(() => true); //Prepare for Rx, and set filters to allow 'Zip' to terminate early //in some cases. IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5); IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y); Observable .Zip( AsyncAlpha, AsyncBravo, (x, y) => y.ToString() + x.ToString()) .Timeout(TimeSpan.FromMilliseconds(200)).Subscribe( (x) => { result.TrySetResult(x); }, (x) => { result.TrySetException(x.GetBaseException()); }, () => { result.TrySetResult("Nothing"); }); return result.Task; } 

Serial / Pipeline Processing Version

This works, but just awkwardly:

  public Task<string> DoWorkInSequence() { var result = new TaskCompletionSource<string>(); Task<int> AlphaTask = Task.Factory.StartNew(() => 4); AlphaTask.ContinueWith(x => { if (x.IsFaulted) { result.TrySetException(x.Exception.GetBaseException()); } else { if (x.Result != 5) { Task<bool> BravoTask = Task.Factory.StartNew(() => true); BravoTask.ContinueWith(y => { if (y.IsFaulted) { result.TrySetException(y.Exception.GetBaseException()); } else { if (y.Result) { result.TrySetResult(x.Result.ToString() + y.Result.ToString()); } else { result.TrySetResult("Nothing"); } } }); } else { result.TrySetResult("Nothing"); } } } ); return result.Task; } 

In the above serial code, this became a mess, and I didn’t even add a wait time according to the parallel version!

Requirements (UPDATED on 8/6)

For those in charge, remember that:

  • The sequential script should allow the location where the output of the first task feeds the input of the second. My sample “inconvenient” code above can be easily organized to achieve this.

  • I am interested in .net 4.5 answer, but .net 4.0 answer is equally or more important to me.

  • Alpha and Bravo tasks have a combined time limit of 200 ms to complete; they do not have 200 ms each. This is also true in the sequential case.

  • The SourceCompletionTask must end before both jobs end if any task returns an invalid result. The invalid result is either [AlphaTask: 5] or [BravoTask: false], as indicated by explicit tests in the code example.
    Update 8/8: Clarification . In the sequential case, BravoTask should not be executed at all if AlphaTask is not running or if a timeout has already occurred.

  • Assume that both AlphaTask and BravoTask cannot block. Not that this is important, but in my actual scenario, they are actually asynchronous calls to the WCF service.

Maybe there is an aspect of Rx that I could use to clear the serial version. But even just programming tasks in and of itself should have the best story I'd imagine. We'll see.

ERRATA In both code examples, I changed the return type to "Task" because the responses to the poster were completely correct and I did not have to return a TaskCompletion object.

+4
source share
4 answers

If you can use async / wait, Brandon has a good answer. If you're still on VS2010, the first thing I would do to clear the serial version is to get an extension method similar to Then method by Stephen Tuub described in a blog post . I would also implement the Task.FromResult method if you are not using .NET 4.5. With them you can get:

 public Task<string> DoWorkInSequence() { return Task.FromResult(4) .Then(x => { if (x != 5) { return Task.FromResult(true) .Then(y => { if (y) { return Task.FromResult(x.ToString() + y.ToString()); } else { return Task.FromResult("Nothing"); } }); } else { return Task.FromResult("Nothing"); } }); } 

In addition, you usually have to return the task instead of the TaskCompletionSource object (which you can get by calling .Task on the TaskCompletionSource), since you do not want the caller to call the result for the task you are returning to them.

Brandon's answer also provides a good way to implement timeout functionality (correcting the lack of async / await keywords).

EDIT To reduce arrow code, we can implement more LINQ methods. The implementation of SelectMany is provided in a previously related blog post. Other methods we will need for LINQ are Select and Where. They should be simple enough once you have done Then and SelectMany, but here they are:

 public static Task<T> Where<T>(this Task<T> task, Func<T, bool> predicate) { if (task == null) throw new ArgumentNullException("task"); if (predicate == null) throw new ArgumentNullException("predicate"); var tcs = new TaskCompletionSource<T>(); task.ContinueWith((completed) => { if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions); else if (completed.IsCanceled) tcs.TrySetCanceled(); else { try { if (predicate(completed.Result)) tcs.TrySetResult(completed.Result); else tcs.TrySetCanceled(); } catch (Exception ex) { tcs.TrySetException(ex); } } }); return tcs.Task; } public static Task<TResult> Select<T, TResult>(this Task<T> task, Func<T, TResult> selector) { if (task == null) throw new ArgumentNullException("task"); if (selector == null) throw new ArgumentNullException("selector"); var tcs = new TaskCompletionSource<TResult>(); task.ContinueWith((completed) => { if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions); else if (completed.IsCanceled) tcs.TrySetCanceled(); else { try { tcs.TrySetResult(selector(completed.Result)); } catch (Exception ex) { tcs.TrySetException(ex); } } }); return tcs.Task; } 

After that, one final non-LINQ extension method allows you to use the default values ​​for cancellation to return:

 public static Task<T> IfCanceled<T>(this Task<T> task, T defaultValue) { if (task == null) throw new ArgumentNullException("task"); var tcs = new TaskCompletionSource<T>(); task.ContinueWith((completed) => { if (completed.IsFaulted) tcs.TrySetException(completed.Exception.InnerExceptions); else if (completed.IsCanceled) tcs.TrySetResult(defaultValue); else tcs.TrySetResult(completed.Result); }); return tcs.Task; } 

And the new and improved DoWork (sans timeout):

 public static Task<string> DoWorkInSequence() { return (from x in Task_FromResult(5) where x != 5 from y in Task_FromResult(true) where y select x.ToString() + y.ToString() ).IfCanceled("Nothing"); } 

The Timeout method from Brandon's response (after overwriting, if necessary without async / await) may get stuck at the end of the chain for a common timeout and / or after each step in the chain if you want to continue further steps as soon as a common timeout is reached. Another possibility to break the chain would be to take all the individual steps by taking a cancellation token and modify the Timeout method to take the CancellationTokenSource and cancel it if a timeout occurs, and also throw a timeout exception.

EDIT (Brent Arias)

Taking fantastic ideas from what you presented, I developed what, in my opinion, is the final answer from my POV. It is based on the .net 4.0 extension methods found in the nuget ParallelExtensionsExtras package. In the example below, a third task is added to help illustrate the “feel” of programming for sequential tasks, given my stated requirements:

 public Task<string> DoWorkInSequence() { var cts = new CancellationTokenSource(); Task timer = Task.Factory.StartNewDelayed(200, () => { cts.Cancel(); }); Task<int> AlphaTask = Task.Factory .StartNew(() => 4 ) .Where(x => x != 5 && !cts.IsCancellationRequested); Task<bool> BravoTask = AlphaTask .Then(x => true) .Where(x => x && !cts.IsCancellationRequested); Task<int> DeltaTask = BravoTask .Then(x => 7) .Where(x => x != 8); Task<string> final = Task.Factory .WhenAny(DeltaTask, timer) .ContinueWith(x => !DeltaTask.IsCanceled && DeltaTask.Status == TaskStatus.RanToCompletion ? AlphaTask.Result.ToString() + BravoTask.Result.ToString() + DeltaTask.Result.ToString(): "Nothing"); //This is here just for experimentation. Placing it at different points //above will have varying effects on what tasks were cancelled at a given point in time. cts.Cancel(); return final; } 

There are a few key points I made in this discussion and collaborative effort:

  • Using the "Then" extension is good in trivial cases, but has noticeable limited applicability. For more complex cases, it is necessary to replace it, for example .ContinueWith(x => true, cts.Token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default) . When replacing "Then" with "ContinueWith" for my declared scenario, it is important to add the OnlyOnRanToCompletion parameter.
  • Using the Timeout extension ultimately just doesn't work in my scenario. This is because it will only result in the cancellation of the Task to which it is attached directly, instead of canceling all instances of the Antecedant instance in the sequence. That's why I switched to StartNewDelayed(...) tactics and added express cancellation in every Where clause.
  • Despite the fact that the ParallelExtensionsExtras library has LINQ to Tasks , which was defined by you, I came to the conclusion that it is better to avoid LINQ-ish with tasks. This is because tasks with LINQ are very esoteric ; this most likely confuses the hell out of the average developer. It's hard enough to make them understand asynchronous coding. Even the author of LINQ Tasks said, " How useful this LINQ implementation is in practice is controversial , but at least it provides interesting thought exercises." Yes, I agree, an interesting thought. Of course, I should at least admit the LINQ to Tasks “Where” method, since it played a key role in the solution that I listed above.
+4
source
 public Task<string> DoWorkInSequence() { Task<int> AlphaTask = Task.Factory.StartNew(() => 4); Func<int> BravoFunc = x => 2 * x; //Prepare for Rx, and set filters to allow 'Zip' to terminate early //in some cases. IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5); return AsyncAlpha .Do(x => Console.WriteLine(x)) //This is how you "Do WORK in sequence" .Select(BravoFunc) //This is how you map results from Alpha //via a second method. .Timeout(TimeSpan.FromMilliseconds(200)).Subscribe( (x) => { result.TrySetResult(x); }, (x) => { result.TrySetException(x.GetBaseException()); }, () => { result.TrySetResult("Nothing"); }).ToTask(); } 

Ultimately, however, I would just do it all in TPL if you want Tasks, or use Observable.ToTask(this IObservable<T> observable) as opposed to using TaskCompletionSource

+1
source

Firstly, I did not return TaskCompletionSource . This is a means to an end ... a detail of the implementation of your method that should be hidden from the public API. Instead, your method should return Task (it should just return result.Task ).

In any case, if you just work with tasks, you should just use TPL and not use Rx. Use Rx only if you really need to integrate your tasks with other rx code. Even your DoWorkInParallel can be made much simpler if you do not mix Rx material. Rx works fabulously with complex tasks. But the scenarios that you describe are relatively simple and can be solved simply using TPL.

Here's how to run the parallel and serial versions in TPL:

 /// <summary>Extension methods for timing out tasks</summary> public static class TaskExtensions { /// <summary> throws an error if task does not complete before the timer.</summary> public static async Task Timeout(this Task t, Task timer) { var any = await Task.WhenAny(t, timer); if (any != t) { throw new TimeoutException("task timed out"); } } /// <summary> throws an error if task does not complete before the timer.</summary> public static async Task<T> Timeout<T>(this Task<T> t, Task timer) { await Timeout((Task)t, timer); return t.Result; } /// <summary> throws an error if task does not complete in time.</summary> public static Task Timeout(this Task t, TimeSpan delay) { return t.IsCompleted ? t : Timeout(t, Task.Delay(delay)); } /// <summary> throws an error if task does not complete in time.</summary> public static Task<T> Timeout<T>(this Task<T> t, TimeSpan delay) { return Timeout((Task)t, delay); } } // .. elsewhere .. public async Task<string> DoWorkInParallel() { var timer = Task.Delay(TimeSpan.FromMilliseconds(200)); var alphaTask = Task.Run(() => 4); var betaTask = Task.Run(() => true); // wait for one of the tasks to complete var t = await Task.WhenAny(alphaTask, betaTask).Timeout(timer); // exit early if the task produced an invalid result if ((t == alphaTask && alphaTask.Result != 5) || (t == betaTask && !betaTask.Result)) return "Nothing"; // wait for the other task to complete // could also just write: await Task.WhenAll(alphaTask, betaTask).Timeout(timer); await ((t == alphaTask) ? (Task)betaTask : (Task)alphaTask).Timeout(timer); // unfortunately need to repeat the validation logic here. // this logic could be moved to a helper method that is just called in both places. var alpha = alphaTask.Result; var beta = betaTask.Result; return (alpha != 5 && beta) ? (alpha.ToString() + beta.ToString()) : "Nothing"; } public async Task<string> DoWorkInSequence() { var timer = Task.Delay(TimeSpan.FromMilliseconds(200)); var alpha = await Task.Run(() => 4).Timeout(timer); if (alpha != 5) { var beta = await Task.Run(() => true).Timeout(timer); if (beta) { return alpha.ToString() + beta.ToString(); } } return "Nothing"; } 

If you need to do your job in .Net 4.0, you can use the Microsoft.Bcl.Async nuget package, which allows you to use the VS2012 compiler to target .Net 4.0 and still use async / wait. See This SO Question: Using Async Waiting in .net 4

Edit: I changed the code to complete the work for both parallel and serial versions if the tasks result in invalid values, and I changed the timeout for combining instead of a single task. Although in a sequential case, this timer will also count the time between two tasks.

+1
source

Aron had almost a spot on

 public Task<string> DoWorkSequentially() { Task<int> AlphaTask = Task.Run(() => 4); //Some work; Task<bool> BravoTask = Task.Run(() => true);//Some other work; //Prepare for Rx, and set filters to allow 'Zip' to terminate early //in some cases. IObservable<int> AsyncAlpha = AlphaTask.ToObservable().TakeWhile(x => x != 5); IObservable<bool> AsyncBravo = BravoTask.ToObservable().TakeWhile(y => y); return (from alpha in AsyncAlpha from bravo in AsyncBravo select bravo.ToString() + alpha.ToString()) .Timeout(TimeSpan.FromMilliseconds(200)) .Concat(Observable.Return("Nothing")) //Return Nothing if no result .Take(1) .ToTask(); } 

Here I just returned BravoFunc to BravoTask . I deleted TaskCompletionSource (as Aron did). Finally, you use the ToTask() operator to return the Rx continuation back to Task<string> .

note that

  from alpha in AsyncAlpha from bravo in AsyncBravo select bravo.ToString() + alpha.ToString() 

Can also be written as

  AsyncAlpha.SelectMany(a=>AsyncBravo.Select(b=> b.ToString() + a.ToString())) 

The SelectMany statement is very convenient for these types of extensions. This is even more convenient in the syntax for understanding the query, since you still have access to bravo and alpha in the final select clause.

As you can see, this becomes extremely useful when you have many sequels. For example, consider an example where you need 3 or 4 continuations

  from a in Alpha from b in Bravo from c in Charlie from d in Delta select a+b+c+d 

It has many real world applications. I see this as a general outline. Some examples include: Waiting for a server connection, and then receiving a session token to access the service client.

  from isConnected in _server.ConnectionState.Where(c=>c) from session in _server.GetSession() from customer in _customerServiceClient.GetCustomers(session) select customer; 

or, perhaps, in the Social Media channel, where we need to authenticate, find a contact, get a list of our letters, and then pull out the first 20 headers of these letters.

  from accessToken in _oauth.Authenticate() from contact in _contactServiceClient.GetContact(emailAddress, accessToken) from imapMessageId in _mailServiceClient.Search(contact).Take(20) from email in _mailServiceClient.GetEmailHeaders(imapMessageId) select email; 
+1
source

Source: https://habr.com/ru/post/1495330/


All Articles