Reading JSON Arrays into a Spark DataFrame

I have a large nested NDJSON (newline-delimited JSON) file that I need to read into a single Spark DataFrame and save to Parquet. In an attempt to render the schema, I use this function:

def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    f.dataType match {
      case st: StructType => flattenSchema(st, colName)
      case _ => Array(col(colName))
    }
  })
}
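For context, a minimal way to use it might look like the sketch below; it assumes the imports shown, that df is the DataFrame being flattened, and that the output path is a placeholder:

 import org.apache.spark.sql.Column
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.StructType

 // select every leaf column by its dotted path, producing a flat DataFrame
 val flatDf = df.select(flattenSchema(df.schema): _*)

 val outputPath = "/tmp/flattened.parquet"  // placeholder output location
 flatDf.write.parquet(outputPath)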

on a DataFrame that is returned by reading with

val df = sqlCtx.read.json(sparkContext.wholeTextFiles(path).values)

I also switched it to val df = spark.read.json(path), so it only works with NDJSON and not with multi-line JSON, and got the same error.

This causes an out-of-memory error on the worker: java.lang.OutOfMemoryError: Java heap space.

I have changed the JVM memory settings and tuned the executor/driver parameters, to no avail.

Is there a way to stream the file, flatten the schema, and add to the DataFrame incrementally? Some lines of the JSON contain new fields not present in the preceding ones, so those would need to be filled in later.

2 answers

There was no workaround. The problem was the JVM object limit. I ended up using a Scala JSON parser and building the DataFrame manually.
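The answer does not include code, but a rough sketch of that kind of approach, using json4s (which ships with Spark) and a hand-built schema, could look like the following; every field name here is just an illustration:

 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.json4s._
 import org.json4s.jackson.JsonMethods.parse

 // target schema, built by hand from the fields you care about
 val targetSchema = StructType(Seq(
   StructField("id", StringType),
   StructField("name", StringType)))

 // parse each NDJSON line yourself and pull out only the needed fields;
 // fields missing from a given line simply become null
 val rowsRdd = sparkContext.textFile(path).map { line =>
   implicit val formats = DefaultFormats
   val json = parse(line)
   Row((json \ "id").extractOpt[String].orNull,
       (json \ "name").extractOpt[String].orNull)
 }

 val df = sqlCtx.createDataFrame(rowsRdd, targetSchema)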


You can achieve this in several ways.

While reading, you can provide a schema for the JSON data, or you can let Spark infer the schema on its own.
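For example, a hand-written schema can be passed to the reader so Spark does not have to scan the data to infer it; the field names below are illustrative:

 import org.apache.spark.sql.types.{StringType, StructField, StructType}

 // illustrative schema; replace the fields with the ones in your JSON
 val schema = StructType(Seq(
   StructField("id", StringType),
   StructField("payload", StructType(Seq(
     StructField("value", StringType))))))

 // passing the schema skips Spark's inference pass over the whole file
 val df = spark.read.schema(schema).json(path)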

Once the JSON is in a DataFrame, you can flatten it in the following ways.

a. Using explode() on the DataFrame to flatten it. b. Using Spark SQL and accessing the nested fields with the . operator. A minimal sketch of both is shown right after this paragraph.
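The sketch below assumes the DataFrame is called df, with an array column named items and a nested struct named payload; adjust the names to your data:

 import org.apache.spark.sql.functions.{col, explode}

 // a. explode() turns each element of an array column into its own row
 val exploded = df.withColumn("item", explode(col("items")))

 // b. nested struct fields are reachable with the . operator, in the DSL or in SQL
 val picked = df.select(col("payload.value"))
 df.createOrReplaceTempView("events")
 val viaSql = spark.sql("SELECT payload.value FROM events")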

Finally, if you want to add new columns to the DataFrame: a. Using withColumn() is one approach; however, it runs once for each added column and over the entire dataset. b. Using SQL to generate a new DataFrame from the existing one may be the easiest (a minimal sketch of options a and b appears right after this paragraph). c. Lastly, using map, then accessing the elements, getting the old schema, adding the new values, creating a new schema, and finally obtaining a new DataFrame, as shown below.
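A minimal sketch of options a and b, with illustrative column names and expressions:

 import org.apache.spark.sql.functions.{col, lit}

 // option a: one withColumn call per new column; each call is a
 // transformation over the whole dataset
 val withNewCols = df
   .withColumn("newcol1", lit(0))
   .withColumn("newcol2", col("payload.value"))

 // option b: the same thing expressed in SQL
 df.createOrReplaceTempView("events")
 val viaSql = spark.sql("SELECT *, 0 AS newcol1, payload.value AS newcol2 FROM events")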

A single withColumn will operate on the entire RDD, so it is generally not good practice to use that method for every column you want to add. There is a way to work with the columns and their data inside a map function. Since one map function does the job here, the code to add the new columns and their data is executed in parallel.

a. You can collect the new values based on your calculations.

b. Add these new column values to the main RDD as shown below:

 val newColumns: Seq[Any] = Seq(newcol1, newcol2)
 Row.fromSeq(row.toSeq.init ++ newColumns)

Here row is the reference to the row inside the map method.

c. Create the new schema as below:

 val newColumnsStructType = StructType(Seq(StructField("newcolName1", IntegerType), StructField("newColName2", IntegerType)))

d. Append the new fields to the old schema:

 val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType) 

e. Create a new DataFrame with the new columns:

 val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema) 
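Putting steps a through e together, a rough end-to-end sketch (with placeholder values for the new columns, and mainDataFrame/sqlContext as in the snippets above) could look like this:

 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

 // a. + b. compute the new values per row inside a single map and append them
 val newRDD = mainDataFrame.rdd.map { row =>
   val newcol1: Any = 1           // placeholder computation
   val newcol2: Any = row.length  // placeholder: derive whatever you need from row
   val newColumns: Seq[Any] = Seq(newcol1, newcol2)
   Row.fromSeq(row.toSeq.init ++ newColumns)
 }

 // c. + d. extend the old schema (minus its last field, matching row.toSeq.init above)
 val newColumnsStructType = StructType(Seq(
   StructField("newcolName1", IntegerType),
   StructField("newColName2", IntegerType)))
 val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)

 // e. rebuild the DataFrame with the extended schema
 val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema)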
