Can't pickle coroutine objects when ProcessPoolExecutor is used in a class

I am trying to get asyncio working with subprocesses and a concurrency limit. I have this working in a functional style, but when I tried to implement the same logic in an OOP style, several problems appeared, mostly "can't pickle coroutine / generator" errors. I tracked down some of them, but not all:

import asyncio
from concurrent.futures import ProcessPoolExecutor
from itertools import islice
from random import randint


class async_runner(object):

    def __init__(self):
        self.futures = []  # container to store current futures
        self.futures_total = []
        self.loop = asyncio.get_event_loop()  # main event_loop
        self.executor = ProcessPoolExecutor()
        self.limit = 1

    def run(self, func, *args):
        temp_loop = asyncio.new_event_loop()
        try:
            coro = func(*args)
            asyncio.set_event_loop(temp_loop)
            ret = temp_loop.run_until_complete(coro)
            return ret
        finally:
            temp_loop.close()

    def limit_futures(self, futures, limit):
        self.futures_total = iter(futures)
        self.futures = [future for future in islice(self.futures_total, 0, limit)]

        async def first_to_finish():
            while True:
                await asyncio.sleep(0)
                for f in self.futures:
                    if f.done():  # here raised TypeError: can't pickle coroutine objects
                        print(f.done())
                        self.futures.remove(f)
                        try:
                            #newf = next(self.futures_total)
                            #self.futures.append(newf)
                            print(f.done())
                        except StopIteration as e:
                            pass
                        return f.result()

        while len(self.futures) > 0:
            yield first_to_finish()

    async def run_limited(self, func, args, limit):
        self.limit = int(limit)
        self.futures_total = (self.loop.run_in_executor(self.executor, self.run, func, x)
                              for x in range(110000, 119990))
        for ret in self.limit_futures(self.futures_total, 4):  # limitation - 4 per all processes
            await ret

    def set_execution(self, func, args, limit):
        ret = self.loop.run_until_complete(self.run_limited(func, args, limit))
        return ret


async def asy(x):
    print('enter: ', x)
    await asyncio.sleep(randint(1, 3))
    print('finishing ', x)
    return x


runner = async_runner()
ret = runner.set_execution(asy, urls, 2)
print(ret)
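My guess is that the difference is in what gets pickled when the work item is handed to the executor: self.run is a bound method, so pickling it means pickling the whole async_runner instance, including the running event loop and its coroutine objects. Here is a minimal illustration of that idea, not my original code (the names Holder, coro_fn and plain are made up just for the demo):

import asyncio
import pickle


async def coro_fn():
    return 1


class Holder:
    def __init__(self):
        self.coro = coro_fn()  # a coroutine object stored on the instance

    def work(self, x):
        return x * 2


def plain(x):
    return x * 2


h = Holder()
pickle.dumps(plain)        # OK: module-level function, pickled by reference (just a name)
try:
    pickle.dumps(h.work)   # bound method -> must pickle `h` -> hits the coroutine attribute
except TypeError as e:
    print(e)               # "can't pickle coroutine objects" (exact wording varies by Python version)
h.coro.close()             # avoid the "coroutine was never awaited" warning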

But this works great:

import asyncio
from concurrent.futures import ProcessPoolExecutor
from itertools import islice
import time


async def asy(x):
    print('enter: ', x)
    await asyncio.sleep(1)
    print('finishing ', x)
    return x


def run(corofn, *args):
    loop = asyncio.new_event_loop()
    try:
        coro = corofn(*args)
        asyncio.set_event_loop(loop)
        ret = loop.run_until_complete(coro)
        #print(ret)
        return ret
    finally:
        loop.close()


def limit_futures(futures, limit):
    futures_sl = [c for c in islice(futures, 0, limit)]
    print(len(futures_sl))

    async def first_to_finish(futures):
        while True:
            await asyncio.sleep(0)
            for f in futures_sl:
                if f.done():
                    futures_sl.remove(f)
                    try:
                        newf = next(futures)
                        futures_sl.append(newf)
                    except StopIteration as e:
                        pass
                    return f.result()

    while len(futures_sl) > 0:
        yield first_to_finish(futures)


async def main():
    loop = asyncio.get_event_loop()
    executor = ProcessPoolExecutor()
    futures = (loop.run_in_executor(executor, run, asy, x) for x in range(110000, 119990))
    '''
    CASE balls to the wall!
    await asyncio.gather(*futures)
    '''
    for ret in limit_futures(futures, 4):  # limitation - 4 per all processes
        await ret


if __name__ == '__main__':
    start = time.time()
    '''
    # CASE single
    ret = [asy(x) for x in range(510000, 510040)]
    exit()
    '''
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("Elapsed time: {:.3f} sec".format(time.time() - start))
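As an aside, the "at most 4 in flight" limit itself can presumably also be expressed with asyncio.Semaphore instead of the hand-rolled limit_futures() generator. This is only a self-contained sketch of that alternative, not my original code, and not what the question is about:

import asyncio
from concurrent.futures import ProcessPoolExecutor


async def asy(x):
    await asyncio.sleep(1)
    return x


def run(corofn, *args):
    # same idea as above: each worker process runs the coroutine on its own loop
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(corofn(*args))
    finally:
        loop.close()


async def limited(sem, loop, executor, x):
    async with sem:  # at most 4 submissions in flight across all processes
        return await loop.run_in_executor(executor, run, asy, x)


async def main():
    loop = asyncio.get_event_loop()
    executor = ProcessPoolExecutor()
    sem = asyncio.Semaphore(4)
    results = await asyncio.gather(*(limited(sem, loop, executor, x)
                                     for x in range(110000, 110020)))
    print(results)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())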

I can't understand why the multiprocessing module tries to pickle anything only when objects are involved, but not in every scenario.
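If the difference really is just how the callable gets pickled (a module-level function is pickled by name, while a bound method has to pickle the instance it is attached to), then presumably keeping run() at module level, outside the class, would avoid the error even with the rest of the class kept as is. A rough, untested sketch of that workaround:

import asyncio


def run(func, *args):
    # module-level worker: the executor only pickles a function name plus plain
    # arguments, not the async_runner instance (which holds the loop and coroutines)
    temp_loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(temp_loop)
        return temp_loop.run_until_complete(func(*args))
    finally:
        temp_loop.close()

# and inside async_runner.run_limited:
#     self.futures_total = (self.loop.run_in_executor(self.executor, run, func, x)
#                           for x in range(110000, 119990))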


Source: https://habr.com/ru/post/1273870/

