I am trying to create a data frame from json to dstream, but the code below does not give the correct data frame -
import sys
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
def getSqlContextInstance(sparkContext):
if ('sqlContextSingletonInstance' not in globals()):
globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
return globals()['sqlContextSingletonInstance']
if __name__ == "__main__":
if len(sys.argv) != 3:
raise IOError("Invalid usage; the correct format is:\nquadrant_count.py <hostname> <port>")
spc = SparkContext(appName="jsonread")
sqlContext = SQLContext(spc)
stc = StreamingContext(spc, 2)
stc.checkpoint("checkpoint")
lines = stc.socketTextStream(sys.argv[1], int(sys.argv[2]))
lines.pprint()
parsed = lines.map(lambda x: json.loads(x))
def process(time, rdd):
print("========= %s =========" % str(time))
try:
sqlContext = getSqlContextInstance(rdd.context)
rowRdd = rdd.map(lambda w: Row(word=w))
wordsDataFrame = sqlContext.createDataFrame(rowRdd)
wordsDataFrame.registerTempTable("mytable")
testDataFrame = sqlContext.sql("select summary from mytable")
print(testDataFrame.show())
print(testDataFrame.printSchema())
except:
pass
parsed.foreachRDD(process)
stc.start()
stc.awaitTermination()
There are no errors, but when running the script, it successfully reads json from the streaming context, but does not display the values in the summary or in the dataframe.
The JSON example I'm trying to read is
{"reviewerID": "A2IBPI20UZIR0U", "asin": "1384719342", "reviewerName": " \" , , , , ... "," ": [0, 0], "reviewText": " , . -. . , , , , "," ": 5.0," ":" "," unixReviewTime ": 1393545600, "reviewTime": "02 28, 2014"}
, . .