I have a pyspark dataframe consisting of a single column called `json`, where each row is a unicode JSON string. I would like to parse each row and return a new dataframe where each row is the parsed JSON.
```python
# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'

df = sql_context.createDataFrame([Row(json=jstr1), Row(json=jstr2), Row(json=jstr3)])
```
I tried mapping over each row with `json.loads`:
```python
(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()
```
But this raises `TypeError: expected string or buffer`.
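My guess (and it is only a guess) is that the map is receiving `Row` objects rather than plain strings, so pulling the string field out of the row first might be what I actually meant to write. A rough sketch, using the column name from the sample above:

```python
import json

parsed_rdd = (df
              .select('json')
              .rdd
              .map(lambda row: json.loads(row['json']))  # row is a Row; grab the string field before parsing
             )
```

Even if that gets past the `TypeError`, I'm not sure how to turn the resulting RDD of dicts back into a dataframe cleanly, which is really the part I'm stuck on.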
I suspect that part of the problem is that the schema information is lost when converting from a dataframe to an rdd, so I also tried entering the schema information manually:
```python
schema = StructType([StructField('json', StringType(), True)])

rdd = (df
       .select('json')
       .rdd
       .map(lambda x: json.loads(x))
      )
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
```
But I get the same `TypeError`.
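My assumption is that the explicit schema doesn't change what the rdd elements actually are, so the lambda is presumably still being handed `Row` objects. A quick (hypothetical) check along these lines should confirm or refute that:

```python
# Peek at the first element of the rdd before any map runs;
# if this prints a Row rather than a plain string, the schema isn't the issue.
print(df.select('json').rdd.take(1))
```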
Looking at this answer, it seems that flattening the rows with `flatMap` might be useful here, but I'm not sure about this:
```python
schema = StructType([StructField('json', StringType(), True)])

rdd = (df
       .select('json')
       .rdd
       .flatMap(lambda x: x)
       .flatMap(lambda x: json.loads(x))
       .map(lambda x: x.get('body'))
      )
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
```
I get this error: `AttributeError: 'unicode' object has no attribute 'get'`.
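If I'm reading that error correctly, the second `flatMap` is flattening each parsed dict into its keys (which are unicode strings), and `.get` then fails on those strings. Keeping the parse as a plain `map` is probably closer to what I intended; a sketch of what I mean, still using the names from the sample above:

```python
import json

body_rdd = (df
            .select('json')
            .rdd
            .flatMap(lambda row: row)       # unpack each Row into its single string field
            .map(lambda s: json.loads(s))   # parse each string into a dict
            .map(lambda d: d.get('body'))   # d is a dict here, so .get works
           )
```

Even so, I don't know how to go from that RDD of nested dicts back to a dataframe with a sensible schema, which is ultimately what I'm after.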