What is the most efficient way to accumulate data in pyspark?

I have a dataframe (or maybe any RDD) containing several million rows in a well-known schema, for example:

Key | FeatureA | FeatureB
--------------------------
U1  |        0 |         1
U2  |        1 |         1

I need to load a dozen other data sets from disk containing different functions for the same number of keys. Some datasets are up to a dozen or so wide. Imagine:

Key | FeatureC | FeatureD |  FeatureE
-------------------------------------
U1  |        0 |        0 |         1

Key | FeatureF
--------------
U2  |        1

This is like a crease or accumulation when I just want to iterate over all the datasets and return something like this:

Key | FeatureA | FeatureB | FeatureC | FeatureD | FeatureE | FeatureF 
---------------------------------------------------------------------
U1  |        0 |        1 |        0 |        0 |        1 |        0
U2  |        1 |        1 |        0 |        0 |        0 |        1

I tried loading every dataframe, then I join, but it takes forever as soon as I pass several datasets. Am I missing a general scheme or an effective way to solve this problem?

+4
1

, DataFrame , , . :

from itertools import chain
from functools import reduce
from pyspark.sql.types import StructType
from pyspark.sql.functions import col, lit, max
from pyspark.sql import DataFrame

df1 = sc.parallelize([
    ("U1", 0, 1), ("U2", 1, 1)
]).toDF(["Key", "FeatureA", "FeatureB"])

df2 = sc.parallelize([
  ("U1", 0, 0, 1)
]).toDF(["Key", "FeatureC", "FeatureD", "FeatureE"])

df3 = sc.parallelize([("U2", 1)]).toDF(["Key", "FeatureF"])

dfs = [df1, df2, df3]

:

output_schema = StructType(
  [df1.schema.fields[0]] + list(chain(*[df.schema.fields[1:] for df in dfs]))
)

DataFrames:

transformed_dfs = [df.select(*[
  lit(None).cast(c.dataType).alias(c.name) if c.name not in df.columns 
  else col(c.name)
  for c in output_schema.fields
]) for df in dfs]

, :

combined = reduce(DataFrame.unionAll, transformed_dfs)
exprs = [max(c).alias(c) for c in combined.columns[1:]]
result = combined.repartition(col("Key")).groupBy(col("Key")).agg(*exprs)

, - , max collect_list/collect_set, explode.

+1

Source: https://habr.com/ru/post/1628578/


All Articles