Python ThreadPool with Limited Task Queue Size

My problem is this: I have a multiprocessing.pool.ThreadPool object with worker_count workers and a main priority queue pqueue from which I feed tasks to the pool.

The flow is as follows: a main loop pulls an element of some level from pqueue and submits it to the pool using apply_async. When an element is processed, it generates level + 1 elements. The problem is that the pool accepts all submitted tasks and processes them in the order they were sent.

More precisely, the level 0 elements are processed first, and each generates 100 level 1 elements that are immediately pulled from pqueue and added to the pool; each level 1 element in turn produces 100 level 2 elements that are submitted to the pool, and so on, so the elements end up being processed in BFS order.

I need to tell the pool not to accept more than worker_count tasks at a time, so that I get a chance to pull a higher-level element from pqueue and process the elements in DFS order.

The current solution I came up with is this: for each submitted task I save the AsyncResult object in asyncres_list; before popping items from pqueue I remove the already-completed results (if any) and check every 0.5 seconds whether the length of asyncres_list is less than the number of threads in the pool, so that at most thread_number tasks are in flight at a time.
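For reference, the polling workaround described above might look roughly like this. This is a hypothetical sketch, not the poster's actual code: process, pqueue, and worker_count stand in for the real names, and the 0.5-second poll interval matches the one mentioned.

```python
import time
from multiprocessing.pool import ThreadPool


def throttled_submit(pool, pqueue, process, worker_count):
    """Sketch of the polling workaround: keep at most worker_count
    tasks in flight so higher-level items can be picked up sooner."""
    asyncres_list = []
    while not pqueue.empty():
        # drop results that have already completed
        asyncres_list = [r for r in asyncres_list if not r.ready()]
        if len(asyncres_list) < worker_count:
            level, item = pqueue.get()
            asyncres_list.append(pool.apply_async(process, (level, item)))
        else:
            time.sleep(0.5)  # poll every 0.5 s
    for r in asyncres_list:  # wait for the stragglers
        r.wait()
```

This keeps the ThreadPool API but, as the question notes, the busy-wait polling makes it clumsy.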

I am wondering whether there is a cleaner way to achieve this behavior; I could not find any parameter in the documentation that limits the maximum number of tasks that can be submitted to the pool.

1 answer

ThreadPool is a simple tool for a common task. If you want to manage the queue yourself to get DFS behavior, you can implement the necessary functionality on top of the threading and queue modules directly.

To prevent the next root task from being scheduled until all tasks spawned by the current task are complete (DFS-like order), you can use Queue.join():

```python
#!/usr/bin/env python3
import queue
import random
import threading
import time

def worker(q, multiplicity=5, maxlevel=3, lock=threading.Lock()):
    for task in iter(q.get, None):  # blocking get until None is received
        try:
            if len(task) < maxlevel:
                for i in range(multiplicity):
                    q.put(task + str(i))  # schedule the next level
            time.sleep(random.random())  # emulate some work
            with lock:
                print(task)
        finally:
            q.task_done()

worker_count = 2
q = queue.LifoQueue()
threads = [threading.Thread(target=worker, args=[q], daemon=True)
           for _ in range(worker_count)]
for t in threads:
    t.start()

for task in "01234":  # populate the first level
    q.put(task)
q.join()  # block until all spawned tasks are done
for _ in threads:  # signal workers to quit
    q.put(None)
for t in threads:  # wait until workers exit
    t.join()
```

The sample code is adapted from the example in the queue module documentation.

A task at each level spawns multiplicity direct child tasks, which spawn their own sub-tasks until maxlevel is reached.

None is used as a sentinel to tell the workers that they should stop. t.join() waits until the threads have exited gracefully. If the main thread is interrupted for any reason, the daemon threads are killed as soon as no non-daemon threads remain (you might want to install a SIGINT handler that signals the workers to exit gracefully on Ctrl+C instead of just dying).
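A minimal sketch of such a handler, assuming the same q, worker_count, and None sentinel as in the example above (the helper name is hypothetical):

```python
import queue
import signal


def install_sigint_handler(q, worker_count):
    """Hypothetical helper: on Ctrl+C, enqueue one None sentinel per
    worker so each worker's iter(q.get, None) loop terminates cleanly."""
    def handler(signum, frame):
        for _ in range(worker_count):
            q.put(None)  # sentinel understood by the workers
    signal.signal(signal.SIGINT, handler)
    return handler
```

Note that tasks still sitting in the queue behind the sentinels would be abandoned; for this sketch that is the intended "stop now" behavior.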

queue.LifoQueue() is used to get "Last In, First Out" order (it is only approximate, because multiple threads consume from the queue concurrently).
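The effect of the LIFO ordering is easy to see in isolation; with a single consumer, the most recently added item always comes out first:

```python
import queue

q = queue.LifoQueue()
for task in "012":  # first level, submitted in order
    q.put(task)
q.put("20")         # a child task scheduled later
print(q.get())      # the newest item, "20", comes out before "0", "1", "2"
```

With several worker threads pulling concurrently, items interleave, which is why the DFS order in the answer's example is approximate rather than strict.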

maxsize is not set, because otherwise the workers may deadlock: you have to put the spawned tasks somewhere anyway. The worker_count background threads run regardless of the size of the task queue.
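To see why a bounded queue is risky here: a worker that tries to put child tasks into a full queue blocks inside its own task, and if every worker is blocked the same way, no one ever calls q.get() again. A non-blocking put makes the failure mode visible without actually hanging (a hypothetical illustration):

```python
import queue

q = queue.LifoQueue(maxsize=2)
q.put("a")
q.put("b")  # the queue is now full
try:
    q.put_nowait("child")  # where a worker's blocking q.put() would hang
except queue.Full:
    print("queue full: a blocking put() here would deadlock the worker")
```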


Source: https://habr.com/ru/post/1011851/

