If you don't have an asynchronous imap library based on I / O, you can simply use concurrent.futures.ThreadPoolExecutor for I / O in threads. Python will issue the GIL during I / O, so you get true concurrency:
def init_connection(d): username = d['usern'] password = d['passw'] connection = imaplib.IMAP4_SSL('imap.bar.de') connection.login(username, password) connection.select() return connection local = threading.local() # We use this to get a different connection per thread def do_fetch(num, d, rfc): try: connection = local.connection except AttributeError: connnection = local.connection = init_connection(d) return connnection.fetch(num, rfc) @asyncio.coroutine def get_attachment(d, pool): connection = init_connection(d) # list all available mails typ, data = connection.search(None, 'ALL') # Kick off asynchronous tasks for all the fetches loop = asyncio.get_event_loop() futs = [asyncio.async(loop.run_in_executor(pool, do_fetch, num, d, '(RFC822)')) for num in data[0].split()] # Process each fetch as it completes for fut in asyncio.as_completed(futs): typ, data = yield from fut raw_string = data[0][1].decode('utf-8') msg = email.message_from_string(raw_string) for part in msg.walk(): if part.get_content_maintype() == 'multipart': continue if part.get('Content-Disposition') is None: continue if part.get_filename(): body = part.get_payload(decode=True) # do something with the body, async? connection.close() connection.logout() loop = asyncio.get_event_loop() pool = ThreadPoolExecutor(max_workers=5) # You can probably increase max_workers, because the threads are almost exclusively doing I/O. loop.run_until_complete(asyncio.wait([get_attachment(d, pool) for d in data])) loop.close()
This is not as good as a truly asynchronous I / O solution because you still have the overhead of creating threads, which limits scalability and increases the extra overhead. You also get some GIL reduction because all the code wraps the actual I / O calls. However, if you are dealing with less than a thousand letters, it should still work fine.
We use run_in_executor to use ThreadPoolExecutor as part of the asyncio, asyncio.async event loop to wrap the coroutine object returned in asyncio.Future , and as_completed to as_completed through futures in the order in which they end.
Edit
It seems imaplib not thread safe. I edited my answer to use thread-local storage via threading.local , which allows us to create one connection object in the thread that can be reused for the whole life of the thread (this means that you only create num_workers communication num_workers , not a new one connection for each fetch ).
source share