Trying to turn blob into multiple columns in Spark

I have a serialized blob and a function that converts it to a java map. I registered the function as UDF and tried to use it in Spark SQL as follows:

    sqlCtx.udf.register("blobToMap", Utils.blobToMap)
    val df = sqlCtx.sql("""
      SELECT mp['c1'] as c1, mp['c2'] as c2
      FROM (SELECT *, blobToMap(payload) AS mp FROM t1) a
    """)
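
For context, Utils.blobToMap itself is not shown in the question; below is a minimal hypothetical sketch of such a function, assuming an Array[Byte] => Map[String, String] signature (the real deserialization logic would depend on the blob's wire format):

    // Hypothetical sketch only: the real Utils.blobToMap would decode whatever
    // serialization format the blob actually uses.
    object Utils {
      val blobToMap: Array[Byte] => Map[String, String] = bytes =>
        new String(bytes, "UTF-8")
          .split("&")
          .map(_.split("=", 2))
          .collect { case Array(k, v) => k -> v }
          .toMap
    }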

It works, but for some reason the very heavy blobToMap function is executed twice for each row, and since I am actually extracting 20 fields, it is executed 20 times per row. I saw the suggestions in "Derive multiple columns from a single column in a Spark DataFrame", but they really are not scalable - I do not want to create a class every time I need to extract data.
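
A quick way to confirm the repeated evaluation (a diagnostic sketch, using the df defined above) is to print the extended query plan; the duplicated blobToMap(payload) expression shows up there because the nested SELECT gets collapsed into the outer projection during optimization:

    // Prints the parsed/analyzed/optimized/physical plans; the optimized plan
    // typically shows blobToMap(payload)[c1], blobToMap(payload)[c2], ...
    // i.e. one UDF call per extracted key.
    df.explain(true)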

How can I get Spark to do the reasonable thing and evaluate the UDF only once? I tried splitting the query into two stages. The only thing that helped was caching the inner selection - but that is not an option, because the intermediate result is really big and I only need a few dozen fields from it.

1 answer

I will answer myself, hoping this will help someone... After dozens of experiments, I was able to get Spark to evaluate the UDF and build the map only once, instead of recomputing it for every key lookup, by splitting the query and applying an evil, ugly trick - converting the DataFrame to an RDD and back to a DataFrame:

    val df1 = sqlCtx.sql("SELECT *, blobToMap(payload) AS mp FROM t1")
    sqlCtx.createDataFrame(df1.rdd, df1.schema).registerTempTable("t1_with_mp")
    val final_df = sqlCtx.sql("SELECT mp['c1'] as c1, mp['c2'] as c2 FROM t1_with_mp")
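
The round-trip works because it hides the lineage from the Catalyst optimizer: the map column is materialized when the RDD is computed, so the key lookups read from an already-built map instead of re-invoking the UDF. On newer Spark versions (2.3+) another workaround is to mark the UDF as non-deterministic, which prevents the optimizer from collapsing the two projections and duplicating the call; this is a sketch under that version assumption, not something from the original answer:

    import org.apache.spark.sql.functions.udf

    // Hypothetical alternative (assumes Spark 2.3+): a non-deterministic UDF
    // cannot be collapsed into the outer projection, so blobToMap runs once per row.
    val blobToMapUdf = udf(Utils.blobToMap).asNondeterministic()
    sqlCtx.udf.register("blobToMap", blobToMapUdf)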

Source: https://habr.com/ru/post/1011847/

