I am using an asynchronous iterator that will be used with async for
, which should return a new value in the (mostly) regular interval.
We can illustrate such an iterator with a simple clock that will increment the counter every ~ n seconds:
import asyncio
class Clock(object):
def __init__(self, interval=1):
self.counter = 0
self.interval = interval
self.tick = asyncio.Event()
asyncio.ensure_future(self.tick_tock())
async def tick_tock(self):
while True:
self.tick.clear()
await asyncio.sleep(self.interval)
self.counter = self.__next__()
self.tick.set()
def __next__(self):
self.counter += 1
return self.counter
def __aiter__(self):
return self
async def __anext__(self):
await self.tick.wait()
return self.counter
Is there a better or cleaner approach than using asyncio.Event
? There will be async for
more than one coroutine on this iterator .
source
share