Building Mode and Corresponding Count Functions Using Custom Aggregation Functions for GroupBy in Dask

Dask has now been updated to support custom aggregation functions for groupby (thanks to the development team and @chmp for working on this!). I'm currently trying to build a mode function and a corresponding count function. What I have in mind is that, for each group, mode returns a list of the most common values of a particular column (i.e. [4, 1, 2]). In addition, a corresponding count function returns the number of instances of those values, i.e. 3.

Now I am trying to implement this in code. According to the docstring in groupby.py, the parameters for custom aggregations are as follows:

    Parameters
    ----------
    name : str
        the name of the aggregation. It should be unique, since intermediate
        result will be identified by this name.
    chunk : callable
        a function that will be called with the grouped column of each
        partition. It can either return a single series or a tuple of series.
        The index has to be equal to the groups.
    agg : callable
        a function that will be called to aggregate the results of each chunk.
        Again the argument(s) will be grouped series. If ``chunk`` returned a
        tuple, ``agg`` will be called with all of them as individual positional
        arguments.
    finalize : callable
        an optional finalizer that will be called with the results from the
        aggregation.

Here is the code for the mean:

    custom_mean = dd.Aggregation(
        'custom_mean',
        lambda s: (s.count(), s.sum()),
        lambda count, sum: (count.sum(), sum.sum()),
        lambda count, sum: sum / count,
    )
    df.groupby('g').agg(custom_mean)

Here is what I have so far for the mode:

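from collections import Counter

import dask.dataframe as dd

def custom_count(x):
    count = Counter(x)
    # dict_values has no .count() in Python 3, so convert to a list first
    freq_list = list(count.values())
    max_cnt = max(freq_list)
    # number of values that share the highest frequency
    total = freq_list.count(max_cnt)
    return count.most_common(total)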

custom_mode = dd.Aggregation(
    'custom_mode',
    lambda s: custom_count(s),
    lambda s1: s1.extend(),
    lambda s2: ......
)
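For reference, the per-group result described above can be computed eagerly in plain pandas, which is handy for checking a dask aggregation against. A minimal sketch with made-up data; it assumes that "count" means how often each of the most common values occurs in its group, and it is not a dask aggregation itself:

```python
import pandas as pd

df = pd.DataFrame({
    'g': [0] * 7 + [1] * 4,
    'col': [4, 1, 2, 4, 1, 2, 3, 5, 5, 5, 6],
})

grouped = df.groupby('g')['col']

# All most common values per group; Series.mode returns them sorted.
modes = grouped.agg(lambda s: s.mode().tolist())

# How often those values occur, i.e. the highest frequency in each group.
mode_counts = grouped.agg(lambda s: s.value_counts().max())

print(modes)
print(mode_counts)
```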

I'm not sure how the agg and finalize functions should work here.



Answer: the mode can be implemented as a custom aggregation in dask.

Conceptually, the three functions are applied roughly like this:

res = chunk(df.groupby('g')['col'])
res = agg(res.groupby(level=[0]))
res = finalize(res)
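These three steps can be simulated in plain pandas to see what each function receives. This is a simplified model, not dask's actual code: it assumes a single grouping column, splits the frame into two hand-made partitions, concatenates the chunk outputs, and regroups them on the index before calling agg. The simulate_aggregation helper is hypothetical; the chunk/agg/finalize functions are those of the custom mean from the question.

```python
import pandas as pd


def simulate_aggregation(partitions, by, col, chunk, agg, finalize):
    """Simplified, single-group-column model of a custom groupby aggregation."""
    # 1. chunk runs on the grouped column of each partition separately.
    outputs = [chunk(part.groupby(by)[col]) for part in partitions]
    # chunk may return one series or a tuple of series; normalise to tuples.
    outputs = [o if isinstance(o, tuple) else (o,) for o in outputs]
    # 2. agg sees the concatenated outputs, regrouped by the group index.
    combined = [pd.concat(parts) for parts in zip(*outputs)]
    aggregated = agg(*[c.groupby(level=0) for c in combined])
    aggregated = aggregated if isinstance(aggregated, tuple) else (aggregated,)
    # 3. finalize turns the aggregated intermediates into the final result.
    return finalize(*aggregated)


# The chunk/agg/finalize functions of the custom mean from the question.
def mean_chunk(s):
    return s.count(), s.sum()


def mean_agg(count, sum_):
    return count.sum(), sum_.sum()


def mean_finalize(count, sum_):
    return sum_ / count


df = pd.DataFrame({'g': [0, 0, 1, 1, 1, 0], 'col': [1.0, 3.0, 2.0, 4.0, 6.0, 5.0]})
partitions = [df.iloc[:3], df.iloc[3:]]  # two hand-made partitions

res = simulate_aggregation(partitions, 'g', 'col', mean_chunk, mean_agg, mean_finalize)
print(res)
```

The point of the tuple of (count, sum) is that both can be combined across partitions by summing, while the mean itself cannot; only finalize divides.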

Here is an implementation of the mode:

def chunk(s):
    # for the comments, assume only a single grouping column, the
    # implementation can handle multiple group columns.
    #
    # s is a grouped series. value_counts creates a multi-index series like
    # (group, value): count
    return s.value_counts()


def agg(s):
    # s is a grouped multi-index series. In .apply the full sub-series will be
    # passed, multi-index and all. Group on the value level and sum the counts.
    # The result of the lambda function is a series. Therefore, the result of
    # the apply is a multi-index series like (group, value): count
    return s.apply(lambda s: s.groupby(level=-1).sum())

    # faster alternative relying on pandas internals (_selected_obj is
    # private and may change between pandas versions):
    # s = s._selected_obj
    # return s.groupby(level=list(range(s.index.nlevels))).sum()


def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi index. Next, drop the group part from the index.
    # Finally, determine the index label with the maximum value, i.e., the
    # mode. idxmax returns that label; in current pandas, argmax would return
    # its integer position instead.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )

mode = dd.Aggregation('mode', chunk, agg, finalize)
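To see the intermediate results, the same three functions can be traced by hand in plain pandas. A sketch under simplifying assumptions: a single grouping column, two hand-made partitions, and the chunk outputs concatenated and regrouped on the group level, which only approximates what dask does internally. The functions are restated compactly so the block runs on its own.

```python
import pandas as pd


def chunk(s):
    # per partition: (group, value) -> count
    return s.value_counts()


def agg(s):
    # s: concatenated chunk output grouped by the group level; add up the
    # partial counts of each value within each group
    return s.apply(lambda x: x.groupby(level=-1).sum())


def finalize(s):
    # for each group, return the value label with the highest total count
    level = list(range(s.index.nlevels - 1))
    return s.groupby(level=level).apply(
        lambda x: x.reset_index(level=level, drop=True).idxmax()
    )


df = pd.DataFrame({'g': [0, 0, 1, 0, 1, 1, 0], 'col': [1, 2, 3, 1, 3, 4, 1]})
partitions = [df.iloc[:3], df.iloc[3:]]  # two hand-made partitions

# chunk runs separately on each partition; the outputs are concatenated
chunked = pd.concat([chunk(p.groupby('g')['col']) for p in partitions])
print(chunked)

# agg merges the partial counts: (g, col) -> total count
aggregated = agg(chunked.groupby(level=0))
print(aggregated)

# finalize picks the mode of each group
res = finalize(aggregated)
print(res)
```

Value 1 appears once in the first partition and twice in the second for group 0, so only the summed count of 3 reveals it as the mode; this is why counts, not modes, are what chunk and agg pass along.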

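Note that in the case of ties this implementation does not behave like the dataframe .mode method: it returns a single value instead of all tied values. Here is an example: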

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({
    'col': [0, 1, 1, 2, 3] * 10,
    'g0': [0, 0, 0, 1, 1] * 10,
    'g1': [0, 0, 0, 1, 1] * 10,
})
ddf = dd.from_pandas(df, npartitions=10)

res = ddf.groupby(['g0', 'g1']).agg({'col': mode}).compute()
print(res)
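To cross-check the example without dask, the same frame can be reduced eagerly in pandas. The sketch below builds the (group, value): count series (group (1, 1) has a tie between 2 and 3), a single-value reference from Series.mode (which returns tied values sorted, so .iloc[0] is the smallest), and a hypothetical finalize_all_modes that keeps every tied value, closer to the list the question asked for. It is pandas-only; whether a list-returning finalize works inside dd.Aggregation (for example with dask's meta inference) has not been checked here.

```python
import pandas as pd

df = pd.DataFrame({
    'col': [0, 1, 1, 2, 3] * 10,
    'g0': [0, 0, 0, 1, 1] * 10,
    'g1': [0, 0, 0, 1, 1] * 10,
})

# (g0, g1, value): count, the same shape of intermediate that agg produces
counts = df.groupby(['g0', 'g1'])['col'].value_counts()

# Eager single-value reference: the smallest of the tied modes per group
first_mode = df.groupby(['g0', 'g1'])['col'].agg(lambda s: s.mode().iloc[0])


def finalize_all_modes(s):
    # Hypothetical alternative finalize: keep every value whose count equals
    # the group's maximum instead of a single idxmax label.
    level = list(range(s.index.nlevels - 1))

    def modes(x):
        x = x.reset_index(level=level, drop=True)
        return sorted(x[x == x.max()].index.tolist())

    return s.groupby(level=level).apply(modes)


all_modes = finalize_all_modes(counts)
print(counts)
print(first_mode)
print(all_modes)
```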

Source: https://habr.com/ru/post/1685174/

