Parallel distributed celery task with multiprocessing

I have a CPU-intensive Celery task. I would like to use all the processing power (cores) across multiple EC2 instances to get this work done faster (a parallel distributed task with Celery and multiprocessing, I think).

The terms threading, multiprocessing, distributed computing and distributed parallel processing are all terms I'm trying to understand better.

Example task:

    @app.task
    def process_ids(list_of_millions_of_ids):
        for item in list_of_millions_of_ids:
            id = item
            # do some long complicated equation here, very CPU heavy!!!!!!!
            database.objects(newid=id).save()

Using the code above (with an example, if possible), how would you distribute this task using Celery so that this one task can be split up to use all the available CPU power across all the machines available in the cloud?

+48
python multithreading django multiprocessing celery
May 28 '14 at 15:53
4 answers

Your goals:

  • Distribute your work to many machines (distributed computing / distributed parallel processing)
  • Distribute the work on a given machine across all CPUs (multiprocessing / threading)

Celery can do both of these for you fairly easily. The first thing to understand is that each celery worker is configured by default to run as many tasks as there are CPU cores available on the system:

Concurrency is the number of worker processes used to process your tasks concurrently; when all of these are busy doing work, new tasks will have to wait for one of the tasks to finish before they can be processed.

The default concurrency number is the number of CPUs available on the machine (including cores); you can specify a custom number using the -c option. There is no recommended value, as the optimal number depends on a number of factors, but if your tasks are mostly I/O-bound then you can try to increase it; experimentation has shown that adding more than twice the number of CPUs is rarely effective, and is likely to degrade performance instead.

This means that each individual task does not need to worry about using multiprocessing / threading to make use of multiple CPUs / cores. Instead, celery will run enough tasks concurrently to use every available CPU.
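For illustration, a minimal sketch of pinning the concurrency level yourself, assuming a Celery 3.x-style configuration module; the module name and the value 8 are illustrative, not from the question:

    # celeryconfig.py - hypothetical configuration module
    # Equivalent to starting the worker with:  celery -A tasks worker -c 8
    CELERYD_CONCURRENCY = 8  # worker processes on this machine; defaults to the CPU count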

With that out of the way, the next step is to create a task that handles some subset of your list_of_millions_of_ids. You have a couple of options here. One is to have each task handle a single id, so you run N tasks, where N == len(list_of_millions_of_ids). This guarantees that the work is distributed evenly amongst all your tasks, since there will never be a case where one worker finishes early and is just waiting around; if it needs work, it can pull an id off the queue. You can do this (as mentioned by John Doe) using a celery group.

tasks.py:

    @app.task
    def process_id(item):
        id = item  # long complicated equation here
        database.objects(newid=id).save()

And to execute the tasks:

    from celery import group
    from tasks import process_id

    jobs = group(process_id.s(item) for item in list_of_millions_of_ids)
    result = jobs.apply_async()
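If you need to wait for all of the ids to be processed, the GroupResult returned by apply_async can be collected afterwards (a small sketch; it assumes a result backend is configured):

    # Blocks until every process_id task has finished; requires a result backend.
    result.get()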

Another option is to break the list into smaller pieces and distribute the pieces to your workers. This approach runs the risk of wasting some cycles, because you may end up with some workers waiting around while others are still doing work. However, the celery documentation notes that this concern is often unfounded, since chunking avoids the per-message overhead and can actually improve performance on a busy cluster.
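A minimal sketch of this chunking option, using celery's built-in chunks helper with the same process_id task as above; the chunk size of 1000 is an arbitrary illustration:

    from tasks import process_id

    # Each chunk of 1000 ids becomes a single message; the worker that picks it
    # up calls process_id once per id in the chunk.
    job = process_id.chunks(((item,) for item in list_of_millions_of_ids), 1000)
    result = job.apply_async()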

+80
Jun 03 '14 at 15:43

Why not use the celery group task for this?

http://celery.readthedocs.org/en/latest/userguide/canvas.html#groups

Basically, you should split the ids into chunks (or ranges) and give them to a bunch of tasks in the group.
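A sketch of what that might look like; process_id_chunk is a hypothetical task that loops over one chunk of ids, and the chunk size is arbitrary:

    from celery import group
    from tasks import process_id_chunk  # hypothetical: saves every id in one chunk

    chunk_size = 10000
    chunks = (list_of_millions_of_ids[i:i + chunk_size]
              for i in range(0, len(list_of_millions_of_ids), chunk_size))
    result = group(process_id_chunk.s(chunk) for chunk in chunks).apply_async()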

For something more sophisticated, such as aggregating the results of particular celery tasks, I have successfully used the chord task for a similar purpose:

http://celery.readthedocs.org/en/latest/userguide/canvas.html#chords

Increase settings.CELERYD_CONCURRENCY to a number that is reasonable and that you can afford; the celery workers will then keep executing your tasks in the group or chord until they are done.

Note: due to a bug in kombu there were problems with reusing worker processes for a high number of tasks in the past; I do not know if it is fixed now. Maybe it is, but if not, reduce CELERYD_MAX_TASKS_PER_CHILD.
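A small sketch of that setting in a Django-style settings module; the value 100 is illustrative only:

    # settings.py (illustrative value) - recycle each worker process after 100
    # tasks so a long-lived process does not accumulate problems from the kombu bug.
    CELERYD_MAX_TASKS_PER_CHILD = 100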

An example based on the simplified and modified code that I run:

    from celery import chord

    @app.task
    def do_matches():
        match_data = ...
        result = chord(single_batch_processor.s(m) for m in match_data)(summarize.s())

summarize gets the results of all single_batch_processor tasks. Each task runs on any celery worker; kombu coordinates that.

Note that single_batch_processor and summarize ALSO have to be celery tasks, not regular functions - otherwise, of course, it will not be parallelized (I'm not even sure the chord constructor will accept it if it's not a celery task).
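A sketch of how those two pieces might be declared as tasks, assuming the same app object as above; the bodies and the helper names (process_batch, combine) are illustrative assumptions, not the poster's actual code:

    @app.task
    def single_batch_processor(batch):
        # heavy work on one batch of matches, returns a partial result
        return process_batch(batch)

    @app.task
    def summarize(partial_results):
        # chord callback: receives the list of results from every batch task
        return combine(partial_results)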

+9
May 30 '14 at 7:48

In the world of distribution, there is only one thing you should remember above all:

Premature optimization is the root of all evil. D. Knuth

I know it seems obvious, but before distributing, double-check that you are using the best algorithm (if one exists...). Having said that, optimizing a distributed setup is a balancing act between three things:

  • (1) Writing / reading data from a persistent medium,
  • (2) Moving data from medium A to medium B,
  • (3) Processing data.

Computers are built so that the closer you get to your processing unit (3), the faster and more efficient (1) and (2) will be. The order in a classic cluster will be: network hard drive, local hard drive, RAM, inside the processing unit... Nowadays processors are becoming sophisticated enough to be considered an ensemble of independent hardware processing units, commonly called cores; these cores process data (3) through threads (2). Imagine your core is so fast that when you send data with one thread you use 50% of the computer's power; if the core has 2 threads you will then use 100%. Two threads per core is called hyper-threading, and your OS will see 2 CPUs per hyper-threaded core.
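A tiny illustration of the hyper-threading point: on a hyper-threaded machine the count reported below is typically twice the number of physical cores, because the OS counts logical CPUs:

    import multiprocessing

    # Logical CPUs visible to the OS (physical cores x threads per core).
    print(multiprocessing.cpu_count())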

Managing threads within a single CPU is commonly called multithreading. Managing CPUs from the OS is commonly called multiprocessing. Managing concurrent tasks in a cluster is commonly called parallel programming. Managing dependent tasks in a cluster is commonly called distributed programming.

So where is your bottleneck?

  • In (1): try to save and stream from the upper level (the one closer to your processing unit; for example, if the network hard drive is slow, save to the local hard drive first).
  • In (2): this is the most common one; try to avoid communication packets that are not needed for the distribution, or compress packets "on the fly" (for example, if the HD is slow, save only a "batch computed" message and keep the intermediate results in RAM).
  • In (3): you are done! You are using all the processing power at your disposal.

How about celery?

Celery is a messaging framework for distributed programming. It uses a broker module for communication (2) and a backend module for persistence (1), which means that, by changing the configuration, you will be able to avoid most bottlenecks (if possible) on your network, and only on your network. First, write your code so it achieves the best performance on a single computer. Then use celery in your cluster with its default configuration and set CELERY_RESULT_PERSISTENT=True:

    from celery import Celery

    app = Celery('tasks',
                 broker='amqp://guest@localhost//',
                 backend='redis://localhost')

    @app.task
    def process_id(all_the_data_parameters_needed_to_process_in_this_computer):
        # code that does stuff
        return result
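The snippet above does not show the CELERY_RESULT_PERSISTENT setting mentioned in the text; a sketch of adding it, assuming the same app object (old-style Celery 3.x setting name):

    # Ask the result backend to store result messages persistently,
    # so they survive a broker restart.
    app.conf.CELERY_RESULT_PERSISTENT = True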

During execution, open your favourite monitoring tools: I use the default one for rabbitMQ, flower for celery and top for the CPUs; your results will be saved in your backend. An example of a network bottleneck is the task queue growing so much that it delays execution; in that case you can proceed to change modules or the celery configuration, otherwise your bottleneck is somewhere else.

+7
Jun 06 '14 at 2:10

Adding more celery workers will certainly speed up the task. You may have another bottleneck though: the database. Make sure it can handle the simultaneous inserts / updates.

Regarding your question: you add celery workers by running another process on your EC2 instances as celeryd. Depending on how many workers you need, you may want to add even more instances.

+2
May 28 '14 at 20:05


