How to set max concurrent threads using Task

I am writing a stress testing utility. In this utility, I want to keep a constant load of 10 threads running (out of 10,000 work items in total). Here is my code:

    Stopwatch watch = new Stopwatch();
    watch.Start();
    int itemProcessed = 0;
    do
    {
        List<Task> taskList = new List<Task>();
        for (int i = 0; i < _parallelThreadCount; i++)
        {
            taskList.Add(Task.Factory.StartNew(() => _taskDelegate()));
            itemProcessed++;
        }
        Task.WaitAll(taskList.ToArray());
    } while (itemProcessed < _batchSize);
    watch.Stop();

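The problem is the Task.WaitAll call: 10 threads are loaded initially, then the count drops to 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 as they finish, and only then do I add 10 more threads. I want a new thread to start as soon as one finishes, so that 10 are always running.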

Can someone let me know how to achieve this.

2 answers

If you can change the structure of the code a bit (read: replace the do-while loop), you can use the Parallel class. Here is a quick example:

    List<int> data = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    Parallel.ForEach(data, new ParallelOptions() { MaxDegreeOfParallelism = 10 }, d =>
    {
        Console.WriteLine(d);
    });

The most interesting part for you is the MaxDegreeOfParallelism property of ParallelOptions: it sets the maximum number of operations that can run at the same time.
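
To see the cap in effect, here is a minimal sketch that records the highest number of loop bodies running at once. The class and method names (ConcurrencyProbe, MeasurePeakConcurrency) are made up for illustration, and the Thread.Sleep call only stands in for real work.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrencyProbe
{
    // Hypothetical helper: runs `iterations` loop bodies with the given cap
    // and returns the largest number that were observed running together.
    public static int MeasurePeakConcurrency(int iterations, int maxParallel)
    {
        int current = 0;
        int peak = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        Parallel.For(0, iterations, options, i =>
        {
            int now = Interlocked.Increment(ref current);

            // Raise the recorded peak if this body pushed the count higher.
            int seen;
            do
            {
                seen = Volatile.Read(ref peak);
                if (now <= seen) break;
            } while (Interlocked.CompareExchange(ref peak, now, seen) != seen);

            Thread.Sleep(5); // placeholder for real work
            Interlocked.Decrement(ref current);
        });

        return peak;
    }

    public static void Main()
    {
        int peak = MeasurePeakConcurrency(iterations: 200, maxParallel: 10);
        Console.WriteLine("Peak concurrent bodies: " + peak);
    }
}
```

The observed peak should never exceed the cap, but it can be lower: the property is an upper bound, and the scheduler may decide to use fewer threads.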

EDIT:

Since you do not have a list of tasks, but just need to repeat the same operation several times, you can use Parallel.For. This is what the code looks like:

    int repeatCount = 100;
    int itemProcessed = 0;
    Parallel.For(0, repeatCount, new ParallelOptions() { MaxDegreeOfParallelism = 10 }, i =>
    {
        _taskDelegate();
        System.Threading.Interlocked.Increment(ref itemProcessed);
    });

Please note that if the only reason you used itemProcessed was to control how long your loop should run, you can safely remove the two lines that refer to it (the declaration and the Interlocked.Increment call) from the code above.
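
For the stress-test case in the question, the Stopwatch timing can wrap the Parallel.For call directly. The following is only a sketch under assumptions: StressRun and Run are hypothetical names, and the `work` parameter plays the role of the question's _taskDelegate.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class StressRun
{
    // Hypothetical helper: invokes `work` batchSize times with at most
    // maxParallel invocations running at once, and reports how many
    // invocations completed and how long the whole batch took.
    public static (int Processed, TimeSpan Elapsed) Run(Action work, int batchSize, int maxParallel)
    {
        int processed = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        var watch = Stopwatch.StartNew();
        Parallel.For(0, batchSize, options, i =>
        {
            work();
            Interlocked.Increment(ref processed);
        });
        watch.Stop();

        return (processed, watch.Elapsed);
    }

    public static void Main()
    {
        // Thread.Sleep stands in for the real operation under test.
        var result = Run(() => Thread.Sleep(10), batchSize: 100, maxParallel: 10);
        Console.WriteLine(result.Processed + " items in " + result.Elapsed.TotalMilliseconds + " ms");
    }
}
```

Parallel.For only returns once every iteration has finished, so the stopwatch covers the whole batch without a separate Task.WaitAll.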


Shaamaan's answer is good and probably the one you want to use for your specific scenario. I'll just present a couple of other options that may be more applicable to other situations.

My blog post shows how to do this with both Tasks and Actions, and provides a sample project that you can download and run to see both in action.

With Actions

When using Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 10 threads in parallel.

    var listOfActions = new List<Action>();
    for (int i = 0; i < 10000; i++)
    {
        // Note that we create the Action here, but do not start it.
        listOfActions.Add(() => DoSomething());
    }

    var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };
    Parallel.Invoke(options, listOfActions.ToArray());

With Tasks

With Tasks there is no built-in function. However, you can use the one I provide on my blog.

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
    {
        StartAndWaitAllThrottled(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
    }

    /// <summary>
    /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
    /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
    /// </summary>
    /// <param name="tasksToRun">The tasks to run.</param>
    /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
    /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
    /// <param name="cancellationToken">The cancellation token.</param>
    public static void StartAndWaitAllThrottled(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
    {
        // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
        var tasks = tasksToRun.ToList();

        using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
        {
            var postTaskTasks = new List<Task>();

            // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
            tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));

            // Start running each task.
            foreach (var task in tasks)
            {
                // Increment the number of tasks currently running and wait if too many are running.
                throttler.Wait(timeoutInMilliseconds, cancellationToken);

                cancellationToken.ThrowIfCancellationRequested();
                task.Start();
            }

            // Wait for all of the provided tasks to complete.
            // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition
            // where the throttler's using block is exited before some Tasks have had their "post" action completed,
            // which references the throttler, resulting in an exception due to accessing a disposed object.
            Task.WaitAll(postTaskTasks.ToArray(), cancellationToken);
        }
    }

Then, to create your list of Tasks and have the function run them with, say, at most 10 at a time, you can do this:

    var listOfTasks = new List<Task>();
    for (int i = 0; i < 10000; i++)
    {
        var count = i;
        // Note that we create the Task here, but do not start it.
        listOfTasks.Add(new Task(() => Something()));
    }
    Tasks.StartAndWaitAllThrottled(listOfTasks, 10);
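
The helper above needs Tasks that have not been started yet. If your work is asynchronous and you would rather not block a thread while waiting, a similar throttle can be sketched with SemaphoreSlim.WaitAsync and Task.WhenAll. This is not the code from the blog post, just one possible variant; AsyncThrottle, RunThrottledAsync and RunAndReleaseAsync are names invented for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncThrottle
{
    // Hypothetical helper: starts each work item as soon as a slot is free,
    // keeping at most maxParallel items in flight at any moment.
    public static async Task RunThrottledAsync(IEnumerable<Func<Task>> work, int maxParallel)
    {
        using (var throttler = new SemaphoreSlim(maxParallel))
        {
            var running = new List<Task>();
            foreach (var start in work)
            {
                // Wait (without blocking a thread) until a slot is available.
                await throttler.WaitAsync();
                running.Add(RunAndReleaseAsync(start, throttler));
            }

            // All releases happen before this completes, so the semaphore
            // is not disposed while work items still reference it.
            await Task.WhenAll(running);
        }
    }

    private static async Task RunAndReleaseAsync(Func<Task> start, SemaphoreSlim throttler)
    {
        try
        {
            await start();
        }
        finally
        {
            // Free the slot even if the work item throws.
            throttler.Release();
        }
    }

    public static void Main()
    {
        int completed = 0;
        // Task.Delay stands in for real asynchronous work.
        var work = Enumerable.Range(0, 100).Select(i => (Func<Task>)(async () =>
        {
            await Task.Delay(10);
            Interlocked.Increment(ref completed);
        }));

        RunThrottledAsync(work, 10).GetAwaiter().GetResult();
        Console.WriteLine("Completed " + completed + " work items");
    }
}
```

Passing Func<Task> instead of Task means nothing runs until the throttle admits it, which is what the unstarted-Task requirement achieves in the synchronous version.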

Source: https://habr.com/ru/post/1492886/

