Spark - reading JSON from partitioned folders written by Firehose

Kinesis Firehose manages the persistence of files, in this case time-series JSON, into a folder hierarchy partitioned by YYYY/MM/DD/HH (down to the hour, in 24-hour numbering) ... excellent.

Using Spark 2.0, how can I read these nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an "option" for the DataFrame reader?

My next goal is a streaming DataFrame, where new files that Firehose writes to S3 automatically become part of the streaming DataFrame, using the new Structured Streaming in Spark 2.0. I know this is all experimental; I am hoping someone has used S3 as a streaming file source before, with the data partitioned into folders as described above. Of course we would prefer a direct Kinesis stream, but there is no release date for that connector, so Firehose -> S3 is the interim solution.

NB: I use Databricks, which mounts S3 into DBFS, but this could just as easily be EMR or another Spark provider. It would be great to see a notebook that gives an example, if one is available to share.

Cheers!

2 answers

Can I read the nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an option for the DataFrame reader?

Yes. Since your directory structure is regular (YYYY/MM/DD/HH), you can give the path down to the leaf files using wildcard characters, as shown below:

    import org.apache.spark.sql.SparkSession

    val spark: SparkSession = SparkSession.builder.master("local").getOrCreate()

    // The glob */*/*/*/*.json maps to YYYY/MM/DD/HH/filename.json
    val jsonDf = spark.read.json("base/path/*/*/*/*/*.json")

Of course we would prefer a direct Kinesis stream, but there is no release date for that connector, so Firehose -> S3 is the interim solution.

There is a library for integrating Kinesis with Spark Streaming (the DStream-based API, not Structured Streaming), so you can read the streaming data directly and run SQL operations on it without going through S3.

    groupId = org.apache.spark
    artifactId = spark-streaming-kinesis-asl_2.11
    version = 2.0.0

Sample code with Spark Streaming and SQL:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Duration
    import org.apache.spark.streaming.kinesis._
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

    // Bracketed arguments are placeholders to fill in for your own setup
    val kinesisStream = KinesisUtils.createStream(
      streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
      [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)

    kinesisStream.foreachRDD { rdd =>
      // Get the singleton instance of SparkSession
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

      // Kinesis records arrive as Array[Byte]: decode them to JSON strings,
      // then let the JSON reader parse them into a DataFrame
      val jsonDf = spark.read.json(rdd.map(bytes => new String(bytes, "UTF-8")))

      // Create a temporary view from the DataFrame
      jsonDf.createOrReplaceTempView("json_data_tbl")

      // With a DataFrame and a SparkSession available,
      // most Spark SQL operations can be used here
    }

Full disclosure: I work for Databricks, but I do not represent them on Stack Overflow.

Using Spark 2.0, how can I read these nested subfolders and create a static DataFrame from all the leaf JSON files? Is there an "option" for the DataFrame reader?

DataFrameReader supports loading a sequence of paths. See the documentation for def json(paths: String*): DataFrame. You can list the paths explicitly, use a glob pattern, or build the sequence programmatically (recommended):

    // Explicit sequence of hourly folders
    val inputPathSeq = Seq[String](
      "/mnt/myles/structured-streaming/2016/12/18/02",
      "/mnt/myles/structured-streaming/2016/12/18/03")

    // Glob pattern covering every hour of one day
    val inputPathGlob = "/mnt/myles/structured-streaming/2016/12/18/*"

    // Paths built programmatically for hours 02 to 04
    val basePath = "/mnt/myles/structured-streaming/2016/12/18/0"
    val inputPathList = (2 to 4).toList.map(basePath + _ + "/*.json")
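Building the list programmatically can be generalized beyond a single day. The following is only a sketch using plain `java.time`, with no Spark dependency; the `FirehosePaths` object and `hourlyPaths` helper are hypothetical names introduced here, and it assumes the folders follow the YYYY/MM/DD/HH layout described in the question (Firehose writes these prefixes in UTC, so the timestamps should be UTC as well).

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Hypothetical helper, not part of Spark or Firehose
object FirehosePaths {
  // Firehose-style hourly layout: YYYY/MM/DD/HH
  private val hourFormat = DateTimeFormatter.ofPattern("yyyy/MM/dd/HH")

  /** One folder path per whole hour from `start` (inclusive) to `end` (exclusive). */
  def hourlyPaths(root: String, start: LocalDateTime, end: LocalDateTime): Seq[String] = {
    val first = start.truncatedTo(ChronoUnit.HOURS)
    val hours = ChronoUnit.HOURS.between(first, end)
    (0L until hours).map(h => s"$root/${first.plusHours(h).format(hourFormat)}")
  }
}

// Hours 02, 03 and 04 of 2016-12-18, using the mount point from the example above
val inputPaths = FirehosePaths.hourlyPaths(
  "/mnt/myles/structured-streaming",
  LocalDateTime.of(2016, 12, 18, 2, 0),
  LocalDateTime.of(2016, 12, 18, 5, 0))
```

The resulting sequence can be passed to the static reader as `.json(inputPaths: _*)`. Because it is computed from timestamps, ranges that cross day or month boundaries need no special handling.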

I know this is all experimental; I am hoping someone has used S3 as a streaming file source before, with the data partitioned into folders as described above. Of course we would prefer a direct Kinesis stream, but there is no release date for that connector, so Firehose -> S3 is the interim solution.

Since you are using DBFS, I am going to assume that the S3 buckets Firehose writes to are already mounted in DBFS. Check the Databricks documentation if you need help mounting your S3 bucket in DBFS. Once you have the input path defined as above, you can simply load the files into a static or streaming DataFrame:

Static

    val staticInputDF = spark
      .read
      .schema(jsonSchema)
      .json(inputPathSeq: _*)

    staticInputDF.isStreaming
    // res: Boolean = false

Streaming

    val streamingInputDF = spark
      .readStream                        // `readStream` instead of `read` creates a streaming DataFrame
      .schema(jsonSchema)                // Set the schema of the JSON data
      .option("maxFilesPerTrigger", 1)   // Treat a sequence of files as a stream by picking one file at a time
      .json(inputPathGlob)               // The streaming reader takes a single path, so use the glob here

    streamingInputDF.isStreaming
    // res: Boolean = true
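The static and streaming snippets both pass a `jsonSchema` value that this answer never defines, and streaming file sources require an explicit schema by default. One way it might be declared is sketched below; the field names `deviceId`, `timestamp` and `value` are purely hypothetical stand-ins for a time-series record and need to be replaced with the fields actually present in your JSON.

```scala
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType, TimestampType}

// Hypothetical fields for a time-series record; replace with your own
val jsonSchema = StructType(Seq(
  StructField("deviceId", StringType, nullable = true),
  StructField("timestamp", TimestampType, nullable = true),
  StructField("value", DoubleType, nullable = true)
))
```

Supplying the schema up front also saves the static reader from scanning every file to infer it, which can matter when there are many small hourly files.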

Most of this comes directly from the Databricks documentation on Structured Streaming. There is even a notebook example that you can import directly into Databricks.


Source: https://habr.com/ru/post/1011865/

