Dask has now been updated to support custom aggregation functions for groupby (thanks to the development team and @chmp for working on this!). I am currently trying to build a mode function and a corresponding count function. The idea is that mode returns, for each grouping, a list of the most common values in a particular column (e.g. [4, 1, 2]), while count returns the number of values in that list, i.e. 3.
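To make the expected output concrete, here is a toy illustration in plain pandas (the frame, the 'g'/'x' names, and the data are placeholders of my own):

import pandas as pd

df = pd.DataFrame({'g': ['a'] * 6, 'x': [4, 4, 1, 1, 2, 2]})
# within group 'a' the values 4, 1 and 2 each occur twice, so they all
# tie for most common:
#   desired mode  -> [4, 1, 2]
#   desired count -> 3   (the number of values in that list)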
Now I am trying to implement this in code. According to the groupby.py file, the parameters for user aggregations are as follows:
Parameters
----------
name : str
    the name of the aggregation. It should be unique, since intermediate
    result will be identified by this name.
chunk : callable
    a function that will be called with the grouped column of each
    partition. It can either return a single series or a tuple of series.
    The index has to be equal to the groups.
agg : callable
    a function that will be called to aggregate the results of each chunk.
    Again the argument(s) will be grouped series. If ``chunk`` returned a
    tuple, ``agg`` will be called with all of them as individual positional
    arguments.
finalize : callable
    an optional finalizer that will be called with the results from the
    aggregation.
Here is the code for the mean:
custom_mean = dd.Aggregation(
    'custom_mean',
    lambda s: (s.count(), s.sum()),
    lambda count, sum: (count.sum(), sum.sum()),
    lambda count, sum: sum / count,
)
df.groupby('g').agg(custom_mean)
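For reference, this is how I actually run that example end to end (the little frame here is just a stand-in of my own):

import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({'g': ['a', 'a', 'b', 'b'], 'x': [1.0, 2.0, 3.0, 4.0]})
df = dd.from_pandas(pdf, npartitions=2)

custom_mean = dd.Aggregation(
    'custom_mean',
    lambda s: (s.count(), s.sum()),               # chunk: partial count and sum per partition
    lambda count, sum: (count.sum(), sum.sum()),  # agg: combine the partials across partitions
    lambda count, sum: sum / count,               # finalize: turn the totals into a mean
)

print(df.groupby('g').agg(custom_mean).compute())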
Here is my attempt so far:
from collections import Counter

def custom_count(x):
    # count the frequency of every value in the column
    count = Counter(x)
    freq_list = list(count.values())
    # find how many values tie for the highest frequency
    max_cnt = max(freq_list)
    total = freq_list.count(max_cnt)
    # return the tied values together with their frequencies
    return count.most_common(total)
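On plain Python data this does what I want, e.g. custom_count([4, 4, 1, 1, 2, 2]) returns [(4, 2), (1, 2), (2, 2)]. My problem is wiring it into the three-stage Aggregation: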
custom_mode = dd.Aggregation(
    'custom_mode',
    lambda s: custom_count(s),
    lambda s1: s1.extend(),
    lambda s2: ...,
)
However, I can't figure out how to implement the agg and finalize steps.
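My best guess so far is to carry a Counter per group through all three stages, roughly as below, but I haven't been able to verify this end to end and the helper names are my own:

from collections import Counter

import dask.dataframe as dd

def chunk(s):
    # s is the grouped series of one partition; reduce each group to a
    # Counter of its values
    return s.apply(lambda x: Counter(x))

def agg(chunks):
    # chunks holds the per-partition Counters, grouped again by key;
    # merge the Counters that belong to the same group
    return chunks.apply(lambda counters: sum(counters, Counter()))

def finalize(merged):
    # merged has one Counter per group; keep every value that ties for
    # the highest frequency
    def modes(counter):
        top = max(counter.values())
        return [value for value, n in counter.items() if n == top]
    return merged.apply(modes)

custom_mode = dd.Aggregation('custom_mode', chunk, agg, finalize)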
Thanks in advance for any help!