Child process finishes its work but never terminates, causing a deadlock

Since there is still no answer, I'll at least narrow the question down. I'm still interested in what is actually going on behind the scenes to cause this problem, but my most pressing questions are the ones listed in Update 2, namely:

What is the difference between a JoinableQueue and a Manager().Queue(), and when should you use one over the other? And, importantly, is it possible to replace one with the other in this example?


In the following code, I have a simple process pool. Each process is passed a process queue (pq) to pull the data to be processed from, and a return-value queue (rq) to pass the results of processing back to the main thread. If I don't put anything on the return-value queue, this works, but as soon as I do, for some reason the processes are prevented from terminating. In both cases the processes' run() methods return, so it is not put() blocking on the return queue; but in the second case the processes themselves never terminate, so the program deadlocks when I join the processes. Why would this be?

Update:

  • There seems to be something related to the number of items in the queue.

    On my machine at least, I can have up to 6570 items in the queue and it actually works, but any more than that and it deadlocks.

  • It seems to work with Manager().Queue().

    Whether this is a limitation of JoinableQueue or just my misunderstanding of the differences between the two objects, I found that if I replace the return queue with a Manager().Queue(), it works as expected. What are the differences between them, and when should you use one over the other?

  • The error does not occur if I consume from rq.

    Oops. There was an answer here for a moment, and while I was commenting on it, it disappeared. Anyway, one of the things it asked was whether the error still happens if I add a consumer. I tried this, and the answer is no.

    The other thing it mentioned was a quote from the multiprocessing documentation as a possible key to the problem. Referring to JoinableQueue, it says:

    ... the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.


    import multiprocessing

    class _ProcSTOP:
        """Sentinel that tells a worker to stop."""

    class Proc(multiprocessing.Process):
        def __init__(self, pq, rq):
            self._pq = pq  # queue of work items
            self._rq = rq  # queue for return values
            super().__init__()
            print('++', self.name)

        def run(self):
            dat = self._pq.get()
            while dat is not _ProcSTOP:
                # self._rq.put(dat)  # uncomment me for deadlock
                self._pq.task_done()
                dat = self._pq.get()
            self._pq.task_done()  # acknowledge the stop sentinel
            print('==', self.name)

        def __del__(self):
            print('--', self.name)

    if __name__ == '__main__':
        pq = multiprocessing.JoinableQueue()
        rq = multiprocessing.JoinableQueue()
        pool = []
        for i in range(4):
            p = Proc(pq, rq)
            p.start()
            pool.append(p)

        for i in range(10000):
            pq.put(i)
        pq.join()

        for i in range(4):
            pq.put(_ProcSTOP)
        pq.join()

        while len(pool) > 0:
            print('??', pool)
            pool.pop().join()  # hangs here (if using rq)
        print('** complete')

Example output without using the return queue:

    ++ Proc-1
    ++ Proc-2
    ++ Proc-3
    ++ Proc-4
    == Proc-4
    == Proc-3
    == Proc-1
    ?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
    == Proc-2
    ?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>, <Proc(Proc-3, stopped)>]
    -- Proc-3
    ?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>]
    -- Proc-2
    ?? [<Proc(Proc-1, stopped)>]
    -- Proc-1
    ** complete
    -- Proc-4

Example output using a return queue:

    ++ Proc-1
    ++ Proc-2
    ++ Proc-3
    ++ Proc-4
    == Proc-2
    == Proc-4
    == Proc-1
    ?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
    == Proc-3
    # here it hangs
Answer:

From the documentation:

Warning

As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread()), then that process will not terminate until all buffered items have been flushed to the pipe.

This means that if you try to join that process, you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic, the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.
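Following that warning, one way to keep the question's pipe-based setup is to consume every result from rq before joining the workers. This is a sketch under a few assumptions: `worker`, `run` and the `STOP` sentinel (standing in for `_ProcSTOP`) are hypothetical names; rq is a plain Queue rather than a JoinableQueue, since nothing ever calls task_done() on it; and the 'fork' start method is chosen where available only so the sketch also runs outside a script file.

```python
import multiprocessing as mp

# Prefer 'fork' where available (POSIX); otherwise use the platform default.
ctx = mp.get_context('fork' if 'fork' in mp.get_all_start_methods() else None)

STOP = None  # hypothetical sentinel standing in for the question's _ProcSTOP


def worker(pq, rq):
    # Same loop as Proc.run in the question, with rq.put() enabled.
    for dat in iter(pq.get, STOP):
        rq.put(dat)
        pq.task_done()
    pq.task_done()  # acknowledge the sentinel as well


def run(n_items=10000, n_procs=4):
    pq = ctx.JoinableQueue()
    rq = ctx.Queue()
    pool = [ctx.Process(target=worker, args=(pq, rq)) for _ in range(n_procs)]
    for p in pool:
        p.start()
    for i in range(n_items):
        pq.put(i)
    for _ in pool:
        pq.put(STOP)
    pq.join()

    # Drain every result *before* joining: a worker can only exit once its
    # feeder thread has flushed everything it put on rq into the pipe.
    results = [rq.get() for _ in range(n_items)]

    for p in pool:
        p.join()
    return results


if __name__ == '__main__':
    results = run()
    print(len(results), sorted(results) == list(range(10000)))
```

Moving the `rq.get()` loop after the `p.join()` loop should bring back the hang from the question once enough results are queued.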

So JoinableQueue() uses a pipe and will wait until it can flush all of its data before closing.
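A small experiment can make this visible. In the sketch below, the names `producer` and `demo` and the 10 MB payload are illustrative assumptions (the payload only needs to be larger than the OS pipe buffer). The child's run() returns right after put(), yet the process stays alive until the parent reads the item. Calling cancel_join_thread() in the child, as the warning mentions, lets it exit anyway, at the price that the buffered data may never arrive.

```python
import multiprocessing as mp

# Prefer 'fork' where available (POSIX); otherwise use the platform default.
ctx = mp.get_context('fork' if 'fork' in mp.get_all_start_methods() else None)


def producer(q, nbytes, cancel):
    # put() hands the payload to a background feeder thread and returns at once.
    q.put(b'x' * nbytes)
    if cancel:
        # Skip the "flush before exit" step: the process may now exit with
        # the payload still (partly) unsent, so the item can be lost.
        q.cancel_join_thread()


def demo(cancel, nbytes=10_000_000, wait=3.0):
    q = ctx.JoinableQueue()
    p = ctx.Process(target=producer, args=(q, nbytes, cancel))
    p.start()
    p.join(timeout=wait)
    alive_after_wait = p.is_alive()
    if not cancel:
        # Reading the item lets the feeder thread finish, so the child can exit.
        q.get()
        p.join()
    return alive_after_wait, p.exitcode


if __name__ == '__main__':
    print(demo(cancel=False))
    print(demo(cancel=True))
```

On a typical machine the first call should report a child that is still alive after the wait, and the second a child that has already exited.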

A Manager().Queue(), on the other hand, takes a completely different approach. A manager runs a separate server process, which receives all the data immediately (and keeps it in its own memory).

Managers provide a way to create data which can be shared between different processes. A manager object controls a server process which manages shared objects. Other processes can access the shared objects by using proxies.

...

Queue([maxsize]) Create a shared queue.Queue object and return a proxy for it.
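For comparison, here is the return queue swapped for a manager queue, as Update 2 describes. This is again a sketch with hypothetical names (`worker`, `run`, `STOP`). Each rq.put() is a call to the manager's server process, so the workers hold no unflushed data and can be joined before any result is read. The trade-off is that every put() and get() is a proxied call to another process, which is generally slower than a pipe-backed queue.

```python
import multiprocessing as mp

# Prefer 'fork' where available (POSIX); otherwise use the platform default.
ctx = mp.get_context('fork' if 'fork' in mp.get_all_start_methods() else None)

STOP = None  # hypothetical sentinel standing in for the question's _ProcSTOP


def worker(pq, rq):
    for dat in iter(pq.get, STOP):
        rq.put(dat)  # proxy call: the item is stored in the manager process
        pq.task_done()
    pq.task_done()


def run(n_items=10000, n_procs=4):
    with ctx.Manager() as manager:
        pq = ctx.JoinableQueue()  # work queue stays pipe-based
        rq = manager.Queue()      # proxy to a queue.Queue in the manager process
        pool = [ctx.Process(target=worker, args=(pq, rq)) for _ in range(n_procs)]
        for p in pool:
            p.start()
        for i in range(n_items):
            pq.put(i)
        for _ in pool:
            pq.put(STOP)
        pq.join()
        for p in pool:
            p.join()  # no hang: workers have nothing left to flush for rq
        # Read the results while the manager is still running.
        return [rq.get() for _ in range(n_items)]


if __name__ == '__main__':
    print(len(run()))
```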


Source: https://habr.com/ru/post/900876/

