There is a simple solution using gevent and Requests: simple-requests.
Use a Requests Session to get persistent (keep-alive) HTTP connections. Since gevent makes Requests asynchronous, I think there is no need for a timeout in HTTP requests.
By default, requests.Session caches TCP connections for 10 hosts (pool_connections) and keeps at most 10 pooled connections per host (pool_maxsize), which limits how many concurrent HTTP requests can reuse cached connections. Change the defaults as needed by creating the HTTP adapter explicitly.
    session = requests.Session()
    http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
    session.mount('http://', http_adapter)
Split the work into producer and consumer tasks. Downloading images is the producer's task, and image processing is the consumer's task.
If the PIL image processing library is not asynchronous, it may block the producer coroutines. If so, the consumer pool can be a gevent.threadpool.ThreadPool. For example:
    from gevent.threadpool import ThreadPool
    consumer = ThreadPool(POOL_SIZE)
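The producer-consumer split described above can be sketched with the standard library alone. This is a rough illustration, not the gevent version: it swaps in concurrent.futures thread pools for the gevent pools so it runs without extra dependencies, and fake_download, fake_process, run and pool_size are placeholder names standing in for the real HTTP request and PIL work.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_download(url):
    """Producer task: stand-in for session.get(url); returns (content, name)."""
    name = url.rsplit("/", 1)[-1]
    return (("bytes of " + name).encode("ascii"), name)


def fake_process(item):
    """Consumer task: stand-in for the PIL work; returns (name, size in bytes)."""
    content, name = item
    return (name, len(content))


def run(urls, pool_size=4):
    """Hand each finished download to the consumer pool as soon as it is ready."""
    with ThreadPoolExecutor(pool_size) as producer, \
            ThreadPoolExecutor(pool_size) as consumer:
        downloads = [producer.submit(fake_download, url) for url in urls]
        # as_completed yields downloads in completion order,
        # which plays the role of imap_unordered in the gevent version.
        processed = [
            consumer.submit(fake_process, done.result())
            for done in as_completed(downloads)
        ]
        # Sort so the result does not depend on completion order.
        return sorted(future.result() for future in processed)


results = run(["http://example.com/a.png", "http://example.com/bb.png"])
```

Keeping the two pools separate means slow image processing only delays the consumer side; the downloads keep going.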
This is an overview of how this can be done. I have not tested the code.
    from gevent import monkey; monkey.patch_all()  # patch before importing requests

    from time import time
    import requests
    from PIL import Image
    from io import BytesIO
    import os
    from urllib.parse import urlparse  # Python 2: from urlparse import urlparse
    from gevent.pool import Pool


    def download(url):
        # Producer: fetch the image and return (content, file_name), or None on error.
        try:
            response = session.get(url)
        except Exception as e:
            print(e)
        else:
            if response.status_code == requests.codes.ok:
                file_name = urlparse(url).path.rsplit('/', 1)[-1]
                return (response.content, file_name)
            response.raise_for_status()


    def process(img):
        # Consumer: decode the downloaded bytes and save the image to disk.
        if img is None:
            return None
        img, name = img
        img = Image.open(BytesIO(img))
        # base_folder (the output directory) is not defined in this snippet.
        path = os.path.join(base_folder, name)
        try:
            img.save(path)
        except Exception as e:
            print(e)
        else:
            return True


    def run(urls):
        # Feed downloads to the consumer pool as they complete.
        consumer.map(process, producer.imap_unordered(download, urls))


    if __name__ == '__main__':
        POOL_SIZE = 300
        producer = Pool(POOL_SIZE)
        consumer = Pool(POOL_SIZE)

        session = requests.Session()
        http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
        session.mount('http://', http_adapter)

        test_urls =
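The download function derives the file name from the last segment of the URL path. As a small sketch of what that expression yields, assuming Python 3: the helper name file_name_from_url and its default argument are hypothetical, added only to show the edge case where the path has no final segment and the name would otherwise be empty.

```python
import posixpath
from urllib.parse import urlparse


def file_name_from_url(url, default="unnamed"):
    """Return the last path segment of url, or default if there is none."""
    path = urlparse(url).path        # excludes the query string and fragment
    name = posixpath.basename(path)  # same result as path.rsplit('/', 1)[-1]
    return name or default


with_query = file_name_from_url("http://example.com/img/cat.png?size=large")
trailing_slash = file_name_from_url("http://example.com/img/")
```

An empty name matters because os.path.join(base_folder, '') is just the folder itself, so saving to that path would fail.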