Strange Queue.PriorityQueue Behavior with Multiprocessing in Python 2.7.6

As you know from the name, I am trying to use PriorityQueue with multiprocessing. More precisely, I wanted to make a general PriorityQueue, wrote the code, and it does not work as I expected.

Take a look at the code:

import time from multiprocessing import Process, Lock from Queue import PriorityQueue def worker(queue): lock = Lock() with lock: for i in range(100): queue.put(i) print "worker", queue.qsize() pr_queue = PriorityQueue() worker_process = Process(target = worker, args = (pr_queue,)) worker_process.start() time.sleep(5) # nope, race condition, you shall not pass (probably) print "main", pr_queue.qsize() 

Gets the following output:

 worker 100 main 0 

What is happening and how to do what I want correctly? Thanks.

+6
source share
1 answer

The problem is not that in this case it cannot be foiled - if you use a Unix-like platform, the queue can be passed on to the child without etching. (On Windows, I think you get an etching error here). The root problem is that you are not using a process safe queue. The only queues that can be used between processes are Queue objects that live inside the multiprocessing module. Unfortunately, there is no PriorityQueue implementation. However, you can easily create it by registering PriorityQueue with the multiprocessing.Manager class, for example:

 import time from multiprocessing import Process from multiprocessing.managers import SyncManager from Queue import PriorityQueue class MyManager(SyncManager): pass MyManager.register("PriorityQueue", PriorityQueue) # Register a shared PriorityQueue def Manager(): m = MyManager() m.start() return m def worker(queue): print(queue) for i in range(100): queue.put(i) print "worker", queue.qsize() m = Manager() pr_queue = m.PriorityQueue() # This is process-safe worker_process = Process(target = worker, args = (pr_queue,)) worker_process.start() time.sleep(5) # nope, race condition, you shall not pass (probably) print "main", pr_queue.qsize() 

Output:

 worker 100 main 100 

Note that this probably won't work as well as if it were a standard subclass of multiprocessing.Queue . Manager -based Manager is implemented by creating a Manager server process that actually contains a regular PriorityQueue , and then provides your core and Proxy workflows that use IPC to read / write to the queue in the server process. The usual multiprocessing.Queue just writing / reading data to / from Pipe . If this is a concern, you can try to implement your own multiprocessing.PriorityQueue by subclassing or delegating from multiprocessing.Queue . However, this may not be worth the effort.

+9
source

Source: https://habr.com/ru/post/973859/


All Articles