Well, seeing as there are no answers yet, I figure I'm not doing too badly. Although I'm still interested in what is actually going on behind the scenes to cause this problem, my most pressing questions are the ones listed in Update 2. Those being:
What is the difference between a JoinableQueue and a Manager().Queue() (and when should you use one over the other)? And, importantly, is it possible to replace one with the other in this example?
In the following code I have a simple pool of processes. Each process is passed a processing queue (pq) to pull the data to be processed from, and a return-value queue (rq) to pass the return values back to the main thread. If I don't join the return-value queue it works, but as soon as I do, for some reason the processes are blocked from stopping. In both cases the run methods return, so it isn't put blocking on the return queue; but in the second case the processes themselves never terminate, so the program blocks when I join the processes. Why would this be?
Update:
It seems to have something to do with the number of items in the queue.
On my machine, at least, I can have up to 6570 items in the queue and it actually works, but any more than that and it deadlocks.
Update 2:
It seems to work with a Manager().Queue().
Whether it's a limitation of JoinableQueue or just me not understanding the difference between the two objects, I've found that if I replace the return queue with a Manager().Queue(), it works as expected. What are the differences between them, and when should you use one over the other?
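In case it's unclear, here is a minimal sketch of the substitution I mean; only the construction of rq changes, and the rest of the code (the Proc class and driver in the listing below) stays the same. The mgr name is mine, not from the original code:

import multiprocessing

if __name__ == '__main__':
    mgr = multiprocessing.Manager()

    pq = multiprocessing.JoinableQueue()  # processing queue, unchanged
    rq = mgr.Queue()                      # return queue: a proxy to a queue held by the
                                          # manager process, instead of a JoinableQueue

    # ... start the workers, fill pq and join, exactly as before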
Update 3:
The error does not occur if I consume from rq.
Oops. There was an answer here for a moment, and by the time I'd commented on it, it had disappeared. Anyway, one of the things it mentioned raised the question of whether this error still happens if I add a consumer. I have tried this, and the answer is no, it does not.
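To be explicit about what "adding a consumer" means here, this is the kind of draining loop I tried, sketched with illustrative names (results and N_ITEMS are not from the original code, and the loop assumes the pq/rq/pool names from the listing below):

# Drain the return queue in the parent before joining the workers.
# N_ITEMS is whatever number of items was put on pq in the first place.
N_ITEMS = 10000                          # illustrative value only
results = [rq.get() for _ in range(N_ITEMS)]

for p in pool:
    p.join()                             # no longer hangs once rq has been drained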
The other thing it mentioned was this quote from the multiprocessing documentation as a possible clue to the problem. Regarding JoinableQueue, it says:
... the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.
import multiprocessing

class _ProcSTOP:
    pass

class Proc(multiprocessing.Process):

    def __init__(self, pq, rq):
        self._pq = pq
        self._rq = rq
        super().__init__()
        print('++', self.name)

    def run(self):
        dat = self._pq.get()

        while dat is not _ProcSTOP:
            # process dat, then pass the result back on the return queue
            self._rq.put(dat)            # left out in the "without the return queue" run
            self._pq.task_done()
            dat = self._pq.get()

        self._pq.task_done()             # account for the _ProcSTOP sentinel as well
        print('==', self.name)
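To make the example runnable end to end, a driver along these lines would be consistent with the description and the sample output below; the pool size, the item count, and the exact print statements are my assumptions inferred from the output, not the original code:

if __name__ == '__main__':
    pq = multiprocessing.JoinableQueue()     # processing queue
    rq = multiprocessing.JoinableQueue()     # return-value queue
    pool = [Proc(pq, rq) for _ in range(4)]  # four workers, as in the sample output

    for p in pool:
        p.start()

    for i in range(10000):                   # item count is an assumption
        pq.put(i)
    for _ in pool:
        pq.put(_ProcSTOP)                    # one stop sentinel per worker

    pq.join()                                # wait until every item has been task_done()

    while pool:
        print('??', pool)
        pool.pop().join()                    # hangs here when the workers use rq

    print('** complete')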
Example output without using the return queue:
++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-4
== Proc-3
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-2
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>, <Proc(Proc-3, stopped)>]
-- Proc-3
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>]
-- Proc-2
?? [<Proc(Proc-1, stopped)>]
-- Proc-1
** complete
-- Proc-4
Example output using a return queue:
++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-2
== Proc-4
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-3
# here it hangs