Instead of joining, bring every DataFrame to a common schema, union them, and aggregate by key. Required imports:
from itertools import chain
from functools import reduce
from pyspark.sql.types import StructType
from pyspark.sql.functions import col, lit, max
from pyspark.sql import DataFrame
# Example input: three DataFrames that share a "Key" column but carry
# different feature columns.
# NOTE(review): `sc` is not defined in this snippet -- presumably the
# SparkContext provided by the PySpark shell; confirm before running it
# standalone.
df1 = sc.parallelize(
    [("U1", 0, 1), ("U2", 1, 1)]
).toDF(["Key", "FeatureA", "FeatureB"])

df2 = sc.parallelize(
    [("U1", 0, 0, 1)]
).toDF(["Key", "FeatureC", "FeatureD", "FeatureE"])

df3 = sc.parallelize(
    [("U2", 1)]
).toDF(["Key", "FeatureF"])

# All frames to merge, collected in one list so the later steps can
# iterate over them uniformly.
dfs = [df1, df2, df3]
First, build the output schema from the key column plus the feature columns of every DataFrame:
# Target schema: the first field of df1's schema (the key column), followed
# by every field after the first one from each DataFrame in `dfs`, in order.
# Fields are concatenated as-is; nothing here removes duplicates if two
# frames happen to share a feature column name.
output_schema = StructType(
    [df1.schema.fields[0]]
    + [field for df in dfs for field in df.schema.fields[1:]]
)
Then transform all the DataFrames to that schema:
# Project every DataFrame onto the full output schema so they all expose the
# same columns in the same order. A column the frame already has is selected
# unchanged; a column it lacks is filled with a NULL literal cast to the
# field's data type and given the field's name.
transformed_dfs = [
    df.select(*[
        col(field.name) if field.name in df.columns
        else lit(None).cast(field.dataType).alias(field.name)
        for field in output_schema.fields
    ])
    for df in dfs
]
Finally, union the results and aggregate by key:
# Stack all the schema-aligned frames into a single DataFrame by folding
# unionAll over the list, left to right.
combined = reduce(
    lambda left, right: left.unionAll(right),
    transformed_dfs,
)

# One aggregate per non-key column (everything after the first column).
# `max` here is pyspark.sql.functions.max imported at the top of the file,
# not the Python builtin.
exprs = list(map(lambda name: max(name).alias(name), combined.columns[1:]))

# Repartition by key, then collapse the rows of each key into one row.
result = (
    combined
    .repartition(col("Key"))
    .groupBy(col("Key"))
    .agg(*exprs)
)
If there is more than one row per key, you can replace max with collect_list/collect_set, followed by explode.