PySpark: parse a column of JSON strings

I have a pyspark dataframe consisting of a single column, called json, where each row is a unicode string of JSON. I would like to parse each row and return a new dataframe where each row is the parsed JSON.

    # Sample dataframe
    from pyspark.sql import Row

    jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
    jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
    jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'

    df = sql_context.createDataFrame([Row(json=jstr1), Row(json=jstr2), Row(json=jstr3)])

I tried mapping over each row with json.loads:

    (df
     .select('json')
     .rdd
     .map(lambda x: json.loads(x))
     .toDF()
    ).show()

But this returns a TypeError: expected string or buffer

I suspect that part of the problem is that the schema information is lost when converting from a dataframe to an rdd, so I also tried entering the schema information manually:

    from pyspark.sql.types import StructType, StructField, StringType

    schema = StructType([StructField('json', StringType(), True)])
    rdd = (df
           .select('json')
           .rdd
           .map(lambda x: json.loads(x))
          )
    new_df = sql_context.createDataFrame(rdd, schema)
    new_df.show()

But I get the same TypeError.

Looking at this answer, it seems that flattening the rows with flatMap might be useful here, but I'm not having luck with that either:

    schema = StructType([StructField('json', StringType(), True)])
    rdd = (df
           .select('json')
           .rdd
           .flatMap(lambda x: x)
           .flatMap(lambda x: json.loads(x))
           .map(lambda x: x.get('body'))
          )
    new_df = sql_context.createDataFrame(rdd, schema)
    new_df.show()

I get this error: AttributeError: 'unicode' object has no attribute 'get'.


Converting a dataframe with JSON strings to a structured dataframe is actually quite simple in Spark if you convert the dataframe to an RDD of strings first (see: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets).

For instance:

    >>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
    >>> new_df.printSchema()
    root
     |-- body: struct (nullable = true)
     |    |-- id: long (nullable = true)
     |    |-- name: string (nullable = true)
     |    |-- sub_json: struct (nullable = true)
     |    |    |-- id: long (nullable = true)
     |    |    |-- sub_sub_json: struct (nullable = true)
     |    |    |    |-- col1: long (nullable = true)
     |    |    |    |-- col2: string (nullable = true)
     |-- header: struct (nullable = true)
     |    |-- foo: string (nullable = true)
     |    |-- id: long (nullable = true)
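
Since the parsed dataframe now carries a proper nested schema, nested fields can be queried with dotted paths; a small illustrative sketch against the sample data from the question (the select itself is not from the original answer):

    >>> new_df.select('header.foo', 'body.name', 'body.sub_json.sub_sub_json.col1').show()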

For Spark 2.1+, you can use from_json, which allows you to preserve the other non-JSON columns of the dataframe, as follows:

    from pyspark.sql.functions import from_json, col

    json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
    df.withColumn('json', from_json(col('json'), json_schema))

You let Spark derive the schema of the JSON string column. The df.json column is then no longer a StringType, but the correctly decoded JSON structure, i.e. a nested StructType, and all the other columns of df are preserved as they are.

You can access the json content as follows:

 df.select(col('json.header').alias('header')) 
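
The same dotted-path access works for deeper levels of nesting; a small illustrative sketch using the sample data from the question:

    df.select(col('json.header.foo'), col('json.body.sub_json.sub_sub_json.col2'))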

The existing answers do not work if your JSON is anything other than perfectly/traditionally formatted. For example, the RDD-based schema inference expects JSON in curly braces {} and will produce an incorrect schema (resulting in null values) if, for example, your data looks like this:

 [ { "a": 1.0, "b": 1 }, { "a": 0.0, "b": 2 } ] 

I wrote a function to work around this problem by sanitizing the JSON so that it lives inside another JSON object:

    import pyspark.sql.functions as psf

    def parseJSONCols(df, *cols, sanitize=True):
        """Auto infer the schema of a json column and parse into a struct.

        rdd-based schema inference works if you have well-formatted JSON,
        like ``{"key": "value", ...}``, but breaks if your 'JSON' is just a
        string (``"data"``) or is an array (``[1, 2, 3]``). In those cases
        you can fix everything by wrapping the data in another JSON object
        (``{"key": [1, 2, 3]}``). The ``sanitize`` option (default True)
        automatically performs the wrapping and unwrapping.

        The schema inference is based on this `SO Post
        <https://stackoverflow.com/a/45880574>`_.

        Parameters
        ----------
        df : pyspark dataframe
            Dataframe containing the JSON cols.
        *cols : string(s)
            Names of the columns containing JSON.
        sanitize : boolean
            Flag indicating whether you'd like to sanitize your records
            by wrapping and unwrapping them in another JSON object layer.

        Returns
        -------
        pyspark dataframe
            A dataframe with the decoded columns.
        """
        res = df
        for i in cols:
            # sanitize if requested.
            if sanitize:
                res = (
                    res.withColumn(
                        i,
                        psf.concat(psf.lit('{"data": '), i, psf.lit('}'))
                    )
                )
            # infer schema and apply it
            schema = spark.read.json(res.rdd.map(lambda x: x[i])).schema
            res = res.withColumn(i, psf.from_json(psf.col(i), schema))

            # unpack the wrapped object if needed
            if sanitize:
                res = res.withColumn(i, psf.col(i).data)
        return res

Note: psf = pyspark.sql.functions.
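
As an illustrative usage sketch (assuming an active SparkSession named spark, which the function body itself relies on, and the sample dataframe df from the question):

    # hypothetical run against the question's sample dataframe
    parsed = parseJSONCols(df, 'json', sanitize=True)
    parsed.printSchema()                     # 'json' is now a struct with 'body' and 'header' fields
    parsed.select('json.body.name').show()   # nested fields are reachable with dotted paths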


Here's a short (Spark SQL) version of @nolan-conaway's parseJSONCols.

    SELECT explode(
        from_json(
            concat('{"data":',
                   '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]',
                   '}'),
            'data array<struct<a:DOUBLE, b:INT>>'
        ).data
    ) as data;

PS. I've added the explode function too :P

You'll need to know some HIVE SQL types.
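
The same statement can also be issued from PySpark; a minimal sketch, assuming an active SparkSession named spark:

    # run the Spark SQL version of the sanitize-and-parse trick from Python
    spark.sql("""
        SELECT explode(
            from_json(
                concat('{"data":',
                       '[{"a": 1.0,"b": 1},{"a": 0.0,"b": 2}]',
                       '}'),
                'data array<struct<a:DOUBLE, b:INT>>'
            ).data
        ) AS data
    """).show()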
