TPL architectural question

I am currently working on a project where we have to process items in parallel. So far, not a big deal ;) Now to the problem. We have a list of IDs, and periodically (every 2 seconds) we want to call a stored procedure for each ID. The 2 seconds have to be tracked for each item individually, because items are added and removed at runtime. In addition, we want to configure the maximum degree of parallelism, since the database should not be flooded with 300 threads at the same time. An item that is being processed should not be rescheduled for processing until its previous execution has finished. The reason is that we want to avoid queueing up a lot of items if the database is slow.

Right now we use a self-developed component with a main thread that periodically checks which items need to be scheduled for processing. Once it has the list, it drops those items onto a custom IOCP-based thread pool and then uses wait handles to wait for the items being processed. Then the next iteration begins. We chose IOCP because of the work stealing it provides.

I would like to replace this custom implementation with a TPL / .NET 4 version, and I would like to know how you would solve it (ideally simple and nicely readable / maintainable). I know about this article: http://msdn.microsoft.com/en-us/library/ee789351.aspx , but it only limits the number of threads being used. That still leaves work stealing, periodically executing the items ....

Ideally, this will become a generic component that can be used for any task that needs to be performed periodically for a list of items.

Any input is welcome. TIA, Martin

2 answers

I don't think you really need to get down and dirty with TPL Tasks directly for this. For starters, I would set up a BlockingCollection around a ConcurrentQueue (the default), with no BoundedCapacity set on the BlockingCollection, to store the IDs that need to be processed.

    // Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
    BlockingCollection<string> idsToProcess = new BlockingCollection<string>();

From there, I would just use Parallel::ForEach on the enumeration returned from BlockingCollection::GetConsumingEnumerable . In the ForEach call, you set ParallelOptions::MaxDegreeOfParallelism . Inside the body of the ForEach you execute your stored procedure.

Now, once the stored procedure has finished executing, you say you do not want to reschedule the execution for at least two seconds. No problem: schedule a System.Threading.Timer whose callback simply adds the ID back to the BlockingCollection.

    Parallel.ForEach(
        idsToProcess.GetConsumingEnumerable(),
        new ParallelOptions
        {
            MaxDegreeOfParallelism = 4 // read this from config
        },
        (id) =>
        {
            // ... execute sproc ...

            // Need to declare/assign this before the delegate so that we can dispose of it inside
            Timer timer = null;

            timer = new Timer(
                _ =>
                {
                    // Add the id back to the collection so it will be processed again
                    idsToProcess.Add(id);

                    // Cleanup the timer
                    timer.Dispose();
                },
                null, // no state, id we need is "captured" in the anonymous delegate
                2000, // probably should read this from config
                Timeout.Infinite); // fire once only, no period
        });

Finally, when the process is shutting down, you call BlockingCollection::CompleteAdding so that the enumerable being processed stops blocking and completes, and Parallel::ForEach exits. If this were a Windows service, for example, you would do this in OnStop .

    // When ready to shutdown you just signal you're done adding
    idsToProcess.CompleteAdding();
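Two details of this setup are worth a hedged sketch. First, Parallel.ForEach by default pulls items from an IEnumerable in chunks, so a worker may hold IDs in a private buffer while it blocks on the consuming enumerable. Second, a timer callback that fires after CompleteAdding will get an InvalidOperationException from Add. The sketch below assumes .NET 4.5 or later (the question targets .NET 4, where EnumerablePartitionerOptions is not available); the names PeriodicProcessorSketch, Consume and TryRequeue are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class PeriodicProcessorSketch
{
    // Hypothetical helper: runs `work` for every id taken from `ids`
    // and returns once CompleteAdding has been called and the collection is empty.
    public static void Consume(BlockingCollection<string> ids, int maxParallelism, Action<string> work)
    {
        // NoBuffering hands out one item at a time instead of chunks
        // (assumption: .NET 4.5+, where this option exists).
        var partitioner = Partitioner.Create(
            ids.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(
            partitioner,
            new ParallelOptions { MaxDegreeOfParallelism = maxParallelism },
            work);
    }

    // Hypothetical helper: re-adds an id unless shutdown has already begun.
    // Returns false when the id was dropped because adding is complete.
    public static bool TryRequeue(BlockingCollection<string> ids, string id)
    {
        if (ids.IsAddingCompleted)
        {
            return false;
        }

        try
        {
            ids.Add(id);
            return true;
        }
        catch (InvalidOperationException)
        {
            // CompleteAdding was called between the check and the Add.
            return false;
        }
    }
}
```

In the timer callback you would then call TryRequeue(idsToProcess, id) in place of idsToProcess.Add(id), so a late tick during shutdown is simply ignored.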

Update

You raised a valid concern in your comment: you might be processing a large number of IDs at any given point, and you fear that a timer per ID would add too much overhead. I absolutely agree. So if you are dealing with a large list of IDs at the same time, I would switch from a timer per ID to another queue that holds the "sleeping" IDs and is monitored by a single short-interval timer. First you need a ConcurrentQueue onto which to place the IDs that are asleep:

    ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();

I am using a two-part Tuple here for illustration purposes, but you may want to create a more strongly typed struct for it (or at least alias it with a using statement) for better readability. The tuple holds the ID and a DateTime that records when it was put on the queue.
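As a rough idea of what such a strongly typed entry could look like, here is a minimal sketch. The name SleepingId and its members are hypothetical and not part of the answer's code.

```csharp
using System;

// Hypothetical replacement for Tuple<string, DateTime>.
public struct SleepingId
{
    public readonly string Id;
    public readonly DateTime QueuedAtUtc;

    public SleepingId(string id, DateTime queuedAtUtc)
    {
        Id = id;
        QueuedAtUtc = queuedAtUtc;
    }

    // True once the entry has been asleep for at least `delay`.
    public bool IsDue(DateTime utcNow, TimeSpan delay)
    {
        return utcNow - QueuedAtUtc >= delay;
    }
}
```

The queue would then be declared as ConcurrentQueue<SleepingId>, and call sites read entry.Id and entry.QueuedAtUtc rather than Item1 and Item2.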

Now you also want to set up a timer that will monitor this queue:

    Timer wakeSleepingIdsTimer = new Timer(
        _ =>
        {
            DateTime utcNow = DateTime.UtcNow;

            // Move every entry that has been asleep for at least 2 seconds back to the
            // processing queue. Entries are in FIFO order, so stop at the first one not due yet.
            Tuple<string, DateTime> entry;
            while (sleepingIds.TryPeek(out entry)
                   && (utcNow - entry.Item2).TotalSeconds >= 2
                   && sleepingIds.TryDequeue(out entry))
            {
                // Add this id back to the processing queue
                idsToProcess.Add(entry.Item1);
            }
        },
        null, // no state
        100,  // first tick after 100ms (Timeout.Infinite here would keep the timer from ever starting)
        100   // then wake up every 100ms, probably should read this from config
    );

Then you just change the body of the Parallel::ForEach to do the following instead of setting up a timer for each ID:

    (id) =>
    {
        // ... execute sproc ...

        sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow));
    }
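The wake-up step can also be pulled out into a small function that is easy to test without a timer. Enumerating a ConcurrentQueue (for example with LINQ) only reads a snapshot and leaves the entries in place, so the sketch below uses TryPeek and TryDequeue to actually remove the entries that are due. The names SleepingQueueSketch and DrainDue are hypothetical, and the code assumes only one caller drains the queue at a time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class SleepingQueueSketch
{
    // Hypothetical helper: removes and returns the ids that have slept for at least `delay`.
    // Assumes a single drainer at a time (e.g. a timer callback that never overlaps itself);
    // with concurrent drainers an entry could be dequeued slightly before it is due.
    public static List<string> DrainDue(
        ConcurrentQueue<Tuple<string, DateTime>> sleepingIds,
        DateTime utcNow,
        TimeSpan delay)
    {
        var due = new List<string>();
        Tuple<string, DateTime> entry;

        // Entries were enqueued in time order, so stop at the first one that is not due.
        while (sleepingIds.TryPeek(out entry) && utcNow - entry.Item2 >= delay)
        {
            if (sleepingIds.TryDequeue(out entry))
            {
                due.Add(entry.Item1);
            }
        }

        return due;
    }
}
```

The timer callback then becomes a loop over DrainDue(sleepingIds, DateTime.UtcNow, TimeSpan.FromSeconds(2)) that adds each returned ID back to the processing collection.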

This is quite similar to the approach you described in your question, but it uses TPL tasks. A task simply adds itself back to the list of things to schedule when it is done.

Locking on a plain list is fairly ugly in this example; you would probably want a better collection to hold the list of things to schedule.

    // Fill the idsToSchedule
    for (int id = 0; id < 5; id++)
    {
        idsToSchedule.Add(Tuple.Create(DateTime.MinValue, id));
    }

    // LongRunning will tell TPL to create a new thread to run this on
    Task.Factory.StartNew(SchedulingLoop, TaskCreationOptions.LongRunning);

This starts the SchedulingLoop, which does the actual check of whether two seconds have passed since an item last ran:

    // Tuple of the last time an id was processed and the id of the thing to schedule
    static List<Tuple<DateTime, int>> idsToSchedule = new List<Tuple<DateTime, int>>();
    static int currentlyProcessing = 0;
    const int ProcessingLimit = 3;

    // An event loop that performs the scheduling
    public static void SchedulingLoop()
    {
        while (true)
        {
            lock (idsToSchedule)
            {
                DateTime currentTime = DateTime.Now;
                for (int index = idsToSchedule.Count - 1; index >= 0; index--)
                {
                    var scheduleItem = idsToSchedule[index];
                    var timeSincePreviousRun = (currentTime - scheduleItem.Item1).TotalSeconds;

                    // start it executing in a background task
                    if (timeSincePreviousRun > 2 && currentlyProcessing < ProcessingLimit)
                    {
                        Interlocked.Increment(ref currentlyProcessing);

                        Console.WriteLine("Scheduling {0} after {1} seconds", scheduleItem.Item2, timeSincePreviousRun);

                        // Schedule this task to be processed
                        Task.Factory.StartNew(() =>
                        {
                            Console.WriteLine("Executing {0}", scheduleItem.Item2);

                            // simulate the time taken to call this procedure
                            Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(0, 5000) + 500);

                            lock (idsToSchedule)
                            {
                                idsToSchedule.Add(Tuple.Create(DateTime.Now, scheduleItem.Item2));
                            }

                            Console.WriteLine("Done Executing {0}", scheduleItem.Item2);
                            Interlocked.Decrement(ref currentlyProcessing);
                        });

                        // remove this from the list of things to schedule
                        idsToSchedule.RemoveAt(index);
                    }
                }
            }

            Thread.Sleep(100);
        }
    }

Source: https://habr.com/ru/post/890233/

