Exploding an array column in a PySpark DataFrame

I have a dataframe like this:

+-----+--------------------+
|index|              merged|
+-----+--------------------+
|    0|[[2.5, 2.4], [3.5...|
|    1|[[-1.0, -1.0], [-...|
|    2|[[-1.0, -1.0], [-...|
|    3|[[0.0, 0.0], [0.5...|
|    4|[[0.5, 0.5], [1.0...|
|    5|[[0.5, 0.5], [1.0...|
|    6|[[-1.0, -1.0], [0...|
|    7|[[0.0, 0.0], [0.5...|
|    8|[[0.5, 0.5], [1.0...|
+-----+--------------------+

And I want to explode the merged column into:

+-----+-------+-------+
|index|Column1|Column2|
+-----+-------+-------+
|    0|    2.5|    2.4|
|    1|    3.5|    0.5|
|    2|   -1.0|   -1.0|
|    3|   -1.0|   -1.0|
|    4|    0.0|    0.0|
|    5|    0.5|   0.74|
+-----+-------+-------+

Each entry [[2.5, 2.4], [3.5, 0.5]] represents two rows: 2.5 and 3.5 go into Column1, while 2.4 and 0.5 go into Column2.

So I tried this:

df = df.withColumn("merged", df["merged"].cast("array<array<float>>"))
df = df.withColumn("merged", explode("merged"))

and then I planned to use a udf to create another DataFrame.

But I can't use the data or apply the explode; I get this error:

pyspark.sql.utils.AnalysisException: u"cannot resolve 'cast(merged as array<array<float>>)' due to data type mismatch: cannot cast StringType to ArrayType(StringType,true)

I also tried:

df = df.withColumn("merged", df["merged"].cast("array<string>"))

but nothing works, and if I use explode without the cast, I get:

 pyspark.sql.utils.AnalysisException: u"cannot resolve 'explode(merged)' due to data type mismatch: input to function explode should be array or map type, not StringType; 
1 answer

You can try the following code:

from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

def col1_calc(merged):
    return merged[0][0]

def col2_calc(merged):
    return merged[0][1]

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .getOrCreate()

    df = spark.createDataFrame([
        (0, [[2.5, 2.4], [3.5]]),
        (1, [[-1.0, -1.0], [3.5]]),
        (2, [[-1.0, -1.0], [3.5]]),
    ], ["index", "merged"])
    df.show()

    # Each udf pulls one value out of the first inner pair
    column1_calc = udf(col1_calc, FloatType())
    df = df.withColumn('Column1', column1_calc(df['merged']))

    column2_calc = udf(col2_calc, FloatType())
    df = df.withColumn('Column2', column2_calc(df['merged']))

    df = df.select(['Column1', 'Column2', 'index'])
    df.show()

Output:

+-------+-------+-----+
|Column1|Column2|index|
+-------+-------+-----+
|    2.5|    2.4|    0|
|   -1.0|   -1.0|    1|
|   -1.0|   -1.0|    2|
+-------+-------+-----+

Source: https://habr.com/ru/post/1258404/
