Distribute serial Python code using Celery

I have the following tasks:

get_filters() # returns a list of filters
get_customers(filter_a, filter_b) # returns a pandas DataFrame containing customers
get_orders(filter_a, filter_b) # returns a pandas DataFrame of customers and aggregate purchase statistics
get_returns(filter_a, filter_b) # returns a pandas DataFrame of customers and aggregate return statistics 

The serial code works as follows:

def extract_customer():
    filt_a, filt_b = get_filters()
    cust = get_customers(filt_a, filt_b)
    orders = get_orders(filt_a, filt_b)  # renamed from "ord" to avoid shadowing the built-in
    ret = get_returns(filt_a, filt_b)
    merged = cust.join([orders, ret])
    return merged

I would like to distribute this with Celery so that get_filters is executed first, then get_customers, get_orders, and get_returns are executed in parallel, and finally, once they are all done, a merge step returns the combined dataset.

I'm not sure how to do this using Celery's canvas primitives.

Thank you for your help.

1 answer

In your case you need to call get_filters(), wait for the result, and pass it to the get_customers, get_orders, and get_returns signatures in a chord, with a new task merged as the callback.

For example:

from celery import chord, shared_task

def extract_customer():
    filt_a, filt_b = get_filters()

    # The chord header runs the three tasks in parallel;
    # merged() is invoked with the list of their results.
    result = chord([
        get_customers.s(filt_a, filt_b),
        get_orders.s(filt_a, filt_b),
        get_returns.s(filt_a, filt_b)
    ])(merged.s())

    return result.get()  # blocks until the whole chord has finished

@shared_task
def merged(results):
    # The callback receives the header tasks' results as a single list,
    # in the same order as the signatures above.
    cust, orders, ret = results
    return cust.join([orders, ret])
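
For the .s() signatures above to work, each get_* function must itself be registered as a Celery task. A minimal sketch, where the DataFrame contents are made-up placeholders for whatever your real queries return:

import pandas as pd
from celery import shared_task

@shared_task
def get_customers(filter_a, filter_b):
    # Placeholder: replace with the real customer query
    return pd.DataFrame({"customer_id": [1, 2]}).set_index("customer_id")

@shared_task
def get_orders(filter_a, filter_b):
    # Placeholder: aggregate purchase statistics per customer
    return pd.DataFrame({"customer_id": [1, 2], "order_total": [10.0, 20.0]}).set_index("customer_id")

@shared_task
def get_returns(filter_a, filter_b):
    # Placeholder: aggregate return statistics per customer
    return pd.DataFrame({"customer_id": [1, 2], "return_total": [0.0, 5.0]}).set_index("customer_id")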

Note that, as sketched above, the get_* functions must themselves be registered as Celery tasks. Also, pandas DataFrames cannot be serialized with the default json serializer, so you will need pickle or a custom serializer for the task results.
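
One way to let DataFrame results cross the broker is to switch Celery to the pickle serializer; a minimal sketch, assuming a module-level app (the broker URL is a placeholder):

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")  # placeholder broker URL
app.conf.update(
    task_serializer="pickle",
    result_serializer="pickle",
    accept_content=["pickle"],  # workers reject pickle content unless it is accepted explicitly
)

Only enable pickle if nothing untrusted can write to your broker; otherwise a custom serializer is the safer route.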


Source: https://habr.com/ru/post/1542063/

