Sharing a complex asyncio-based object between processes with multiprocessing

I know that, as a rule, objects should not be shared between processes, and that problems can arise from doing so. But my requirement is such that it has to be done.

I have a complex object with asyncio coroutine awaits all over it, and a function that runs a long process on this object in a separate process. Now I want to start an IPython shell in the main process and operate on this complex object while the long-running process is executing in the other process.

To share this complex object across processes, I tried the multiprocessing BaseManager approach that I came across on SO:

    import multiprocessing
    import multiprocessing.managers as m

    class MyManager(m.BaseManager):
        pass

    MyManager.register('complex_asynio_based_class', complex_asynio_based_class)

    manager = MyManager()
    manager.start()
    c = manager.complex_asynio_based_class()

    process = multiprocessing.Process(
        target=long_running_process,
        args=(c,),
    )

but this gives an error:

    Unserializable message: Traceback (most recent call last):
      File "/usr/3.6/lib/python3.6/multiprocessing/managers.py", line 283, in serve_client
        send(msg)
      File "/usr/3.6/lib/python3.6/multiprocessing/connection.py", line 206, in send
        self._send_bytes(_ForkingPickler.dumps(obj))
      File "/usr/3.6/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
        cls(buf, protocol).dump(obj)
    TypeError: can't pickle coroutine objects

It doesn't work because the object contains coroutines. I can't think of a better way to make this work, and I'm stuck on it.
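Here is a minimal reproduction of the underlying limitation, independent of my class (the exact error wording varies by Python version): a coroutine object simply cannot be pickled, so no manager, queue, or pipe can move it to another process.

    import asyncio
    import pickle

    async def work():
        await asyncio.sleep(1)

    coro = work()
    try:
        pickle.dumps(coro)
    except TypeError as exc:
        print(exc)    # "can't pickle coroutine objects" (or similar, depending on version)
    finally:
        coro.close()  # avoid the "coroutine was never awaited" warning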

If this weren't Python, I would simply have spawned a thread for the long-running process and still been able to work with the object.

If I'm not mistaken, this should be a common pattern in multiprocessing applications: a background process does the long-running work while the main process just performs some read-only operations on the object, as in my case, without modifying it. I want to know how this is done in general.

How are complex objects that cannot be pickled shared between multiple processes?

+5
2 answers

Running coroutines cannot automatically be shared between processes, because a coroutine runs inside a particular event loop in the process that owns the async class. A coroutine has state that cannot be pickled, and even if it could be, it would not make sense outside the context of its event loop.

What you can do is create a callback-based adapter for your async class, with each coroutine method represented by a callback-based method with the semantics of "start doing X and call this function when it's done". If the callback is multiprocessing-friendly, these operations can be invoked from other processes. You can then start an event loop in each process and create a coroutine facade over the proxied callback-based calls.

For example, consider the trivial async class:

    class Async:
        async def repeat(self, n, s):
            for i in range(n):
                print(s, i, os.getpid())
                await asyncio.sleep(.2)
            return s

A callback-based adapter can use the public asyncio API to convert the repeat coroutine into a classic JavaScript-style "callback hell" function:

    class CallbackAdapter:
        def repeat_start(self, n, s, on_success):
            fut = asyncio.run_coroutine_threadsafe(
                self._async.repeat(n, s), self._loop)
            # Once the coroutine is done, notify the caller.
            fut.add_done_callback(lambda _f: on_success(fut.result()))

(The conversion could be automated; the hand-written code above just shows the concept.)
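A sketch of how such automation might look (a hypothetical make_callback_adapter helper, not part of the code below): iterate over the coroutine methods of the wrapped object and generate a <name>_start method for each, taking the callback as the last argument. It only shows the method generation; hooking it up to a multiprocessing manager would still require registering a concrete class.

    import asyncio
    import inspect

    def make_callback_adapter(obj, loop):
        # Hypothetical helper: build an object exposing <name>_start(..., on_success)
        # for every coroutine method found on obj, each submitting the coroutine to
        # the given event loop.
        class _CallbackAdapter:
            pass
        adapter = _CallbackAdapter()
        for name, meth in inspect.getmembers(obj, inspect.iscoroutinefunction):
            def starter(*args, _meth=meth):
                *call_args, on_success = args     # the callback is passed last
                fut = asyncio.run_coroutine_threadsafe(_meth(*call_args), loop)
                fut.add_done_callback(lambda f: on_success(f.result()))
            setattr(adapter, name + '_start', starter)
        return adapter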

CallbackAdapter can be registered with multiprocessing, so a different process can start the adapter's method (and hence the original asyncio coroutine) through the proxies provided by multiprocessing. The only requirement is that the callback passed as on_success be multiprocessing-friendly.
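To illustrate what "multiprocessing-friendly" means here: a plain local function does not survive the trip to another process, but the bound put method of a manager Queue does, because the queue proxy itself can be pickled. The full example below relies on exactly this in remote_event_future; this standalone sketch (hypothetical names) just shows the mechanism:

    import multiprocessing

    def fire_callback(cb):
        # Runs in a child process; calling cb delivers the value back to the
        # process that owns the queue referent.
        cb('done')

    if __name__ == '__main__':
        manager = multiprocessing.Manager()
        q = manager.Queue()
        p = multiprocessing.Process(target=fire_callback, args=(q.put,))
        p.start()
        print(q.get())    # prints 'done', sent from the child process
        p.join()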

As a final step, you can come full circle and create an async adapter for the callback-based API (!), start an event loop in the other process, and also use asyncio and async def there. This adapter-for-the-adapter class will have a fully functional repeat coroutine that effectively proxies the original Async.repeat coroutine without ever trying to pickle coroutine state.

The following is an example implementation of the above approach:

    import asyncio, multiprocessing.managers, threading, os

    class Async:
        # The async class we are bridging. This class is unaware of multiprocessing
        # or of any of the code that follows.
        async def repeat(self, n, s):
            for i in range(n):
                print(s, i, 'pid', os.getpid())
                await asyncio.sleep(.2)
            return s


    def start_asyncio_thread():
        # Since the manager controls the main thread, we have to spin up the event
        # loop in a dedicated thread and use asyncio.run_coroutine_threadsafe to
        # submit stuff to the loop.
        setup_done = threading.Event()
        loop = None
        def loop_thread():
            nonlocal loop
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            setup_done.set()
            loop.run_forever()
        threading.Thread(target=loop_thread).start()
        setup_done.wait()
        return loop


    class CallbackAdapter:
        # The callback adapter to the async class, also running in the
        # worker process.
        _loop = None

        def __init__(self, obj):
            self._async = obj
            if CallbackAdapter._loop is None:
                CallbackAdapter._loop = start_asyncio_thread()

        def repeat_start(self, n, s, on_success):
            # Submit a coroutine to the event loop and obtain a Task/Future. This
            # is normally done with loop.create_task, but repeat_start will be
            # called from the main thread, owned by the multiprocessing manager,
            # while the event loop will run in a separate thread.
            future = asyncio.run_coroutine_threadsafe(
                self._async.repeat(n, s), self._loop)
            # Once the coroutine is done, notify the caller.
            # We could propagate exceptions by accepting an additional on_error
            # callback, and nesting fut.result() in a try/except that decides
            # whether to call on_success or on_error.
            future.add_done_callback(lambda _f: on_success(future.result()))


    def remote_event_future(manager):
        # Return a function/future pair that can be used to locally monitor an
        # event in another process.
        #
        # The returned function and future have the following property: when the
        # function is invoked, possibly in another process, the future completes.
        # The function can be passed as a callback argument to a multiprocessing
        # proxy object and therefore invoked by a different process.
        loop = asyncio.get_event_loop()
        result_pipe = manager.Queue()
        future = loop.create_future()
        def _wait_for_remote():
            result = result_pipe.get()
            loop.call_soon_threadsafe(future.set_result, result)
        t = threading.Thread(target=_wait_for_remote)
        t.start()
        return result_pipe.put, future


    class AsyncAdapter:
        # The async adapter for a callback-based API, e.g. the CallbackAdapter.
        # Designed to run in a different process and communicate to the callback
        # adapter via a multiprocessing proxy.
        def __init__(self, cb_proxy, manager):
            self._cb = cb_proxy
            self._manager = manager

        async def repeat(self, n, s):
            set_result, future = remote_event_future(self._manager)
            self._cb.repeat_start(n, s, set_result)
            return await future


    class CommManager(multiprocessing.managers.SyncManager):
        pass

    CommManager.register('Async', Async)
    CommManager.register('CallbackAdapter', CallbackAdapter)


    def get_manager():
        manager = CommManager()
        manager.start()
        return manager

    def other_process(manager, cb_proxy):
        print('other_process (pid %d)' % os.getpid())
        aadapt = AsyncAdapter(cb_proxy, manager)
        loop = asyncio.get_event_loop()
        # Create two coroutines printing different messages, and gather their
        # results.
        results = loop.run_until_complete(asyncio.gather(
            aadapt.repeat(3, 'message A'),
            aadapt.repeat(2, 'message B')))
        print('coroutine results (pid %d): %s' % (os.getpid(), results))
        print('other_process (pid %d) done' % os.getpid())

    def start_other_process(loop, manager, async_proxy):
        cb_proxy = manager.CallbackAdapter(async_proxy)
        other = multiprocessing.Process(target=other_process,
                                        args=(manager, cb_proxy,))
        other.start()
        return other

    def main():
        loop = asyncio.get_event_loop()
        manager = get_manager()
        async_proxy = manager.Async()
        # Create two external processes that drive coroutines in our event loop.
        # Note that all messages are printed with the same PID.
        start_other_process(loop, manager, async_proxy)
        start_other_process(loop, manager, async_proxy)
        loop.run_forever()

    if __name__ == '__main__':
        main()

The code works correctly on Python 3.5, but fails on 3.6 and 3.7 due to a bug in multiprocessing.

+3

I have used the multiprocessing module and asyncio for a little while.

You don't share objects between processes. You create an object (the referent) in one process, get back a proxy object, and pass the proxy to the other process. The other process then uses the proxy object to call the referent's methods.

In your code, the referent is an instance of complex_asynio_based_class.
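For a minimal illustration of the referent/proxy relationship, separate from your code (hypothetical Counter class):

    import multiprocessing
    import multiprocessing.managers as m

    class Counter:                      # the referent; it lives in the manager's process
        def __init__(self):
            self._n = 0
        def increment(self):
            self._n += 1
            return self._n

    class CounterManager(m.BaseManager):
        pass

    CounterManager.register('Counter', Counter)

    def worker(counter_proxy):
        # The child process only holds a proxy; every call is forwarded to the
        # referent inside the manager process.
        print('worker sees', counter_proxy.increment())

    if __name__ == '__main__':
        manager = CounterManager()
        manager.start()
        proxy = manager.Counter()       # returns a proxy, not a Counter
        p = multiprocessing.Process(target=worker, args=(proxy,))
        p.start()
        p.join()
        print('main sees', proxy.increment())   # same referent, so this prints 2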

Here is some dummy code you can refer to. The main process runs a single asyncio loop with a UDP sink server and some other asynchronous operations. The long-running process simply checks the status of the loop.

    import multiprocessing
    import multiprocessing.managers as m
    import asyncio
    import logging
    import time

    logging.basicConfig(filename="main.log", level=logging.DEBUG)

    class MyManager(m.BaseManager):
        pass

    class sinkServer(asyncio.Protocol):
        def connection_made(self, transport):
            self.transport = transport

        def datagram_received(self, data, addr):
            message = data.decode()
            logging.info('Data received: {!r}'.format(message))


    class complex_asynio_based_class:
        def __init__(self, addr=('127.0.0.1', '8080')):
            self.loop = asyncio.new_event_loop()
            listen = self.loop.create_datagram_endpoint(sinkServer, local_addr=addr,
                                                        reuse_address=True, reuse_port=True)
            self.loop.run_until_complete(listen)
            for name, delay in zip("abcdef", (1, 2, 3, 4, 5, 6)):
                self.loop.run_until_complete(self.slow_op(name, delay))

        def run(self):
            self.loop.run_forever()

        def stop(self):
            self.loop.stop()

        def is_running(self):
            return self.loop.is_running()

        async def slow_op(self, name, delay):
            logging.info("my name: {}".format(name))
            await asyncio.sleep(delay)  # the await is needed, otherwise the sleep never runs


    def long_running_process(co):
        logging.debug('address: {!r}'.format(co))
        logging.debug("status: {}".format(co.is_running()))
        time.sleep(6)
        logging.debug("status: {}".format(co.is_running()))


    MyManager.register('complex_asynio_based_class', complex_asynio_based_class)
    manager = MyManager()
    manager.start()

    c = manager.complex_asynio_based_class()

    process = multiprocessing.Process(
        target=long_running_process,
        args=(c,),
    )
    process.start()

    c.run()  # run the loop
0
