How to create a global connection with asyncio and redis

I am new to Python 3 and asyncio, coming from gevent on Python 2.7.

How do I create a global connection for Redis that every task will use? For instance, I will have one process with 10 asynchronous threads, but I don't want a separate connection per thread. Why? I will have, for example, 100 cores with 10 threads per core, and I don't want that many connections to Redis.

import asyncio
import asyncio_redis

async def worker():
    while True:
        data = await connection.brpop(['queue'], timeout=0)
        print(data)
        res = blocking_code(data)
        await connection.set('test',res)

# Process raw data here; all of this code is blocking
def blocking_code(data):
    results = {}
    return results

if __name__ == '__main__':
    connection = asyncio_redis.Connection.create(host='127.0.0.1', port=6379, poolsize=2)
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(worker()), asyncio.ensure_future(worker())]
    loop.run_until_complete(asyncio.gather(*tasks))
    connection.close()


    Traceback (most recent call last):
      File "/Users//worker.py", line 14, in <module>
        loop.run_until_complete(example())
      File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 466, in run_until_complete
        return future.result()
      File "/Users//worker.py", line 7, in example
        data = yield from connection.brpop(['queue'], timeout=0)
    AttributeError: 'generator' object has no attribute 'brpop'

So, in the above example, I have two tasks, but I want only one Redis connection.

1 answer

    10 asynchronous threads

There are no threads in asyncio: concurrency is achieved in a single thread by the event loop.

Why doesn't your code work?

asyncio_redis.Connection.create is a coroutine. You should await it with yield from to get the actual connection object:

connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)

You don't need many connections to get concurrency: asyncio runs many tasks in a single thread, and those tasks can share connections. If you do want several blocking commands (like brpop) in flight at once, asyncio_redis provides a connection pool, for example:

import asyncio
import asyncio_redis


@asyncio.coroutine
def main():
    connection = yield from asyncio_redis.Pool.create(host='127.0.0.1', port=6379, poolsize=10)
    try:
        # 3 requests running concurrently in single thread using connections from pool: 
        yield from asyncio.gather(
            connection.brpop(['queue:pixel'], timeout=0),
            connection.brpop(['queue:pixel'], timeout=0),
            connection.brpop(['queue:pixel'], timeout=0),
        )
    finally:
        connection.close()



if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close() 

Python 3.5+

On Python 3.5+, you can replace @asyncio.coroutine and yield from with the native async/await syntax.
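As a minimal, server-free sketch of that syntax: the example below shares one object among several async/await workers, with an asyncio.Queue standing in for the Redis connection (since asyncio_redis and a running Redis server may not be available here); the worker/main names are illustrative, not part of any library.

```python
import asyncio


async def worker(name, queue, results):
    # Each worker awaits items from the same shared object, just as
    # several tasks would share one connection (pool) in asyncio_redis.
    while True:
        item = await queue.get()
        if item is None:  # sentinel: shut this worker down
            break
        results.append((name, item))


async def main():
    queue = asyncio.Queue()  # stand-in for the single shared connection
    results = []
    workers = [asyncio.ensure_future(worker(n, queue, results)) for n in range(3)]
    for item in range(5):
        await queue.put(item)
    for _ in workers:
        await queue.put(None)  # one sentinel per worker
    await asyncio.gather(*workers)
    return results


loop = asyncio.new_event_loop()
results = loop.run_until_complete(main())
loop.close()
print(len(results))  # all 5 items consumed, shared across 3 workers
```

Three tasks block on `await queue.get()` concurrently in one thread, which is exactly the shape of the pool example above.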

Upd:

The processing code (which, as your comment notes, is all blocking) must not be called directly inside a coroutine: it would freeze the event loop and, with it, all of asyncio's concurrency.

You can move the blocking code to a separate process with run_in_executor:

import asyncio
from concurrent.futures import ProcessPoolExecutor


executor = ProcessPoolExecutor(max_workers=10)  # use number of cores here


async def worker():
    while True:
        data = await connection.brpop(['queue'], timeout=0)
        print(data)

        # await blocking_code from separate process:
        loop = asyncio.get_event_loop()
        result = await loop.run_in_executor(executor, blocking_code, data)
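Pieced together, here is a self-contained sketch of that pattern. A toy sum-of-squares blocking_code stands in for the real processing, and Redis is left out entirely so the snippet runs as-is; the handle/main names are illustrative.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def blocking_code(data):
    # CPU-bound stand-in for the real (blocking) processing
    return sum(i * i for i in range(data))


async def handle(executor, data):
    loop = asyncio.get_event_loop()
    # The event loop stays free while a worker process crunches the numbers
    return await loop.run_in_executor(executor, blocking_code, data)


def main():
    executor = ProcessPoolExecutor(max_workers=2)
    loop = asyncio.new_event_loop()
    try:
        results = loop.run_until_complete(
            asyncio.gather(handle(executor, 10), handle(executor, 20))
        )
        print(results)  # [285, 2470]
    finally:
        executor.shutdown()
        loop.close()


if __name__ == '__main__':
    main()
```

Both CPU-bound calls run in separate processes while the single event-loop thread only awaits their futures, so other coroutines (such as the brpop workers above) would keep running in the meantime.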

Source: https://habr.com/ru/post/1683436/

