Python dynamic multiprocessing and signaling system

I have a Python multiprocessing setup (i.e. worker processes) with custom signal handling, which prevents the workers from cleanly using multiprocessing themselves. (See details of the problem below.)

Customization

The master class, which spawns all worker processes, looks as follows (reduced to the important parts).

Here the master binds its own signal handler, which only performs a print-and-join master teardown; signals actually received propagate through the process tree and must be handled by the workers themselves. This is achieved by re-binding the signals only after the workers have been spawned.

class Midlayer(object):
    """Master process: spawns the workers, then installs its own signal
    handlers.

    The handlers are bound only AFTER the workers have started, so the
    workers do not inherit the master's handlers at fork time.
    """

    def __init__(self, nprocs=2):
        self.nprocs = nprocs  # number of worker processes to spawn
        self.procs = []       # live Worker instances

    def handle_signal(self, signum, frame):
        """Master teardown: wait for every worker to finish, then exit."""
        log.info('Master teardown')
        for p in self.procs:
            p.join()
        sys.exit()

    def start(self):
        # Start desired number of workers.
        # BUG FIX: the original read the bare name `nprocs`, which raises
        # NameError at runtime -- it must be the instance attribute.
        for _ in range(self.nprocs):
            p = Worker()
            self.procs.append(p)
            p.start()

        # Bind signals for master AFTER workers have been spawned and started
        signal.signal(signal.SIGINT, self.handle_signal)
        signal.signal(signal.SIGTERM, self.handle_signal)

        # Serve forever, only exit on signals
        for p in self.procs:
            p.join()

The worker is a subclass of multiprocessing.Process with a custom run() method.

In run() it binds its own handlers for SIGINT and SIGTERM; instead of exiting immediately, the handler only sets a flag (quit_req = True) so the consume loop can finish its current iteration and shut down cleanly.

class Worker(Process):
    """Worker process: consumes queue messages until asked to quit.

    Shutdown is cooperative -- the signal handler only flips quit_req,
    and the run() loop checks it between poll iterations.
    """

    def __init__(self):
        # Flag checked by the run() loop; flipped by handle_signal().
        self.quit_req = False
        Process.__init__(self)

    def handle_signal(self, signum, frame):
        # Graceful-shutdown request: let the current iteration finish.
        print('Stopping worker (pid: {})'.format(self.pid))
        self.quit_req = True

    def run(self):
        # Set signals for worker process
        signal.signal(signal.SIGINT, self.handle_signal)
        signal.signal(signal.SIGTERM, self.handle_signal)

        q = connect_to_some_distributed_message_queue()

        # Start consuming
        print('Starting worker (pid: {})'.format(self.pid))
        while not self.quit_req:
            message = q.poll()
            if len(message):
                try:
                    print('{} handling message "{}"'.format(
                        self.pid, message)
                    )
                    # Facade pattern: Pick the correct target function for the
                    # requested message and execute it.
                    MessageRouter.route(message)
                except Exception as e:
                    # BUG FIX: Exception.message does not exist in Python 3
                    # (deprecated since 2.6) -- format the exception itself.
                    print('{} failed handling "{}": {}'.format(
                        self.pid, message, e)
                    )

Depending on its type, a message may trigger:

  • a cheap bookkeeping operation
  • a moderately expensive calculation
  • a long, parallelizable computation (dispatched by the MessageRouter)

Problem: some target functions (the ones the MessageRouter picks for a message) want to parallelize the expensive part themselves.

, , - :

# Fan an expensive calculation out over a pool of worker processes.
nproc = 4
# Spawn a pool, because we have expensive calculation here
p = Pool(processes=nproc)
# Collect result proxy objects for async apply calls to 'some_expensive_calculation'
rpx = [p.apply_async(some_expensive_calculation, ()) for _ in range(nproc)]
# Collect results from all processes.
# BUG FIX: the original called rpx.get(...) -- a list has no .get();
# each AsyncResult proxy `r` must be queried individually.
res = [r.get(timeout=.5) for r in rpx]
# Print all results
print(res)

However, as soon as the Pool is created, SIGINT and SIGTERM are delivered to the pool's child processes as well; they inherited handle_signal (bound inside run()), so each pool child prints Stopping worker (pid: ...) and sets its own quit_req. As a result the pool never terminates cleanly, and the get() calls time out.

Question: is there a way to create a Pool inside a process that has custom signal handlers, so that the pool children either ignore the signals or reset them to the defaults? My current understanding is that the pool children inherit the custom handlers at fork time, and that is what breaks multiprocessing's own teardown.

Temporarily resetting the handlers to the defaults just before creating the Pool (and restoring them afterwards) is racy and feels wrong.

Is there a cleaner pattern for this? Any hint or pointer is appreciated — thanks in advance!

+4
3

Your approach mixes concerns that are better kept separate: a process should either manage children or do work. (It can be made to work with very careful Python- and C-level signal handling, but it is fragile.)

A simpler design:

Let the workers receive their work over a channel (HTTP, a Pipe, a job queue...) instead of via signals, and rely on KeyboardInterrupt for shutdown.

