RuntimeError: there is no current event loop in the stream in async + apscheduler

I have an async function and you need to run apscheduller every N minutes. Below is python code

URL_LIST = ['<url1>',
            '<url2>',
            '<url2>',
            ]

def demo_async(urls):
    """Fetch list of web pages asynchronously."""
    loop = asyncio.get_event_loop() # event loop
    future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
    loop.run_until_complete(future) # loop until done

async def fetch_all(urls):
    tasks = [] # dictionary of start times for each url
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(url, session))
            tasks.append(task) # create list of tasks
        _ = await asyncio.gather(*tasks) # gather task responses

async def fetch(url, session):
    """Fetch a url, using specified ClientSession."""
    async with session.get(url) as response:
        resp = await response.read()
        print(resp)

if __name__ == '__main__':
    scheduler = AsyncIOScheduler()
    scheduler.add_job(demo_async, args=[URL_LIST], trigger='interval', seconds=15)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass

But when I tried to start it, I got the following error information

Job "demo_async (trigger: interval[0:00:15], next run at: 2017-10-12 18:21:12 +04)" raised an exception.....
..........\lib\asyncio\events.py", line 584, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread '<concurrent.futures.thread.ThreadPoolExecutor object at 0x0356B150>_0'.

Could you help me with this? Python 3.6, APScheduler 3.3.1,

+20
source share
5 answers

Just pass fetch_allin scheduler.add_job()directly. The asyncio scheduler supports coroutine functions as targets.

If the target call is not an accompanying function, it will be launched in the workflow (due to historical reasons), therefore, an exception.

+5
source

def demo_async(urls) :

loop = asyncio.get_event_loop()

:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
+47

, , - , . , , , .

get_event_loop BaseDefaultEventLoopPolicy:

class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    ...

    def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

, self.set_event_loop(self.new_event_loop()) :

  • self._local._loop is None - _local._loop
  • not self._local._set_called - set_event_loop
  • isinstance(threading.current_thread(), threading._MainThread) - ( )

, :

if self._local._loop is None:
    raise RuntimeError('There is no current event loop in thread %r.'
                       % threading.current_thread().name)
+3

asyncio.run() . .

"":

if events._get_running_loop() is not None:
    raise RuntimeError(
        "asyncio.run() cannot be called from a running event loop")

if not coroutines.iscoroutine(main):
    raise ValueError("a coroutine was expected, got {!r}".format(main))

loop = events.new_event_loop()
try:
    events.set_event_loop(loop)
    loop.set_debug(debug)
    return loop.run_until_complete(main)
finally:
    try:
        _cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        events.set_event_loop(None)
        loop.close()
0

, .

RuntimeError: There is no current event loop in thread 'Thread-X'. - Bleak.


: , .

, :

class BLE:
    def __init__(self):
        self.loop = asyncio.get_event_loop()

    # function example, improvement of
    # https://github.com/hbldh/bleak/blob/master/examples/discover.py :
    def list_bluetooth_low_energy(self) -> list:
        async def run() -> list:
            BLElist = []
            devices = await bleak.discover()
            for d in devices:
                BLElist.append(d.name)
            return 'success', BLElist
        return self.loop.run_until_complete(run())

:

ble = path.to.lib.BLE()
list = ble.list_bluetooth_low_energy()

:

. , , import , :

import asyncio, platform
from bleak import discover

def listBLE() -> dict:
    async def run() -> dict:
        # my code that keep throwing exceptions.

    loop = asyncio.get_event_loop()
    ble_list = loop.run_until_complete(run())
    return ble_list

, - , , get_event_loop():

loop = asyncio.new_event_loop()
loop = asyncio.set_event_loop()

, .

But does not answer. And my code relied on a timeout to return some values, so that was pretty bad for my application.

It took me almost two hours to figure out what the problem was import, and here is my (working) code:

def list() -> dict:
    import asyncio, platform
    from bleak import discover

    async def run() -> dict:
        # my code running perfectly

    loop = asyncio.get_event_loop()
    ble_list  = loop.run_until_complete(run())
    return ble_list
0
source

Source: https://habr.com/ru/post/1687427/


All Articles