Fetching emails over IMAP with asyncio in Python 3

I am experimenting with the asyncio module and need a hint or recommendation on how to fetch large emails asynchronously.

I have a list with usernames and passwords for mail accounts.

    data = [
        {'usern': 'foo@bar.de', 'passw': 'x'},
        {'usern': 'foo2@bar.de', 'passw': 'y'},
        {'usern': 'foo3@bar.de', 'passw': 'z'},
        (...)
    ]

I'm thinking of:

    async def main():
        await asyncio.gather(*(get_attachment(d) for d in data))

    asyncio.run(main())

However, the slow part is downloading the email attachments.

Email:

    import asyncio
    import email
    import imaplib

    async def get_attachment(d):
        username = d['usern']
        password = d['passw']

        connection = imaplib.IMAP4_SSL('imap.bar.de')
        connection.login(username, password)
        connection.select()

        # list all available mails
        typ, data = connection.search(None, 'ALL')

        for num in data[0].split():
            # fetching each mail
            typ, data = connection.fetch(num, '(RFC822)')
            raw_string = data[0][1].decode('utf-8')
            msg = email.message_from_string(raw_string)
            for part in msg.walk():
                if part.get_content_maintype() == 'multipart':
                    continue

                if part.get('Content-Disposition') is None:
                    continue

                if part.get_filename():
                    body = part.get_payload(decode=True)
                    # do something with the body, async?

        connection.close()
        connection.logout()

How can I process all emails (download the attachments) asynchronously?

+6
2 answers

If you don't have an IMAP library built on asynchronous I/O, you can simply use concurrent.futures.ThreadPoolExecutor to do the I/O in threads. Python releases the GIL during I/O, so you get true concurrency:

    import asyncio
    import email
    import imaplib
    import threading
    from concurrent.futures import ThreadPoolExecutor

    def init_connection(d):
        username = d['usern']
        password = d['passw']

        connection = imaplib.IMAP4_SSL('imap.bar.de')
        connection.login(username, password)
        connection.select()
        return connection

    local = threading.local()  # We use this to get a different connection per thread

    def do_fetch(num, d, rfc):
        # Caveat: the connection is cached per thread, not per account. If
        # several accounts share one pool, key the cache by d['usern'].
        try:
            connection = local.connection
        except AttributeError:
            connection = local.connection = init_connection(d)
        return connection.fetch(num, rfc)

    async def get_attachment(d, pool):
        connection = init_connection(d)
        # list all available mails
        typ, data = connection.search(None, 'ALL')

        # Kick off asynchronous tasks for all the fetches.
        # asyncio.ensure_future is the current name of asyncio.async,
        # which is a syntax error on Python 3.7+.
        loop = asyncio.get_event_loop()
        futs = [asyncio.ensure_future(loop.run_in_executor(pool, do_fetch, num, d, '(RFC822)'))
                for num in data[0].split()]

        # Process each fetch as it completes
        for fut in asyncio.as_completed(futs):
            typ, data = await fut
            raw_string = data[0][1].decode('utf-8')
            msg = email.message_from_string(raw_string)
            for part in msg.walk():
                if part.get_content_maintype() == 'multipart':
                    continue

                if part.get('Content-Disposition') is None:
                    continue

                if part.get_filename():
                    body = part.get_payload(decode=True)
                    # do something with the body, async?

        connection.close()
        connection.logout()

    async def main():
        # You can probably increase max_workers, because the threads are
        # almost exclusively doing I/O.
        pool = ThreadPoolExecutor(max_workers=5)
        await asyncio.gather(*(get_attachment(d, pool) for d in data))

    asyncio.run(main())

This is not as good as a truly asynchronous I/O solution: you still pay the cost of creating threads, which limits scalability and adds memory overhead. You also get some GIL contention from the code that wraps the actual I/O calls. Still, if you are dealing with fewer than a thousand emails, it should work fine.

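We use run_in_executor to run work on the ThreadPoolExecutor from within the asyncio event loop, asyncio.async (named asyncio.ensure_future since Python 3.4.4, and the only valid spelling from Python 3.7 on, where async is a reserved word) to wrap the returned object in an asyncio.Future, and as_completed to iterate through the futures in the order in which they complete.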
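The interplay of run_in_executor and as_completed can be tried without a mail server. The following is a minimal sketch, not the answer's code: slow_fetch is a hypothetical stand-in that only sleeps, the delays are made up, and the async/await spelling assumes Python 3.7 or later.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_fetch(num, delay):
    """Hypothetical stand-in for a blocking IMAP fetch: sleep, then return."""
    time.sleep(delay)  # blocking call; the GIL is released while sleeping
    return num


async def fetch_all(jobs, pool):
    loop = asyncio.get_running_loop()
    # Hand every blocking call to the thread pool; each call returns a future.
    futs = [loop.run_in_executor(pool, slow_fetch, num, delay)
            for num, delay in jobs]
    finished = []
    # as_completed yields awaitables in completion order, not submission order.
    for fut in asyncio.as_completed(futs):
        finished.append(await fut)
    return finished


def run_demo():
    jobs = [(1, 0.6), (2, 0.2), (3, 0.4)]  # (id, seconds to "download")
    with ThreadPoolExecutor(max_workers=3) as pool:
        start = time.monotonic()
        order = asyncio.run(fetch_all(jobs, pool))
        elapsed = time.monotonic() - start
    return order, elapsed


if __name__ == "__main__":
    order, elapsed = run_demo()
    print("completion order:", order)
    print("elapsed seconds:", round(elapsed, 2))
```

With three worker threads the sleeps overlap, so the whole run should take about as long as the slowest job rather than the sum of all three, and the results arrive shortest job first.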

Edit

It seems imaplib is not thread-safe. I edited my answer to use thread-local storage via threading.local, which lets us create one connection object per thread and reuse it for the whole life of the thread (this means you only create max_workers connections, not a new connection for each fetch).
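The effect of threading.local can be checked in isolation. This is a sketch under stated assumptions: FakeConnection is a hypothetical placeholder for imaplib.IMAP4_SSL, and the created list exists only to count how many connections get opened.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

local = threading.local()
created = []                      # every "connection" opened, for counting
created_lock = threading.Lock()


class FakeConnection:
    """Hypothetical stand-in for imaplib.IMAP4_SSL; not a real IMAP client."""

    def fetch(self, num):
        time.sleep(0.01)          # pretend to wait on the network
        return num


def get_connection():
    # Each thread sees its own attributes on `local`, so a connection is
    # created the first time a worker thread asks for one, then reused.
    try:
        return local.connection
    except AttributeError:
        connection = local.connection = FakeConnection()
        with created_lock:
            created.append(connection)
        return connection


def do_fetch(num):
    return get_connection().fetch(num)


def run_demo(n_fetches=20, max_workers=4):
    created.clear()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(do_fetch, range(n_fetches)))
    return results, len(created)
```

Calling run_demo() performs twenty fetches but opens at most four connections, one per worker thread that actually ran a job.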

+6

I had the same need: fetching emails with Python 3 fully asynchronously. In case others are interested, I published an asyncio IMAP library here: https://github.com/bamthomas/aioimaplib

You can use it as follows:

    import asyncio
    import email

    from aioimaplib import aioimaplib

    # The aioimaplib calls below are the ones used in the original answer;
    # newer releases of the library may name some of them differently.
    async def wait_for_new_message(host, user, password):
        imap_client = aioimaplib.IMAP4(host=host)
        await imap_client.wait_hello_from_server()

        await imap_client.login(user, password)
        await imap_client.select()

        # asyncio.ensure_future replaces asyncio.async,
        # which is a syntax error on Python 3.7+.
        asyncio.ensure_future(imap_client.idle())
        id = 0
        while True:
            msg = await imap_client.wait_server_push()
            print('--> received from server: %s' % msg)
            if 'EXISTS' in msg:
                id = msg.split()[0]
                imap_client.idle_done()
                break

        result, data = await imap_client.fetch(id, '(RFC822)')
        email_message = email.message_from_bytes(data[0])

        attachments = []
        body = ''
        for part in email_message.walk():
            if part.get_content_maintype() == 'multipart':
                continue
            if part.get_content_maintype() == 'text' and 'attachment' not in part.get('Content-Disposition', ''):
                body = part.get_payload(decode=True).decode(part.get_param('charset', 'ascii')).strip()
            else:
                attachments.append(
                    {'type': part.get_content_type(), 'filename': part.get_filename(), 'size': len(part.as_bytes())})

        print('attachments : %s' % attachments)
        print('body : %s' % body)
        await imap_client.logout()

    if __name__ == '__main__':
        asyncio.run(wait_for_new_message('my.imap.server', 'user', 'pass'))

Large emails with attachments are also downloaded using asyncio.
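Whichever way the raw message arrives, pulling the attachments out is ordinary standard-library work. The sketch below is an alternative to the walk() loops above, not a copy of them: it uses EmailMessage.iter_attachments with policy.default, and build_sample with its file name and addresses is invented purely so the example runs without a server.

```python
from email import message_from_bytes, policy
from email.message import EmailMessage


def extract_attachments(raw_bytes):
    """Return (filename, payload_bytes) for each attachment in a raw message."""
    # policy.default makes the parser return EmailMessage objects,
    # which provide iter_attachments().
    msg = message_from_bytes(raw_bytes, policy=policy.default)
    return [(part.get_filename(), part.get_payload(decode=True))
            for part in msg.iter_attachments()]


def build_sample():
    """Build a small message with one attachment, for illustration only."""
    msg = EmailMessage()
    msg["From"] = "foo@bar.de"
    msg["To"] = "foo2@bar.de"
    msg["Subject"] = "report"
    msg.set_content("See attachment.")
    msg.add_attachment(b"\x00\x01binary",
                       maintype="application",
                       subtype="octet-stream",
                       filename="data.bin")
    return msg.as_bytes()


if __name__ == "__main__":
    for filename, payload in extract_attachments(build_sample()):
        print(filename, len(payload), "bytes")
```

Because the parsing is CPU-bound rather than I/O-bound, it can stay in the coroutine for small messages; for very large ones it could be handed to an executor in the same way as the fetches.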

+1

Source: https://habr.com/ru/post/973100/

