aiohttp ClientSession outside a coroutine

I have a REST API wrapper that is meant to run in an interactive Python session. HTTP requests are made both by an automated background thread (which uses the API wrapper) and manually by the end user through the interactive session. I'm trying to move all HTTP request handling from the old "new thread per request" approach to asyncio, but since I cannot run the asyncio loop in the main thread (it has to stay free for ad hoc Python commands and requests), I wrote the following to run it in a background thread:

    import aiohttp
    import asyncio
    from concurrent.futures import ThreadPoolExecutor


    def start_thread_loop(pool=None):
        """Starts thread with running loop, bounding the loop to the thread"""
        def init_loop(loop):
            asyncio.set_event_loop(loop)  # bound loop to thread
            loop.run_forever()
        _pool = ThreadPoolExecutor() if pool is None else pool
        loop = asyncio.new_event_loop()
        future = _pool.submit(init_loop, loop)
        return future, loop


    def send_to_loop(coro, loop):
        """Wraps coroutine in Task object and sends it to given loop"""
        return asyncio.run_coroutine_threadsafe(coro, loop=loop)

The actual API wrapper looks something like this:

    class Foo:
        def __init__(self):
            _, self.loop = start_thread_loop()
            self.session = aiohttp.ClientSession(loop=self.loop)
            self.loop.set_debug(True)

        def send_url(self, url):
            async def _request(url):
                print('sending request')
                async with self.session.get(url) as resp:
                    print(resp.status)
            return send_to_loop(_request(url), self.loop)

However, aiohttp strongly recommends against creating a ClientSession outside of a coroutine, and enabling asyncio debug mode before initializing the ClientSession raises a RuntimeError. So I tried a slightly different version that uses asyncio.Queue, so that the ClientSession is created inside a coroutine:

    class Bar:
        def __init__(self):
            _, self.loop = start_thread_loop()
            self.q = asyncio.Queue(loop=self.loop)
            self.status = send_to_loop(self.main(), loop=self.loop)

        async def main(self):
            async with aiohttp.ClientSession(loop=self.loop) as session:
                while True:
                    url = await self.q.get()
                    print('sending request')
                    asyncio.ensure_future(self._process_url(url, session), loop=self.loop)

        def send_url(self, url):
            send_to_loop(self.q.put(url), loop=self.loop)

        @staticmethod
        async def _process_url(url, session):
            async with session.get(url) as resp:
                print(resp.status)

However, this approach is more complicated and verbose, and I don't understand whether it is actually necessary.

Questions:

  • Why is there a problem with starting a ClientSession outside a coroutine?
  • Is the queue approach better / safer? If so, why?
  • Is there a problem in my approach for starting a loop inside a background thread?
1 answer

Why is there a problem with starting ClientSession outside the coroutine?

That is simply how aiohttp is built. In theory it should be possible to initialize some kind of client session outside a loop, i.e. outside a coroutine, but that is not how aiohttp is designed. As far as I understand from the issue that introduced this warning, the reasons are that a) it is difficult to test and b) it is error prone.

Is the queue approach better / safer? If so, why?

I don't understand what you are trying to achieve, so I'm not sure how to answer. Perhaps the problem is that you are trying to initialize the ClientSession inside the constructor, i.e. the __init__ of another class. In that case, you can work around the problem by adding a helper method, itself a coroutine, that finishes initializing the class. This is a well-known pattern when working with asynchronous code.
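
A minimal sketch of that helper-coroutine pattern, assuming nothing about the real wrapper: `ApiClient`, `create`, `_async_init` and the `session_factory` parameter are hypothetical names made up for illustration. The only aiohttp call it relies on is `aiohttp.ClientSession()`, which is now invoked inside a coroutine instead of inside `__init__`.

```python
import asyncio


class ApiClient:
    """Hypothetical wrapper; names here are illustrative, not aiohttp API."""

    def __init__(self, session_factory=None):
        # The plain constructor does no async work and creates no session.
        self._session_factory = session_factory
        self.session = None

    @classmethod
    async def create(cls, session_factory=None):
        """Build the object, then finish its initialization in a coroutine."""
        self = cls(session_factory)
        await self._async_init()
        return self

    async def _async_init(self):
        factory = self._session_factory
        if factory is None:
            # Assumption: aiohttp is installed; imported lazily so the
            # pattern itself can be exercised without it.
            import aiohttp
            factory = aiohttp.ClientSession
        # The session is created while the event loop is running.
        self.session = factory()

    async def close(self):
        await self.session.close()
```

With the helpers from the question, something like `send_to_loop(ApiClient.create(), loop).result()` called from the main thread should run the initializer on the background loop and hand back a ready object. The injectable factory is only there so the pattern can be tried with a stand-in session.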

Is there a problem in my approach for starting a loop inside a background thread?

That is perfectly fine.
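
For reference, a stripped-down sketch of the same idea using only the standard library. `start_background_loop`, `stop_background_loop` and `double` are hypothetical helpers, and a plain `threading.Thread` is swapped in for the `ThreadPoolExecutor` from the question. It also shows one way to shut the loop down cleanly from the main thread.

```python
import asyncio
import threading


def start_background_loop():
    """Start an event loop in a daemon thread and return (thread, loop)."""
    loop = asyncio.new_event_loop()

    def run():
        asyncio.set_event_loop(loop)  # bind the loop to this thread
        loop.run_forever()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread, loop


def stop_background_loop(thread, loop):
    """Ask the loop to stop from another thread, then close it."""
    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    loop.close()


async def double(x):
    # Stand-in for a real request coroutine.
    await asyncio.sleep(0)
    return 2 * x


thread, loop = start_background_loop()
# Submit a coroutine from the main thread and wait for its result.
future = asyncio.run_coroutine_threadsafe(double(21), loop)
result = future.result(timeout=5)
stop_background_loop(thread, loop)
```

The key point is that anything touching the loop from the main thread goes through the thread-safe entry points (`run_coroutine_threadsafe`, `call_soon_threadsafe`), which is what the question's `send_to_loop` already does.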


Source: https://habr.com/ru/post/1266102/

