Unable to get multiprocessing to run processes simultaneously

The following code does not run the processes at the same time, and I am not sure exactly why:

    import multiprocessing

    def run_normalizers(config, debug, num_threads, name=None):

        def _run():
            print('Started process for normalizer')
            sqla_engine = init_sqla_from_config(config)
            image_vfs = create_s3vfs_from_config(config, config.AWS_S3_IMAGE_BUCKET)
            storage_vfs = create_s3vfs_from_config(config, config.AWS_S3_STORAGE_BUCKET)
            pp = PipedPiper(config, image_vfs, storage_vfs, debug=debug)

            if name:
                pp.run_pipeline_normalizers(name)
            else:
                pp.run_all_normalizers()
            print('Normalizer process complete')

        threads = []
        for i in range(num_threads):
            threads.append(multiprocessing.Process(target=_run))
        [t.start() for t in threads]
        [t.join() for t in threads]

    run_normalizers(...)

The config variable is just a dictionary that is defined outside of the _run() function. All of the processes appear to get created, but the whole thing is no faster than running it with a single process. Basically, what happens in the run_*_normalizers() functions is: read from a queue table in the database (SQLAlchemy), make a few HTTP requests, run the pipeline of normalizers to modify the data, and then save the results back to the database. I come from JVM-land, where threads are "heavy" and often used for parallelism, so I am a little confused here; I thought the multiprocessing module was supposed to sidestep Python's GIL limitations.
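For reference, here is a stripped-down sketch (not my real pipeline) of what I expected to happen: each multiprocessing.Process gets its own interpreter and its own GIL, so CPU-bound work should run on all cores at once:

    import multiprocessing
    import os

    def _burn_cpu():
        # CPU-bound busy work; prints which worker process it ran in
        total = sum(i * i for i in range(10 ** 7))
        print('pid %s finished, total=%s' % (os.getpid(), total))

    if __name__ == '__main__':
        procs = [multiprocessing.Process(target=_burn_cpu) for _ in range(4)]
        [p.start() for p in procs]
        [p.join() for p in procs]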

+6
2 answers

I fixed my multiprocessing problem, and actually switched over to threads. I am not sure what exactly fixed it; I just re-architected everything, created workers and tasks and whatnot, and now things are flying. Here are the basics of what I did:

    import abc
    from Queue import Empty, Queue
    from threading import Thread


    class AbstractTask(object):
        """ The base task """

        __metaclass__ = abc.ABCMeta

        @abc.abstractmethod
        def run_task(self):
            pass


    class TaskRunner(object):

        def __init__(self, queue_size, num_threads=1, stop_on_exception=False):
            super(TaskRunner, self).__init__()
            self.queue = Queue(queue_size)
            self.execute_tasks = True
            self.stop_on_exception = stop_on_exception

            # create a worker
            def _worker():
                while self.execute_tasks:

                    # get a task
                    task = None
                    try:
                        task = self.queue.get(False, 1)
                    except Empty:
                        continue

                    # execute the task
                    failed = True
                    try:
                        task.run_task()
                        failed = False
                    finally:
                        if failed and self.stop_on_exception:
                            print('Stopping due to exception')
                            self.execute_tasks = False
                        self.queue.task_done()

            # start threads
            for i in range(0, int(num_threads)):
                t = Thread(target=_worker)
                t.daemon = True
                t.start()

        def add_task(self, task, block=True, timeout=None):
            """ Adds a task """
            if not self.execute_tasks:
                raise Exception('TaskRunner is not accepting tasks')
            self.queue.put(task, block, timeout)

        def wait_for_tasks(self):
            """ Waits for tasks to complete """
            if not self.execute_tasks:
                raise Exception('TaskRunner is not accepting tasks')
            self.queue.join()

All I do is create a TaskRunner and add tasks to it (thousands of them), and then call wait_for_tasks(). So, apparently, in the re-architecting I did, I "fixed" some other problem that I had. Strange, though.
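For context, here is roughly how I drive it; NormalizeTask and the record ids below are simplified stand-ins for my real pipeline tasks:

    class NormalizeTask(AbstractTask):
        """ Placeholder task; the real ones wrap the normalizer pipeline """

        def __init__(self, record_id):
            self.record_id = record_id

        def run_task(self):
            # pretend this loads the queued record, normalizes it, and saves it back
            print('normalizing record %s' % self.record_id)

    runner = TaskRunner(queue_size=500, num_threads=8, stop_on_exception=True)
    for record_id in range(10000):
        runner.add_task(NormalizeTask(record_id))  # blocks while the queue is full
    runner.wait_for_tasks()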

+3

If you are still looking for a multiprocessing solution, you might first want to check out how to use a pool of workers, so that you do not have to manage num_threads processes yourself: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers
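Something along these lines, for example (run_one_normalizer and the names passed to it are just placeholders for your actual per-task work):

    from multiprocessing import Pool

    def run_one_normalizer(name):
        # placeholder for the real per-normalizer work
        print('running normalizer %s' % name)

    if __name__ == '__main__':
        pool = Pool(processes=4)  # the pool manages the worker processes for you
        pool.map(run_one_normalizer, ['images', 'tags', 'descriptions'])
        pool.close()
        pool.join()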

And for the slowdown problem, have you tried passing the config object as an argument to the _run() function instead of capturing it in the closure? I do not know whether that changes anything internally, but it might.
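For example, something like this, reworking the loop from your question (assuming config can be handed over to a child process):

    def _run(config):
        pass  # same body as before, just reading config from the argument

    processes = [multiprocessing.Process(target=_run, args=(config,))
                 for i in range(num_threads)]
    [p.start() for p in processes]
    [p.join() for p in processes]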

+1

Source: https://habr.com/ru/post/949794/

