Parallel.ForEach with custom TaskScheduler to prevent OutOfMemoryException

I process PDF files of wildly different sizes (from 2 MB simple scans up to several hundred MB of high-DPI scans) through Parallel.ForEach and occasionally get an OutOfMemoryException. The reasons are obvious: the process is 32-bit, and each thread spawned by Parallel.ForEach is busy with an unknown amount of memory-heavy work.

Limiting MaxDegreeOfParallelism works, but the throughput is insufficient whenever there is a large (10k+) batch of small PDF files to process, since more threads could be running given the small memory footprint of those files. The workload is CPU-heavy, so Parallel.ForEach easily reaches 100% CPU, until it hits a random cluster of large PDF files and gets an OutOfMemoryException. Running the Performance Profiler supports this.
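For context, the static-cap approach described above looks roughly like the following sketch. The file names and the cap of 2 are hypothetical stand-ins; the real batch and limit depend on your pipeline:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical batch; the real file list comes from the scan folder.
string[] pdfFiles = { "small-scan.pdf", "large-highdpi.pdf", "another.pdf" };

var options = new ParallelOptions
{
    // A static cap: safe for the largest PDFs, but wastes CPU
    // when the batch is mostly small files.
    MaxDegreeOfParallelism = 2
};

var processed = new ConcurrentBag<string>();

Parallel.ForEach(pdfFiles, options, path =>
{
    // Placeholder for the real rasterization / processing work.
    processed.Add(path);
});

Console.WriteLine($"Processed {processed.Count} files");
```

The cap has to be sized for the worst-case (largest) PDF, which is exactly why small-file batches underutilize the CPU.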

My intuition is that a custom partitioner for my Parallel.ForEach will not improve performance.

This leads me to wanting a custom TaskScheduler, passed to my Parallel.ForEach, that performs a MemoryFailPoint check. Searching around, information on creating custom TaskScheduler objects seems sparse.
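As background, MemoryFailPoint is a gate that asks the CLR whether a given number of megabytes could plausibly be allocated, throwing InsufficientMemoryException up front instead of letting an OutOfMemoryException happen mid-operation. A minimal sketch, using the 600 MB estimate from this question:

```csharp
using System;
using System.Runtime;

bool enoughMemory;
try
{
    // Ask the CLR whether roughly 600 MB could be allocated before
    // committing to a memory-hungry operation.
    using (var gate = new MemoryFailPoint(600))
    {
        enoughMemory = true;
        // ... perform the large rasterization inside the gate ...
    }
}
catch (InsufficientMemoryException)
{
    // The gate refused: defer or re-queue the work instead of risking OOM.
    enoughMemory = false;
}

Console.WriteLine(enoughMemory ? "proceed" : "defer");
```

Note the gate is only an estimate based on available memory and address space; it does not reserve the memory for you.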

Looking at the specialized task schedulers in the .NET 4 Parallel Extensions Extras, A Custom TaskScheduler in C#, and various answers here on Stack Overflow, I created my own TaskScheduler with the following QueueTask method:

    protected override void QueueTask(Task task)
    {
        lock (tasks)
            tasks.AddLast(task);

        try
        {
            using (MemoryFailPoint memFailPoint = new MemoryFailPoint(600))
            {
                if (runningOrQueuedCount < maxDegreeOfParallelism)
                {
                    runningOrQueuedCount++;
                    RunTasks();
                }
            }
        }
        catch (InsufficientMemoryException e)
        {
            // somehow return thread to pool?
            Console.WriteLine("InsufficientMemoryException");
        }
    }

While try/catch is a little expensive, my goal here is to catch the point at which a probable maximum PDF size (plus some extra memory overhead), 600 MB, would throw an OutOfMemoryException. The problem is that this approach seems to kill the thread attempting the work whenever I catch the InsufficientMemoryException: given a large number of PDF files, my code ends up with a single-threaded Parallel.ForEach.

Other questions found on Stack Overflow about Parallel.ForEach and OutOfMemoryException don't seem to fit my use case of maximizing throughput with dynamic memory use per work item; they typically just fall back to MaxDegreeOfParallelism as a static limit.

So, to get maximum throughput with highly variable working-memory sizes:

  • How do I return a thread to the thread pool after it has been denied work by the MemoryFailPoint check?
  • How/where do I safely start new threads to pick the work up again once memory is free?

Edit: a PDF's size on disk does not map linearly to its size in memory, because of the rasterization and rasterized-image-processing components, whose cost depends on the contents of the PDF.

1 answer

Using the LimitedConcurrencyLevelTaskScheduler from the Samples for Parallel Programming with the .NET Framework, I was able to make a small adjustment to get what I was after. Below is the NotifyThreadPoolOfPendingWork method of the LimitedConcurrencyLevelTaskScheduler class after modification:

    private void NotifyThreadPoolOfPendingWork()
    {
        ThreadPool.UnsafeQueueUserWorkItem(_ =>
        {
            // Note that the current thread is now processing work items.
            // This is necessary to enable inlining of tasks into this thread.
            _currentThreadIsProcessingItems = true;
            try
            {
                // Process all available items in the queue.
                while (true)
                {
                    Task item;
                    lock (_tasks)
                    {
                        // When there are no more items to be processed,
                        // note that we're done processing, and get out.
                        if (_tasks.Count == 0)
                        {
                            --_delegatesQueuedOrRunning;
                            break;
                        }

                        // Get the next item from the queue
                        item = _tasks.First.Value;
                        _tasks.RemoveFirst();
                    }

                    // Execute the task we pulled out of the queue
                    //base.TryExecuteTask(item);
                    try
                    {
                        using (MemoryFailPoint memFailPoint = new MemoryFailPoint(650))
                        {
                            base.TryExecuteTask(item);
                        }
                    }
                    catch (InsufficientMemoryException e)
                    {
                        Thread.Sleep(500);
                        lock (_tasks)
                        {
                            _tasks.AddLast(item);
                        }
                    }
                }
            }
            // We're done processing items on the current thread
            finally
            {
                _currentThreadIsProcessingItems = false;
            }
        }, null);
    }

The trick here works in reverse of what you might expect. When the MemoryFailPoint check fails, we add the task back to the list of tasks (_tasks), which fires an event so an available thread can pick that work up. But first we put the current thread to sleep, so that it doesn't immediately grab the same work again and run straight back into the failed MemoryFailPoint check.
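To route Parallel.ForEach through a scheduler like this, it can be passed via ParallelOptions.TaskScheduler. A minimal sketch, assuming the modified LimitedConcurrencyLevelTaskScheduler class from the samples is compiled into the project (the concurrency limit of 4 and the file names are illustrative):

```csharp
using System;
using System.Threading.Tasks;

// Assumes LimitedConcurrencyLevelTaskScheduler (with the MemoryFailPoint
// modification above) is available in this project.
var scheduler = new LimitedConcurrencyLevelTaskScheduler(4); // assumed cap

var options = new ParallelOptions { TaskScheduler = scheduler };

string[] pdfFiles = { "a.pdf", "b.pdf" }; // hypothetical batch

Parallel.ForEach(pdfFiles, options, path =>
{
    // Work items queued by Parallel.ForEach now pass through QueueTask and
    // NotifyThreadPoolOfPendingWork, and therefore through the memory gate.
    Console.WriteLine($"Processing {path}");
});
```

With this wiring, a task that fails the 650 MB gate is re-queued rather than lost, so the loop degrades gracefully under memory pressure instead of shedding threads.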


Source: https://habr.com/ru/post/1272649/
