PySpark: Create MapType columns from existing columns

I need to create a new Spark DF MapType column based on existing columns, where the column name is the key and the column value is the value.

As an example, I have this DF:

    from pyspark.sql.types import StructType, StructField, StringType, FloatType

    rdd = sc.parallelize([
        ('123k', 1.3, 6.3, 7.6),
        ('d23d', 1.5, 2.0, 2.2),
        ('as3d', 2.2, 4.3, 9.0)
    ])
    schema = StructType([
        StructField('key', StringType(), True),
        StructField('metric1', FloatType(), True),
        StructField('metric2', FloatType(), True),
        StructField('metric3', FloatType(), True)
    ])
    df = sqlContext.createDataFrame(rdd, schema)

    +----+-------+-------+-------+
    | key|metric1|metric2|metric3|
    +----+-------+-------+-------+
    |123k|    1.3|    6.3|    7.6|
    |d23d|    1.5|    2.0|    2.2|
    |as3d|    2.2|    4.3|    9.0|
    +----+-------+-------+-------+

I can create a StructType column from this:

    from pyspark.sql.functions import struct

    nameCol = struct([name for name in df.columns if ("metric" in name)]).alias("metric")
    df2 = df.select("key", nameCol)

    +----+-------------+
    | key|       metric|
    +----+-------------+
    |123k|[1.3,6.3,7.6]|
    |d23d|[1.5,2.0,2.2]|
    |as3d|[2.2,4.3,9.0]|
    +----+-------------+

But I need a metric column with a MapType, where the key is the name of the column:

    +----+-------------------------+
    | key|                   metric|
    +----+-------------------------+
    |123k|Map(metric1 -> 1.3, me...|
    |d23d|Map(metric1 -> 1.5, me...|
    |as3d|Map(metric1 -> 2.2, me...|
    +----+-------------------------+

Any hints on how I can convert the data?

Thanks!

1 answer

In Spark 2.0 or later, you can use create_map. First import:

    from pyspark.sql.functions import lit, col, create_map
    from itertools import chain

create_map expects an alternating sequence of keys and values that can be created, for example, as follows:

    metric = create_map(list(chain(*(
        (lit(name), col(name)) for name in df.columns if "metric" in name
    )))).alias("metric")
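
For the sample columns here, the chain expression unrolls to the following explicit call; this is shown only to illustrate the alternating key/value layout create_map expects (a sketch assuming the df defined above):

    # Equivalent explicit form for metric1..metric3: keys and values alternate.
    metric = create_map(
        lit("metric1"), col("metric1"),
        lit("metric2"), col("metric2"),
        lit("metric3"), col("metric3")
    ).alias("metric")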

Either way, metric can then be used with select:

    df.select("key", metric)

With sample data, the result is:

    +----+---------------------------------------------------+
    |key |metric                                             |
    +----+---------------------------------------------------+
    |123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6)|
    |d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)|
    |as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0)|
    +----+---------------------------------------------------+
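
If you then need to pull individual entries back out of the map, you can index the column by key. A minimal sketch, assuming the df and metric defined above (metric2 is just an example key):

    # Look up a single entry by key; missing keys come back as null.
    df.select("key", metric) \
      .select("key", col("metric").getItem("metric2").alias("metric2")) \
      .show()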

If you are using an earlier version of Spark, you will have to use a UDF:

    from pyspark.sql import Column
    from pyspark.sql.functions import col, lit, struct, udf
    from pyspark.sql.types import DataType, DoubleType, StringType, MapType

    def as_map(*cols: str, key_type: DataType=DoubleType()) -> Column:
        args = [struct(lit(name), col(name)) for name in cols]
        as_map_ = udf(
            lambda *args: dict(args),
            MapType(StringType(), key_type)
        )
        return as_map_(*args)

which can be used as follows:

 df.select("key", as_map(*[name for name in df.columns if "metric" in name]).alias("metric")) 
