I am trying to convert data stored in S3 as JSON-per-line text files into a structured, columnar format such as ORC or Parquet on S3.
The source files contain data from several schemas (for example, HTTP requests, HTTP responses, etc.), which must be parsed out into separate Spark DataFrames of the correct type.
Example schemas:
import org.apache.spark.sql.types._

val Request = StructType(Seq(
  StructField("timestamp", TimestampType, nullable = false),
  StructField("requestId", LongType),
  StructField("requestMethod", StringType),
  StructField("scheme", StringType),
  StructField("host", StringType),
  StructField("headers", MapType(StringType, StringType, valueContainsNull = false)),
  StructField("path", StringType),
  StructField("sessionId", StringType),
  StructField("userAgent", StringType)
))

val Response = StructType(Seq(
  StructField("timestamp", TimestampType, nullable = false),
  StructField("requestId", LongType),
  StructField("contentType", StringType),
  StructField("contentLength", IntegerType),
  StructField("statusCode", StringType),
  StructField("headers", MapType(keyType = StringType, valueType = StringType, valueContainsNull = false)),
  StructField("responseDuration", DoubleType),
  StructField("sessionId", StringType)
))
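For reference, this is roughly how one schema's lines get parsed into a typed DataFrame once they are isolated; a minimal sketch only, assuming the Request JSON lines are already available as a Dataset[String] (the name requestLines is hypothetical, not part of my actual code):

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Sketch only: requestLines is a hypothetical Dataset[String] holding the
// JSON lines that belong to the Request schema.
def parseRequests(spark: SparkSession, requestLines: Dataset[String]): DataFrame =
  spark.read
    .schema(Request)      // enforce the Request StructType defined above
    .json(requestLines)   // parse JSON-per-line strings into typed columns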
I have this part of the job working fine; the problem is getting the data back onto S3 as efficiently as possible.
I tried 3 approaches:
- flatMuxPartitions from the silex project
- caching the parsed S3 input and looping over it once per schema
- routing each schema into its own RDD partition
The first approach ran out of JVM memory, and the second ran out of disk space.
The third I have not yet fully verified, but it does not seem to be an efficient use of computing power, since only one cluster node (the one holding a given schema's partition) actually writes that schema's data back to S3.
Relevant Code:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Partitioner
import org.apache.spark.sql.Row
import org.apache.spark.storage.StorageLevel

val allSchemes = Schemes.all().keys.toArray

if (false) {
  // Approach 1: multiplexed partitions via the silex project
  import com.realo.warehouse.multiplex.implicits._

  val input = readRawFromS3(inputPrefix) // RDD of parsed log items, each with a logType and a row
    .flatMuxPartitions(allSchemes.length, data => {
      val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] }
      data.foreach { logItem =>
        val schemeIndex = allSchemes.indexOf(logItem.logType)
        if (schemeIndex > -1) {
          buffers(schemeIndex).append(logItem.row)
        }
      }
      buffers
    })

  allSchemes.zipWithIndex.foreach { case (schemeName, index) =>
    val rdd = input(index)
    writeColumnarToS3(rdd, schemeName)
  }
} else if (false) {
  // Approach 2: naive approach - cache the parsed input and filter it once per schema
  val input = readRawFromS3(inputPrefix) // RDD of parsed log items, each with a logType and a row
    .persist(StorageLevel.MEMORY_AND_DISK)

  allSchemes.foreach { schemeName =>
    val rdd = input
      .filter(x => x.logType == schemeName)
      .map(x => x.row)
    writeColumnarToS3(rdd, schemeName)
  }

  input.unpersist()
} else {
  // Approach 3: route each schema into its own RDD partition
  class CustomPartitioner extends Partitioner {
    override def numPartitions: Int = allSchemes.length
    override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String])
  }

  val input = readRawFromS3(inputPrefix)
    .map(x => (x.logType, x.row))
    .partitionBy(new CustomPartitioner())
    .map { case (logType, row) => row }
    .persist(StorageLevel.MEMORY_AND_DISK)

  allSchemes.zipWithIndex.foreach { case (schemeName, index) =>
    val rdd = input
      .mapPartitionsWithIndex(
        (i, iter) => if (i == index) iter else Iterator.empty,
        preservesPartitioning = true
      )
    writeColumnarToS3(rdd, schemeName)
  }

  input.unpersist()
}
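One tweak I have considered for the third approach (untested, just a sketch that would replace its write loop, reusing the input, allSchemes and writeColumnarToS3 defined above): since each schema ends up in a single non-empty partition, the columnar write for that schema runs as one task, so the partition could be spread out again right before writing, at the cost of an extra shuffle:

allSchemes.zipWithIndex.foreach { case (schemeName, index) =>
  // Keep only the partition that holds this schema's rows (as above).
  val single = input.mapPartitionsWithIndex(
    (i, iter) => if (i == index) iter else Iterator.empty,
    preservesPartitioning = true)
  // Spread that single partition across the cluster so more executors
  // participate in the S3 write; this triggers an additional shuffle.
  val spread = single.repartition(single.sparkContext.defaultParallelism)
  writeColumnarToS3(spread, schemeName)
}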
Conceptually, I think the code should have one output (DStream-like) per schema type, and the input RDD should dispatch each processed element onto the correct output, with batched writes for better throughput.
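Something in the spirit of this sketch is roughly what I have in mind, though I have not tested it: keep the raw JSON line next to its logType, let a single partitioned write fan the data out into a per-type S3 prefix, then parse each prefix with its own schema. readRawJsonFromS3 and outputPrefix are made-up names here, and I am assuming Schemes.all() maps a schema name to its StructType:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Hypothetical reader that keeps the raw JSON line alongside its logType,
// as an RDD[(String, String)] of (logType, jsonLine) pairs.
val tagged = readRawJsonFromS3(inputPrefix)
  .toDF("logType", "json")

// Single pass over the input: the partitioned write fans every record out
// to a per-type prefix, and every executor writes its share of every type.
tagged.write
  .partitionBy("logType")
  .mode("overwrite")
  .text(s"$outputPrefix/raw-by-type")

// Each per-type prefix can then be parsed with its own schema and written
// out in columnar form; the expensive full S3 scan happens only once.
allSchemes.foreach { schemeName =>
  val parsed = spark.read
    .schema(Schemes.all()(schemeName)) // assumes Schemes.all(): Map[String, StructType]
    .json(s"$outputPrefix/raw-by-type/logType=$schemeName")
  writeColumnarToS3(parsed.rdd, schemeName)
}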
Does anyone have pointers on how to implement this? And/or is there a better way to solve this problem?