Multiprocessing. Sequential deadlocks after the death of the "reader"

I played with a multiprocessor package and noticed that the queue can be locked for reading when:

  • The reader process uses get with a timeout> 0:

    self.queue.get(timeout=3)
    
  • the "reader" dies, and get is blocked due to a timeout.

After this, the queue is blocked forever.

An application showing the problem

I create two child processes: Worker (puts in the queue) and Recipient (receive from the queue). Also, the parent process periodically checks whether his children live and, if necessary, launches a new child.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
import procname
import time

class Receiver(multiprocessing.Process):
    ''' Reads from queue with 3 secs timeout '''

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Receiver')
        while True:
            try:
                msg = self.queue.get(timeout=3)
                print '<<< `{}`, queue rlock: {}'.format(
                    msg, self.queue._rlock)
            except multiprocessing.queues.Empty:
                print '<<< EMPTY, Queue rlock: {}'.format(
                    self.queue._rlock)
                pass


class Worker(multiprocessing.Process):
    ''' Puts into queue with 1 sec sleep '''

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Worker')
        while True:
            time.sleep(1)
            print 'Worker: putting msg, Queue size: ~{}'.format(
                self.queue.qsize())
            self.queue.put('msg from Worker')


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    worker = Worker(queue)
    worker.start()

    receiver = Receiver(queue)
    receiver.start()

    while True:
        time.sleep(1)
        if not worker.is_alive():
            print 'Restarting worker'
            worker = Worker(queue)
            worker.start()
        if not receiver.is_alive():
            print 'Restarting receiver'
            receiver = Receiver(queue)
            receiver.start()

What does the process tree look like in ps

bash
 \_ python queuetest.py
     \_ Worker
     \_ Receiver

Console exit

$ python queuetest.py
Worker: putting msg, Queue size: ~0
<<< `msg from Worker`, queue rlock: <Lock(owner=None)>
Worker: putting msg, Queue size: ~0
<<< `msg from Worker`, queue rlock: <Lock(owner=None)>
Restarting receiver                        <-- killed Receiver with SIGTERM
Worker: putting msg, Queue size: ~0
Worker: putting msg, Queue size: ~1
Worker: putting msg, Queue size: ~2
<<< EMPTY, Queue rlock: <Lock(owner=SomeOtherProcess)>
Worker: putting msg, Queue size: ~3
Worker: putting msg, Queue size: ~4
Worker: putting msg, Queue size: ~5
<<< EMPTY, Queue rlock: <Lock(owner=SomeOtherProcess)>
Worker: putting msg, Queue size: ~6
Worker: putting msg, Queue size: ~7

? get_nowait , " ".

$ uname -sr
Linux 3.11.8-200.fc19.x86_64

$ python -V
Python 2.7.5

In [3]: multiprocessing.__version__
Out[3]: '0.70a1'

" "

- Receiver:

class Receiver(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        procname.setprocname('Receiver')
        while True:
            time.sleep(1)
            while True:
                try:
                    msg = self.queue.get_nowait()
                    print '<<< `{}`, queue rlock: {}'.format(
                        msg, self.queue._rlock)
                except multiprocessing.queues.Empty:
                    print '<<< EMPTY, Queue rlock: {}'.format(
                        self.queue._rlock)
                    break

.

+4
1

, , * not_empty.release() * Queue.get() ( ). TERM ?

+2

Source: https://habr.com/ru/post/1523683/


All Articles