I donβt think you really need to go down and mess with the direct TPL Tasks . To start, I created a BlockingCollection around a ConcurrentQueue (by default) without a BoundedCapacity , set to a BlockingCollection to store the identifiers that need to be processed.
From there, I simply used Parallel::ForEach in the enum returned from BlockingCollection::GetConsumingEnumerable . In the ForEach call, you configure ParallelOptions::MaxDegreeOfParallelism . Inside the body of ForEach you execute your stored procedure.
Now, as soon as the execution of the stored procedure is completed, you say that you do not want to reschedule the execution for at least two seconds. No problem, plan a System.Threading.Timer with a callback that simply adds the identifier back to the BlockingCollection in the supplied callback.
Parallel.ForEach( idsToProcess.GetConsumingEnumerable(), new ParallelOptions { MaxDegreeOfParallelism = 4 // read this from config }, (id) => { // ... execute sproc ... // Need to declare/assign this before the delegate so that we can dispose of it inside Timer timer = null; timer = new Timer( _ => { // Add the id back to the collection so it will be processed again idsToProcess.Add(id); // Cleanup the timer timer.Dispose(); }, null, // no state, id wee need is "captured" in the anonymous delegate 2000, // probably should read this from config Timeout.Infinite); }
Finally, when the process completes, you will call BlockingCollection::CompleteAdding so that the enumerable is processed with a lock lock and termination, and Parallel :: ForEach will exit, If it was a Windows service, for example, you would do it in OnStop .
Update
You have expressed serious concern in your comment that you can process a large number of identifiers at any given point and fear that there will be too much overhead in the timer for each identifier. I would agree with that. Therefore, in the case when you are dealing with a large list of identifiers at the same time, I would refuse to use a timer for an identifier to use another queue to store "sleeping" identifiers that are controlled by one short time interval. First you need a ConcurrentQueue , on which you need to put the identifiers that are sleeping:
ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>();
Now I use the two-part Tuple here for illustrative purposes, but you can create a more strongly typed structure for it (or at least an alias with a using statement) for better readability. A tuple has an identifier and a DateTime that represents when it was queued.
Now you also want to set up a timer that will monitor this queue:
Timer wakeSleepingIdsTimer = new Timer( _ => { DateTime utcNow = DateTime.UtcNow; // Pull all items from the sleeping queue that have been there for at least 2 seconds foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2)) { // Add this id back to the processing queue idsToProcess.Enqueue(id); } }, null, // no state Timeout.Infinite, // no due time 100 // wake up every 100ms, probably should read this from config );
Then you just change Parallel::ForEach to do the following, rather than setting a timer for each of them:
(id) => {