Chain a celery task that returns a list into a distributed group

As in this other question, I want to create a celery group from a list returned by a celery task. The idea is that the first task returns a list, and the second task fans that list out into parallel tasks, one per element.

The plan is to use this when loading content. The first task gets the links from a website, and the second task is a chain that downloads each page, processes it, and uploads it to S3. Finally, once all subpages are done, the website is marked as completed in our database. Something like:

chain(
    get_links_from_website.si('https://www.google.com'),
    dmap.s(  # <-- Distributed map
        download_sub_page.s() | 
        process_sub_page.s() | 
        upload_sub_page_to_s3.s()
    ),
    mark_website_done.s()
)

The solution I have seen so far seems to do the job, but it fails when the second task is a chain, because `clone` does not do a deep copy (see the comments on this answer):

from celery import group, subtask

@task
def dmap(it, callback):
    # Map a callback over an iterable and return as a group
    callback = subtask(callback)
    return group(callback.clone([arg]) for arg in it)()

There is also the problem that if the iterable has 10,000 items, this creates a group with 10,000 tasks, which, as you can imagine, blows up our memory usage.
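One way to avoid the giant group is to never materialize it at once: consume the iterable in fixed-size batches and fire one small group per batch. A celery-free sketch of just the batching step (the helper name `batched` and the batch size are my own choices, not part of celery):

```python
from itertools import islice

def batched(iterable, size):
    """Yield lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

# Each chunk could then become its own small celery group
# instead of one 10,000-task monster.
chunks = list(batched(range(10), 4))
print(chunks)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

A `dmap` built on this would enqueue one group per chunk rather than holding all 10,000 signatures in memory at once.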

So I'm looking for a way to write `dmap` that:

  • does not explode RAM by creating monstrous groups (perhaps by chunking the iterable?);
  • works with celery chains, i.e. without the deep-copy problem.
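For intuition on the second point: a celery signature is a dict that can hold nested task dicts (e.g. the steps of a chain), and a shallow copy leaves those nested dicts shared between all "clones". A plain-dict sketch of the failure mode (the structure is illustrative, not celery's exact schema):

```python
import copy

# Stand-in for a chain signature: nested task dicts inside 'tasks'
sig = {"task": "chain", "tasks": [{"task": "download", "args": []}]}

shallow = dict(sig)        # roughly what a shallow clone does
deep = copy.deepcopy(sig)  # what the deepcopy-based dmap below does

shallow["tasks"][0]["args"].append("page-1")  # mutate one "clone"

print(sig["tasks"][0]["args"])   # ['page-1']  <- leaked into the original
print(deep["tasks"][0]["args"])  # []          <- deep copy is independent
```

This is why cloning a plain task works but cloning a chain corrupts every sibling clone.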
3 answers

Celery canvas has chunks for splitting work into pieces, but that does not help here, because each item needs to run through a chain, not a single task in a group.

Instead, you can side-step dmap/clone entirely by using a task signal.

from celery import chain

ch = chain(
    download_sub_page.s(),
    process_sub_page.s(),
    upload_sub_page.s(),
)

from celery import chord
from celery.signals import task_success

@task_success.connect(sender='get_links_from_website')
def task_success_handler(sender=None, result=None, **kwargs):
    # `result` is the list of links returned by get_links_from_website.
    # Clone the chain once per link instead of calling it directly.
    header = [ch.clone(args=(link,)) for link in result]
    callback = mark_website_done.si()
    chord(header)(callback)

The handler fires every time get_links_from_website completes successfully, clones the chain for each link, and wraps the whole set in a chord.

This way the fan-out is built only after get_links_from_website has produced its result, and each chain signature is cloned fresh per link, so the deep-copy problem never comes up.


Since the root cause is the missing deepcopy, you can deep-copy the signature yourself before cloning:

import copy

from celery import chord, group, subtask

def dmap(it, callback, final=None):
    # Map a callback over an iterable and return as a group
    callback = subtask(callback)

    # Deep-copy the signature for every item so nested chains are not shared
    run_in_parallel = group(
        subtask(copy.deepcopy(dict(callback))).clone([arg]) for arg in it
    )

    if len(run_in_parallel.tasks) == 0:
        return []

    if final:
        return chord(run_in_parallel)(final)

    return run_in_parallel.delay()

This works even when the callback is a complex signature (i.e. a chain / group / chord).

If deepcopy still misses something, a pickle round-trip is a brute-force way to get a completely independent copy:

# Hack to completely clone a signature with possibly complex subtasks (chains, chords, etc...)
run_in_parallel = group(pickle.loads(pickle.dumps(callback)).clone([arg, ]) for arg in it)
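As a sanity check that the pickle round-trip really yields an independent object graph, here is a plain-dict sketch (celery signatures are dict subclasses, so the same idea applies; the field names are illustrative):

```python
import copy
import pickle

callback = {"task": "chain", "options": {"queue": "pages"}}

fast = copy.copy(callback)                   # shares nested dicts
safe = pickle.loads(pickle.dumps(callback))  # fully independent copy

safe["options"]["queue"] = "other"
print(callback["options"]["queue"])            # 'pages' -- original untouched
print(fast["options"] is callback["options"])  # True -- shallow copy shares state
```

The pickle route is slower than `copy.deepcopy`, but it follows every reference, including ones a custom `__deepcopy__` might skip.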


I am trying to do the same, but both of the answers provided DO NOT WORK.

import copy

from celery import group, subtask
from celery.schedules import crontab

@app.task
def dmap(it, callback):
    callback = subtask(callback)
    return group(subtask(copy.deepcopy(dict(callback))).clone([arg]) for arg in it)()

@app.task
def irange(n=10):
    return list(range(n))

@app.task
def addone(x):
    print(f"ADDONE: {x+1}")
    return x+1

app.conf.beat_schedule = {
    'chained_schedule': {
        'task': 'tasks.irange',
        'schedule': crontab(),
        'options': {
              'link': dmap.s(addone.s() | addone.s()),
        },
        'args': (100,),
    }
}

This question needs a correct working answer.


Source: https://habr.com/ru/post/1673549/

