Downloading images using gevent

My task is to download 1M+ images from a given list of URLs. What is the recommended way to do this?

After reading Greenlet Vs. Threads I looked at gevent, but I can't get it to run reliably. I played around with a test set of 100 URLs: sometimes it finishes in 1.5 seconds, but sometimes it takes more than 30 seconds, which is strange since the timeout* for each request is 0.1 s, so it should never take more than 10 seconds.

* see the code below

I also looked at grequests, but it seems to have problems with exception handling.
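For context, here is a minimal, untested sketch of what such a grequests attempt could look like; it assumes grequests' map() with its exception_handler callback, and reuses the test_urls and POOL_SIZE names from my code below:

import grequests

def on_error(request, exception):
    # called for each request that fails; the failed request's slot in the result list stays None
    print 'failed: %s (%s)' % (request.url, exception)

reqs = (grequests.get(url, timeout=0.1) for url in test_urls)
responses = grequests.map(reqs, size=POOL_SIZE, exception_handler=on_error)
print '%d of %d succeeded' % (sum(r is not None for r in responses), len(test_urls))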

My "requirements" are that I can

  • check for errors that occur while downloading (timeouts, corrupted images, ...),
  • track the number of processed images and
  • be as fast as possible.
from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
import cStringIO
import gevent.hub

POOL_SIZE = 300

def download_image_wrapper(task):
    return download_image(task[0], task[1])

def download_image(image_url, download_path):
    raw_binary_request = requests.get(image_url, timeout=0.1).content
    image = Image.open(cStringIO.StringIO(raw_binary_request))
    image.save(download_path)

def download_images_gevent_spawn(list_of_image_urls, base_folder):
    download_paths = ['/'.join([base_folder, url.split('/')[-1]])
                      for url in list_of_image_urls]
    parameters = [[image_url, download_path]
                  for image_url, download_path in zip(list_of_image_urls, download_paths)]
    tasks = [gevent.spawn(download_image_wrapper, parameter_tuple)
             for parameter_tuple in parameters]
    for task in tasks:
        try:
            task.get()
        except Exception:
            print 'x',
            continue
        print '.',

test_urls = # list of 100 urls
t1 = time()
download_images_gevent_spawn(test_urls, 'download_temp')
print time() - t1
3 answers

I think it would be better to stick with urllib2, for example https://github.com/gevent/gevent/blob/master/examples/concurrent_download.py#L1

Try this code; I suppose it is what you are asking for.

import gevent
from gevent import monkey

# patches stdlib (including socket and ssl modules) to cooperate with other greenlets
monkey.patch_all()

import sys

urls = sorted(chloya_files)

if sys.version_info[0] == 3:
    from urllib.request import urlopen
else:
    from urllib2 import urlopen

def download_file(url):
    data = urlopen(url).read()
    img_name = url.split('/')[-1]
    with open('c:/temp/img/' + img_name, 'wb') as f:
        f.write(data)
    return True

from time import time

t1 = time()
tasks = [gevent.spawn(download_file, url) for url in urls]
gevent.joinall(tasks, timeout=12.0)
print "Successful: %s from %s" % (sum(1 if task.value else 0 for task in tasks), len(tasks))
print time() - t1

There is a simple solution using gevent and Requests: simple-requests.

Use a Requests Session for persistent HTTP connections. Since gevent makes Requests asynchronous, I think there is no need for a timeout on the HTTP requests.

By default, requests.Session caches TCP connection pools for 10 hosts (pool_connections) and allows up to 10 concurrent HTTP requests per cached pool (pool_maxsize). The defaults should be adjusted as needed by explicitly creating an HTTP adapter:

session = requests.Session()
http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
session.mount('http://', http_adapter)

Split the work up producer-consumer style: downloading the images is the producer task, and processing the images is the consumer task.

If the image-processing library PIL is not asynchronous, it may block the producer coroutines. If that is the case, the consumer pool can be a gevent.threadpool.ThreadPool, e.g.:

from gevent.threadpool import ThreadPool
consumer = ThreadPool(POOL_SIZE)

This is an overview of how this can be done. I have not tested the code.

from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
from io import BytesIO
import os
from urlparse import urlparse
from gevent.pool import Pool

def download(url):
    try:
        response = session.get(url)
    except Exception as e:
        print(e)
    else:
        if response.status_code == requests.codes.ok:
            file_name = urlparse(url).path.rsplit('/', 1)[-1]
            return (response.content, file_name)
        response.raise_for_status()

def process(img):
    if img is None:
        return None
    img, name = img
    img = Image.open(BytesIO(img))
    path = os.path.join(base_folder, name)
    try:
        img.save(path)
    except Exception as e:
        print(e)
    else:
        return True

def run(urls):
    consumer.map(process, producer.imap_unordered(download, urls))

if __name__ == '__main__':
    POOL_SIZE = 300
    producer = Pool(POOL_SIZE)
    consumer = Pool(POOL_SIZE)

    session = requests.Session()
    http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
    session.mount('http://', http_adapter)

    test_urls = # list of 100 urls
    base_folder = 'download_temp'
    t1 = time()
    run(test_urls)
    print time() - t1

I suggest taking a look at Grablib: http://grablib.org/

It is an asynchronous parser based on pycurl and multicurl. It also tries to handle network errors automatically (for example, retrying when a request times out, etc.).

I believe the Grab:Spider module will solve 99% of your problems. http://docs.grablib.org/en/latest/index.html#spider-toc
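A rough, untested sketch of what that might look like, based on the Grab:Spider documentation; the handler name, the thread_number argument, and the response-body attribute are assumptions to verify against the installed Grab version (test_urls and POOL_SIZE as in the question):

from grab.spider import Spider, Task

class ImageSpider(Spider):
    def task_generator(self):
        # one Task per image URL; Spider schedules the downloads concurrently
        for url in test_urls:
            yield Task('image', url=url)

    def task_image(self, grab, task):
        # handler named after the task; save the raw response body to disk
        file_name = task.url.split('/')[-1]
        with open('download_temp/' + file_name, 'wb') as f:
            f.write(grab.response.body)  # grab.doc.body in newer Grab versions

bot = ImageSpider(thread_number=POOL_SIZE)
bot.run()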


Source: https://habr.com/ru/post/1235227/

