Parallel.ForEach using Thread.Sleep equivalent

So here is the situation. I need to call a website that starts a search. The search runs for an unknown amount of time, and the only way to find out whether it has completed is to periodically request the page and check whether a “Download data” link has appeared somewhere on it (I think it uses some strange Ajax call on a JavaScript timer to check the backend and refresh the page).

So here is the trick. I have hundreds of items that I need to search, one at a time. So I have code that looks something like this:

    var items = getItems();
    Parallel.ForEach(items, item =>
    {
        startSearch(item);
        var finished = isSearchFinished(item);
        while (finished == false)
        {
            finished = isSearchFinished(item); // <--- How do I delay this action 30 secs?
        }
        downloadData(item);
    });

Now, obviously, this is not real code, because there may be things that cause isSearchFinished to always be false.

The obvious danger of an infinite loop aside, how do I properly keep isSearchFinished() from being called over and over, and instead call it every, say, 30 seconds or 1 minute?

I know that Thread.Sleep() is not the right solution, and I think this can be done with Threading.Timer, but I am not very familiar with it, and there are so many threading options that I am just not sure which one to use.

+6
2 answers

This is pretty easy to implement with tasks and async/await, as @KevinS noted in the comments:

    async Task<ItemData> ProcessItemAsync(Item item)
    {
        while (true)
        {
            if (await isSearchFinishedAsync(item))
                break;
            // Wait 30 seconds without blocking a thread.
            await Task.Delay(30 * 1000);
        }
        return await downloadDataAsync(item);
    }

    // ...

    var items = getItems();
    var tasks = items.Select(i => ProcessItemAsync(i)).ToArray();
    await Task.WhenAll(tasks);
    var data = tasks.Select(t => t.Result);

This way, you do not block ThreadPool threads in vain for what is mostly I/O-bound network work. If you are new to async/await, the async-await tag wiki may be a good place to start.
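The loop above has no exit if the search never finishes, which is the infinite-loop danger the question mentions. A minimal sketch of one way to bound it, assuming that giving up after a timeout is acceptable. PollUntilAsync and its parameters are hypothetical names for illustration, not part of the code above:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PollingSketch
{
    // Hypothetical helper: calls isFinishedAsync until it returns true,
    // waiting `interval` between attempts. Returns the number of attempts made.
    public static async Task<int> PollUntilAsync(
        Func<Task<bool>> isFinishedAsync,
        TimeSpan interval,
        CancellationToken cancellationToken)
    {
        var attempts = 0;
        while (true)
        {
            // Stop before the next attempt if the caller has given up.
            cancellationToken.ThrowIfCancellationRequested();

            attempts++;
            if (await isFinishedAsync())
                return attempts;

            // Task.Delay does not block a thread; it throws an
            // OperationCanceledException if the token is canceled while waiting.
            await Task.Delay(interval, cancellationToken);
        }
    }
}
```

It could be called with a token from new CancellationTokenSource(TimeSpan.FromMinutes(30)), for example; the 30-minute cap is an arbitrary value, and the caller would need to handle the resulting OperationCanceledException.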

I assume that you can convert your synchronous isSearchFinished and downloadData methods to asynchronous versions, using something like HttpClient for non-blocking HTTP requests and returning a Task<>. If you cannot do this, you can simply wrap them in Task.Run, like await Task.Run(() => isSearchFinished(item)) and await Task.Run(() => downloadData(item)). This is usually not recommended, but since you have hundreds of items, it would still give you a much better level of concurrency than Parallel.ForEach in this case, because you will not block pool threads for 30 seconds, thanks to the asynchronous Task.Delay.
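As a rough illustration of the HttpClient route, here is a sketch of what an asynchronous status check might look like. It assumes the finished page contains the literal text "Download data" in the HTML returned by a plain GET, which may not hold if the link is injected by JavaScript. SearchStatusSketch, HasDownloadLink, IsSearchFinishedAsync and statusUrl are all hypothetical names:

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

public static class SearchStatusSketch
{
    // A single shared HttpClient is reused for all requests.
    private static readonly HttpClient Http = new HttpClient();

    // Assumption: the page contains the text "Download data" once the search is done.
    public static bool HasDownloadLink(string html)
    {
        return html.IndexOf("Download data", StringComparison.OrdinalIgnoreCase) >= 0;
    }

    // Hypothetical async counterpart of isSearchFinished.
    // statusUrl is an assumed per-item URL of the results page.
    public static async Task<bool> IsSearchFinishedAsync(Uri statusUrl)
    {
        // No thread is blocked while the HTTP request is in flight.
        var html = await Http.GetStringAsync(statusUrl);
        return HasDownloadLink(html);
    }
}
```

Keeping the HTML check separate from the request makes it possible to test the check without touching the network.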

+8

You can also write a generic function using TaskCompletionSource and Threading.Timer that returns a Task which completes once the given retry function succeeds.

    // Requires: using System; using System.Threading; using System.Threading.Tasks;

    public static Task RetryAsync(Func<bool> retryFunc, TimeSpan retryInterval)
    {
        return RetryAsync(retryFunc, retryInterval, CancellationToken.None);
    }

    public static Task RetryAsync(Func<bool> retryFunc, TimeSpan retryInterval, CancellationToken cancellationToken)
    {
        var tcs = new TaskCompletionSource<object>();
        cancellationToken.Register(() => tcs.TrySetCanceled());

        // Fires immediately, then once per retryInterval, until the task completes.
        var timer = new Timer((state) =>
        {
            var taskCompletionSource = (TaskCompletionSource<object>) state;
            try
            {
                if (retryFunc())
                {
                    taskCompletionSource.TrySetResult(null);
                }
            }
            catch (Exception ex)
            {
                taskCompletionSource.TrySetException(ex);
            }
        }, tcs, TimeSpan.FromMilliseconds(0), retryInterval);

        // Once the task is complete, dispose of the timer so it doesn't keep firing. Also captures the timer
        // in a closure so it does not get garbage collected.
        tcs.Task.ContinueWith(t => timer.Dispose(), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);

        return tcs.Task;
    }

Then you can use RetryAsync as follows:

    var searchTasks = new List<Task>();
    searchTasks.AddRange(items.Select(
        downloadItem => RetryAsync(
                () => isSearchFinished(downloadItem),
                TimeSpan.FromSeconds(2)) // retry interval
            .ContinueWith(
                t => downloadData(downloadItem),
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnRanToCompletion,
                TaskScheduler.Default)));

    await Task.WhenAll(searchTasks.ToArray());

The ContinueWith part specifies what to do after the task completes successfully. In this case, it runs your downloadData method on a thread pool thread, because we specified TaskScheduler.Default, and the continuation runs only if the task ran to completion, i.e. it was not canceled and no exception was thrown.
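To make the OnlyOnRanToCompletion behaviour concrete, here is a small self-contained sketch. ContinuationSketch and DownloadAfter are hypothetical names, and the string "downloaded" merely stands in for the result of downloadData:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ContinuationSketch
{
    // Attaches a continuation that runs only if `search` ran to completion.
    public static Task<string> DownloadAfter(Task search)
    {
        return search.ContinueWith(
            t => "downloaded",                              // stand-in for downloadData(item)
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnRanToCompletion,  // skip on fault or cancellation
            TaskScheduler.Default);                         // run on a thread pool thread
    }
}
```

If the antecedent task faults or is canceled, the continuation does not run and the returned task ends up in the Canceled state, so awaiting it throws a TaskCanceledException rather than the original exception. If the original exception matters, an async method that awaits the search and then calls downloadData would surface it directly.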

+2

Source: https://habr.com/ru/post/971444/

