I have a WCF service hosted in a Windows service. The service exposes two methods:
- bool ProcessClaim(string options, ref string xml); — takes some data as input, performs some processing (including IO-bound operations such as database queries) and returns the result.
- void RunJob(string ticket); — returns immediately. Based on the ticket, it reads the input data from a store (e.g. a database or the file system), performs the same processing on each data item and saves the results back to the store. A batch usually consists of many claims.
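For reference, the service contract looks roughly like this (a minimal sketch; the interface name IClaimService is my illustration, not the actual contract):

    using System.ServiceModel;

    [ServiceContract]
    public interface IClaimService
    {
        // Processes a single claim and returns the resulting XML via the ref parameter.
        [OperationContract]
        bool ProcessClaim(string options, ref string xml);

        // Kicks off a batch job identified by a ticket and returns immediately.
        [OperationContract]
        void RunJob(string ticket);
    }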
Users call ProcessClaim to process individual claims and RunJob to run batches. Several clients may be working at the same time. Each processing request is wrapped in a Task, so all requests run in parallel. The problem was to prevent clients from clogging the processing queue by scheduling many requests at once: if a user submits a huge batch, it blocks small batches and individual processing requests for a significant amount of time. So I came up with the following scheme, described well (though very briefly) by Albahari:
    using System;
    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    public sealed class ProcessingQueue : IDisposable
    {
        private class WorkItem
        {
            public readonly TaskCompletionSource<string> TaskSource;
            public readonly string Options;
            public readonly string Claim;
            public readonly CancellationToken? CancelToken;

            public WorkItem(
                TaskCompletionSource<string> taskSource,
                string options,
                string claim,
                CancellationToken? cancelToken)
            {
                TaskSource = taskSource;
                Options = options;
                Claim = claim;
                CancelToken = cancelToken;
            }
        }

        private readonly BlockingCollection<WorkItem> _taskQ;

        public ProcessingQueue() : this(Environment.ProcessorCount)
        {
        }

        public ProcessingQueue(int workerCount)
        {
            // Bounded queue, so producers slow down when the workers fall behind.
            _taskQ = new BlockingCollection<WorkItem>(workerCount * 2);
            for (var i = 0; i < workerCount; i++)
                Task.Factory.StartNew(Consume);
        }

        public void Dispose()
        {
            _taskQ.CompleteAdding();
        }

        // Batch processing: the item goes through the shared queue.
        public Task<string> EnqueueTask(string options, string claim, CancellationToken? cancelToken = null)
        {
            var tcs = new TaskCompletionSource<string>();
            _taskQ.Add(new WorkItem(tcs, options, claim, cancelToken));
            return tcs.Task;
        }

        // Individual requests: run directly on the thread pool.
        public static Task<string> ProcessRequest(string options, string claim, CancellationToken? cancelToken = null)
        {
            return Task<string>.Factory.StartNew(() => ProcessItem(options, claim));
        }

        private void Consume()
        {
            foreach (var workItem in _taskQ.GetConsumingEnumerable())
            {
                if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
                {
                    workItem.TaskSource.SetCanceled();
                }
                else
                {
                    try
                    {
                        workItem.TaskSource.SetResult(ProcessItem(workItem.Options, workItem.Claim));
                    }
                    catch (Exception ex)
                    {
                        workItem.TaskSource.SetException(ex);
                    }
                }
            }
        }

        private static string ProcessItem(string options, string claim)
        {
            // ... actual processing of a single claim omitted ...
            throw new NotImplementedException();
        }
    }
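For illustration, this is roughly how the two service operations feed work into the queue. ClaimService, LoadClaims and SaveResult are placeholder names for the service implementation and repository access, not the actual code:

    using System.Collections.Generic;
    using System.Threading.Tasks;

    public class ClaimService : IClaimService
    {
        // A single queue instance shared by all clients.
        private static readonly ProcessingQueue Queue = new ProcessingQueue();

        public bool ProcessClaim(string options, ref string xml)
        {
            // Individual requests bypass the queue and run straight on the thread pool.
            xml = ProcessingQueue.ProcessRequest(options, xml).Result;
            return true;
        }

        public void RunJob(string ticket)
        {
            // Return immediately; the batch is processed in the background.
            Task.Factory.StartNew(() =>
            {
                foreach (var claim in LoadClaims(ticket))
                {
                    Queue.EnqueueTask(null /* options */, claim)
                         .ContinueWith(t => SaveResult(ticket, t.Result),
                                       TaskContinuationOptions.OnlyOnRanToCompletion);
                }
            });
        }

        private static IEnumerable<string> LoadClaims(string ticket) { /* read from repository */ yield break; }
        private static void SaveResult(string ticket, string result) { /* write to repository */ }
    }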
The static ProcessRequest method is used for individual requests, while the EnqueueTask instance method is used for batch processing. Of course, all clients must share a single instance of ProcessingQueue. Although this approach works quite well and lets me throttle the pace of several batches running simultaneously, something about it feels wrong to me:
- The pool of worker threads has to be maintained manually
- It is hard to guess the optimal number of worker threads (by default I use the number of processor cores)
- The worker threads stay blocked on the queue even when no batches are being processed, wasting system resources
- The IO-bound parts of the processing block worker threads, which reduces CPU utilization
Is there a better way to deal with such scenarios?
Update: One of the requirements is to give batches full capacity, i.e. when a user runs a single batch and there are no other incoming requests, all resources should be devoted to processing that batch.