# Main service loop: pull tasks forever; Ctrl-C (KeyboardInterrupt)
# triggers a graceful drain of pending work before shutdown.
try:
    while True:
        service.process(get_next_task())
except KeyboardInterrupt:
    service.wait_for_pending_tasks()
    logging.info("Sayonara!")

Don't fight multiprocessing.Pool or concurrent.futures.ProcessPoolExecutor. If you need finer control over the pool, look at third-party libraries such as billiard or pebble.

Note that SIGINT is delivered to the whole foreground process group, so every child sees it, while SIGTERM is delivered only to the process it is sent to.

Run the master under a supervisor such as systemd or supervisord. In development, SIGINT (Ctrl+C) is usually all you need.

For shutdown, send SIGTERM first and SIGKILL only as a last resort after a timeout. SIGKILL cannot be caught, so the workers must be prepared to die at any point anyway.

"Make each program do one thing well."

If your jobs grow into real pipelines, consider frameworks such as Luigi or Celery.

In general, resist the urge to customize signal handling deep inside a process tree; keep the signal story at the edges.

That keeps each process simple, and the overall system stays debuggable.

+2

My suggestion: use Python 3 and set_start_method(method) with 'forkserver'. Python 3 > Python 2!

What "this" setup currently does:

  • the master installs custom handlers, but only after the workers are spawned.
  • the workers install their own handlers inside run()...
  • pool children are then forked from a worker and inherit those handlers.

On Ctrl-C:

  • the signal is delivered to the entire foreground process group.
  • processes that installed a cooperative handler (setting a stop flag and finishing the current task) shut down cleanly.
  • everything else dies wherever it happens to be.

The trouble is that any process forked from a worker after run() has re-bound the signals inherits the worker's handlers.

The fix:

With forkserver, a dedicated server process is forked very early, before any custom handlers are installed. Every new process is then forked from that clean state instead of via a plain os.fork() of the current process, so pool children start with default signal handling.

This is Unix-only, but so is most Unix signal handling.

Note that the forkserver children do not inherit anything you set up after start; the "clean state" is whatever is importable at module level.

Example:

:

from multiprocessing import Process, set_start_method
import sys
from signal import signal, SIGINT
from time import sleep


class NormalWorker(Process):
    """Worker with no custom signal handling: reports that it is working,
    once a second, forever."""

    def run(self):
        # The class name never changes, so compute it once.
        label = type(self).__name__
        while True:
            print('%d %s work' % (self.pid, label))
            sleep(1)


class SpawningWorker(Process):
    """Worker that installs a SIGINT handler and then spawns and joins a
    NormalWorker child. The handler only logs the signal; since it does not
    exit, join() simply resumes after the handler returns."""

    def handle_signal(self, signum, frame):
        # Report the signal but otherwise ignore it.
        msg = '%d %s handling signal %r' % (
            self.pid, type(self).__name__, signum)
        print(msg)

    def run(self):
        signal(SIGINT, self.handle_signal)

        child = NormalWorker()
        child.start()
        print('%d joining %d' % (self.pid, child.pid))
        child.join()
        print('%d %s joined sub worker' % (self.pid, type(self).__name__))


def main():
    """Demo driver: forkserver start method, five SpawningWorkers, and a
    SIGINT handler in the parent that joins everything before exiting."""
    set_start_method('forkserver')

    workers = [SpawningWorker() for _ in range(5)]
    for worker in workers:
        worker.start()

    def sig_handler(signum, frame):
        print('main handling signal %d' % signum)
        for worker in workers:
            worker.join()
        print('main out')
        sys.exit()

    signal(SIGINT, sig_handler)

    # Idle forever; only the signal handler ever exits the process.
    while True:
        sleep(1.0)

if __name__ == '__main__':
    main()
+2

If you are stuck on Python 2 (please consider moving to Python 3!), or forkserver is not an option, there is an alternative hack. Windows is a different story entirely...

Since multiprocessing uses os.fork() on Unix, you can monkey-patch it so that every forked child resets its signal handlers:

import os
from signal import SIGINT, SIG_DFL

def patch_fork():
    """Monkey-patch os.fork so that every forked child resets SIGINT to the
    default handler, undoing any handler inherited from the parent."""
    # BUG FIX: the original snippet called the bare name `signal(...)`, but
    # only SIGINT and SIG_DFL were imported from the signal module, so the
    # forked child would die with NameError instead of resetting the handler.
    from signal import signal

    print('Patching fork')
    os_fork = os.fork  # keep a reference to the real fork

    def my_fork():
        print('Fork fork fork')
        cpid = os_fork()
        if cpid == 0:
            # Child process: restore default Ctrl-C behaviour.
            signal(SIGINT, SIG_DFL)
        return cpid

    os.fork = my_fork

Call patch_fork() in the Worker (before creating the Pool), so that every process it subsequently forks starts out with default signal handling.

This is obviously fragile and interpreter-specific — it depends on multiprocessing going through os.fork() — so test it carefully before relying on it in production Python.

+2

Source: https://habr.com/ru/post/1661157/


All Articles