How to pivot on multiple columns in Spark SQL?

I need to pivot more than one column in a PySpark dataframe. Example dataframe:

    >>> from pyspark.sql import functions as F
    >>> d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),
    ...      (101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),
    ...      (102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)]
    >>> mydf = spark.createDataFrame(d, ['id','day','price','units'])
    >>> mydf.show()
    +---+---+-----+-----+
    | id|day|price|units|
    +---+---+-----+-----+
    |100|  1|   23|   10|
    |100|  2|   45|   11|
    |100|  3|   67|   12|
    |100|  4|   78|   13|
    |101|  1|   23|   10|
    |101|  2|   45|   13|
    |101|  3|   67|   14|
    |101|  4|   78|   15|
    |102|  1|   23|   10|
    |102|  2|   45|   11|
    |102|  3|   67|   16|
    |102|  4|   78|   18|
    +---+---+-----+-----+

Now, if I need one price column per day for each id (i.e. one row per id), I can use pivot like this:

    >>> pvtdf = mydf.withColumn('combcol', F.concat(F.lit('price_'), mydf['day'])) \
    ...             .groupby('id').pivot('combcol').agg(F.first('price'))
    >>> pvtdf.show()
    +---+-------+-------+-------+-------+
    | id|price_1|price_2|price_3|price_4|
    +---+-------+-------+-------+-------+
    |100|     23|     45|     67|     78|
    |101|     23|     45|     67|     78|
    |102|     23|     45|     67|     78|
    +---+-------+-------+-------+-------+

When I need the units columns transposed the same way as price, I have to build another data frame as above for units and then join the two on id. Since I may have more columns like this, I tried writing a function for it:

    >>> def pivot_udf(df, *cols):
    ...     mydf = df.select('id').drop_duplicates()
    ...     for c in cols:
    ...         mydf = mydf.join(
    ...             df.withColumn('combcol', F.concat(F.lit('{}_'.format(c)), df['day']))
    ...               .groupby('id').pivot('combcol').agg(F.first(c)),
    ...             'id')
    ...     return mydf
    ...
    >>> pivot_udf(mydf, 'price', 'units').show()
    +---+-------+-------+-------+-------+-------+-------+-------+-------+
    | id|price_1|price_2|price_3|price_4|units_1|units_2|units_3|units_4|
    +---+-------+-------+-------+-------+-------+-------+-------+-------+
    |100|     23|     45|     67|     78|     10|     11|     12|     13|
    |101|     23|     45|     67|     78|     10|     13|     14|     15|
    |102|     23|     45|     67|     78|     10|     11|     16|     18|
    +---+-------+-------+-------+-------+-------+-------+-------+-------+

Is this good practice, and is there a better way to do it? Thanks in advance!

4 answers

The solution in the question is about the best you can get. The only improvement would be to cache the input dataset to avoid scanning it twice, i.e.

    mydf.cache()
    pivot_udf(mydf, 'price', 'units').show()
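
For completeness, a minimal sketch of how that could look end to end (the count() to force materialization and the final unpersist() are my additions, not part of the original suggestion):

    # mydf and pivot_udf as defined in the question
    mydf.cache()                               # mark the input DataFrame for caching
    mydf.count()                               # optional: materialize the cache eagerly
    pivot_udf(mydf, 'price', 'units').show()   # the repeated scans of mydf now hit the cache
    mydf.unpersist()                           # release the cached blocks when finished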

As of Spark 1.6, I believe this is the only way, because pivot takes only one column, plus an optional second argument, values, to which you can pass the distinct values of that column. Passing them explicitly makes your code run faster, because otherwise Spark has to compute them for you. So yes, this is the right way to do it.
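
For illustration, a hedged sketch of what passing the distinct pivot values explicitly might look like for the price pivot from the question (the hard-coded list assumes the four days in the sample data):

    from pyspark.sql import functions as F

    # Supplying the expected pivot values up front lets Spark skip the extra
    # job it would otherwise run to collect the distinct values of 'combcol'.
    pvtdf = (mydf
             .withColumn('combcol', F.concat(F.lit('price_'), mydf['day']))
             .groupby('id')
             .pivot('combcol', ['price_1', 'price_2', 'price_3', 'price_4'])
             .agg(F.first('price')))
    pvtdf.show()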


Here's a non-UDF way involving a single pivot (hence just a single column scan to identify all the unique days).

    mydf.groupBy('id').pivot('day').agg(F.first('price').alias('price'),
                                        F.first('units').alias('unit'))

Here is the result (apologies for the column ordering and naming not matching the question exactly):

    +---+-------+------+-------+------+-------+------+-------+------+
    | id|1_price|1_unit|2_price|2_unit|3_price|3_unit|4_price|4_unit|
    +---+-------+------+-------+------+-------+------+-------+------+
    |100|     23|    10|     45|    11|     67|    12|     78|    13|
    |101|     23|    10|     45|    13|     67|    14|     78|    15|
    |102|     23|    10|     45|    11|     67|    16|     78|    18|
    +---+-------+------+-------+------+-------+------+-------+------+

We simply aggregate by both the price and the units column after pivoting on day.
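
If you need the price_1 / units_1 style names from the question, one possible cleanup (just a sketch; the renaming scheme is my own, not part of this answer) is to swap the generated day_metric names around afterwards:

    from pyspark.sql import functions as F

    pivoted = mydf.groupBy('id').pivot('day').agg(
        F.first('price').alias('price'),
        F.first('units').alias('unit'))

    # Rename '1_price' to 'price_1', '1_unit' to 'unit_1', and so on.
    renamed = pivoted.select(
        'id',
        *[pivoted[c].alias('_'.join(reversed(c.split('_'))))
          for c in pivoted.columns if c != 'id'])
    renamed.show()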


I think you can create a merged column (combining all the columns you want to pivot out) and then pivot on that merged column.
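
A rough sketch of that idea (my own reading of this answer; the column names are illustrative): pack the value columns into one struct, pivot once on day, then flatten the struct fields back out:

    from pyspark.sql import functions as F

    # Pack price and units into a single struct column, pivot once on day,
    # then unpack the struct into price_<day> / units_<day> columns.
    merged = (mydf
              .withColumn('vals', F.struct('price', 'units'))
              .groupBy('id')
              .pivot('day')
              .agg(F.first('vals')))

    flat = merged.select(
        'id',
        *[merged[c].getField(field).alias('{}_{}'.format(field, c))
          for c in merged.columns if c != 'id'
          for field in ('price', 'units')])
    flat.show()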

