How to return complex types using a Spark UDF

Hello and thank you in advance.

My program is written in Java and I cannot switch to Scala.

I am currently working with a Spark DataFrame read from a JSON file using the following line:

DataFrame df = sqlContext.read().json("filePath.json");

The SQLContext and SparkContext initialize correctly and work fine.

The problem is that the JSON I'm reading has nested structures, and I want to clean / check the inner data without changing the schema.

One of the data columns in particular is of type "GenericRowWithSchema".

Say I want to clean only the column named "data".

The solution that occurred to me was to define a user-defined function (UDF) named "cleanDataField" and then run it on the "data" column. Here is the code:

    UDF1<GenericRowWithSchema, GenericRowWithSchema> cleanDataField =
        new UDF1<GenericRowWithSchema, GenericRowWithSchema>() {
            public GenericRowWithSchema call(GenericRowWithSchema grws) {
                cleanGenericRowWithSchema(grws);
                return grws;
            }
        };

Then I register the function in the SQLContext:

 sqlContext.udf().register("cleanDataField", cleanDataField, DataTypes.StringType); 

And after that I would call

df.selectExpr("cleanDataField(data)").show(10, false);

to see the first 10 rows with the cleaned data.

Ultimately, the question is: can I return complex data (for example, a custom class object) from a UDF? If so, how? I think I need to change the third parameter of the UDF registration, because I am not returning a string, but what should I replace it with?

thanks

2 answers

Suppose you want to build a data type as struct<companyid:string,loyaltynum:int,totalprice:int,itemcount:int>

To do this, you can do the following:

    // The schema JSON must be escaped as a Java string literal.
    DataType dt = DataType.fromJson(
        "{\"type\":\"struct\",\"fields\":["
        + "{\"name\":\"companyid\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},"
        + "{\"name\":\"loyaltynum\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},"
        + "{\"name\":\"totalprice\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},"
        + "{\"name\":\"itemcount\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}"
        + "]}");

You can then use this data type as the return type when registering your UDF, that is, pass dt as the third argument to register instead of DataTypes.StringType.
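If escaping the schema JSON by hand proves error-prone, one option is to assemble the string in code. The sketch below uses only the JDK. The class StructSchemaJson and its methods are hypothetical names, not part of Spark, and it assumes a flat struct whose fields are all non-nullable with empty metadata, as in the example above. The resulting string is what would be passed to DataType.fromJson.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not part of Spark): builds the JSON text of a flat struct schema. */
public class StructSchemaJson {

    /** Wraps s in double quotes, escaping backslashes and double quotes inside it. */
    public static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /**
     * Renders the fields (name -> type name such as "string" or "integer") in the
     * same layout as the hand-written JSON above. Assumption: every field is
     * non-nullable and carries empty metadata.
     */
    public static String build(Map<String, String> fields) {
        StringJoiner joined =
            new StringJoiner(",", "{\"type\":\"struct\",\"fields\":[", "]}");
        for (Map.Entry<String, String> f : fields.entrySet()) {
            joined.add("{\"name\":" + quote(f.getKey())
                + ",\"type\":" + quote(f.getValue())
                + ",\"nullable\":false,\"metadata\":{}}");
        }
        return joined.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so fields appear in the order added.
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("companyid", "string");
        fields.put("loyaltynum", "integer");
        fields.put("totalprice", "integer");
        fields.put("itemcount", "integer");
        System.out.println(build(fields));
    }
}
```

Building the string this way keeps the field list readable and avoids a long run of backslash escapes in the source file.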


I do not know if your question is still relevant, but just in case, here is the answer:

You need to replace the third argument with Encoders.bean(GenericRowWithSchema.class).schema()


Source: https://habr.com/ru/post/1257399/

