Demultiplexing RDD onto multiple ORC tables

I am trying to convert data stored in S3 as JSON-per-line text files into a structured, columnar format such as ORC or Parquet on S3.

The source files contain data for several different schemas (for example, an HTTP request, an HTTP response, etc.), which need to be parsed into Spark DataFrames of the correct type.

Example schemas:

    val Request = StructType(Seq(
      StructField("timestamp", TimestampType, nullable = false),
      StructField("requestId", LongType),
      StructField("requestMethod", StringType),
      StructField("scheme", StringType),
      StructField("host", StringType),
      StructField("headers", MapType(StringType, StringType, valueContainsNull = false)),
      StructField("path", StringType),
      StructField("sessionId", StringType),
      StructField("userAgent", StringType)
    ))

    val Response = StructType(Seq(
      StructField("timestamp", TimestampType, nullable = false),
      StructField("requestId", LongType),
      StructField("contentType", StringType),
      StructField("contentLength", IntegerType),
      StructField("statusCode", StringType),
      StructField("headers", MapType(keyType = StringType, valueType = StringType, valueContainsNull = false)),
      StructField("responseDuration", DoubleType),
      StructField("sessionId", StringType)
    ))

I have this part of the job working fine; however, writing the data back to S3 as efficiently as possible seems to be a problem.

I tried 3 approaches:

  • muxPartitions from the silex project
  • caching the parsed S3 input and looping over it multiple times
  • making each schema a separate RDD partition

In the first case, the JVM ran out of memory; in the second, the machines ran out of disk space.

The third one I have not yet fully verified, but it does not seem to be an efficient use of computing power (since only one cluster node (the one holding that particular partition) would actually be writing the data back to S3).

Relevant Code:

    val allSchemes = Schemes.all().keys.toArray

    if (false) {
      import com.realo.warehouse.multiplex.implicits._

      val input = readRawFromS3(inputPrefix) // returns RDD[Row]
        .flatMuxPartitions(allSchemes.length, data => {
          val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] }
          data.foreach { logItem =>
            val schemeIndex = allSchemes.indexOf(logItem.logType)
            if (schemeIndex > -1) {
              buffers(schemeIndex).append(logItem.row)
            }
          }
          buffers
        })

      allSchemes.zipWithIndex.foreach { case (schemeName, index) =>
        val rdd = input(index)
        writeColumnarToS3(rdd, schemeName)
      }
    } else if (false) {
      // Naive approach
      val input = readRawFromS3(inputPrefix) // returns RDD[Row]
        .persist(StorageLevel.MEMORY_AND_DISK)

      allSchemes.foreach { schemeName =>
        val rdd = input
          .filter(x => x.logType == schemeName)
          .map(x => x.row)
        writeColumnarToS3(rdd, schemeName)
      }

      input.unpersist()
    } else {
      class CustomPartitioner extends Partitioner {
        override def numPartitions: Int = allSchemes.length
        override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String])
      }

      val input = readRawFromS3(inputPrefix)
        .map(x => (x.logType, x.row))
        .partitionBy(new CustomPartitioner())
        .map { case (logType, row) => row }
        .persist(StorageLevel.MEMORY_AND_DISK)

      allSchemes.zipWithIndex.foreach { case (schemeName, index) =>
        val rdd = input
          .mapPartitionsWithIndex(
            (i, iter) => if (i == index) iter else Iterator.empty,
            preservesPartitioning = true
          )
        writeColumnarToS3(rdd, schemeName)
      }

      input.unpersist()
    }
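writeColumnarToS3 might look roughly like the sketch below (assumptions on my part: Schemes.all() maps a schema name to its StructType, outputPrefix is a hypothetical destination, and spark is the active SparkSession):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SaveMode}

    // Sketch only: wrap the Row RDD in a DataFrame with the matching schema
    // and write it out as ORC under a per-schema prefix.
    def writeColumnarToS3(rdd: RDD[Row], schemeName: String): Unit = {
      val schema = Schemes.all()(schemeName)
      spark.createDataFrame(rdd, schema)
        .write
        .mode(SaveMode.Overwrite)
        .orc(s"$outputPrefix/$schemeName")
    }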

Conceptually, I think the code should have one output DStream per schema type, and the input RDD should pick-and-place each processed item onto the correct DStream (with batching for better throughput).

Does anyone have any pointers on how to implement this? And/or is there a better way to approach this problem?

+6
2 answers

This is what I came up with eventually:

I use a custom Partitioner to partition the data based on its schema plus the hashcode of the row.

The idea here is that we want to be able to process only certain partitions, yet still allow all nodes to participate (for performance reasons). So instead of spreading each schema's data over just one partition, we spread it over X partitions (where X is twice the number of nodes, in this example).

Then, for each schema, we prune the partitions we do not need, and thus only process the ones we do.

Code example:

    def process(date: ReadableInstant, schemesToProcess: Array[String]) = {
      // Tweak this based on your use case
      val DefaultNumberOfStoragePartitions = spark.sparkContext.defaultParallelism * 2

      class CustomPartitioner extends Partitioner {
        override def numPartitions: Int = schemesToProcess.length * DefaultNumberOfStoragePartitions

        override def getPartition(key: Any): Int = {
          // This is tightly coupled with how `input` gets transformed below
          val (logType, rowHashCode) = key.asInstanceOf[(String, Int)]
          (schemesToProcess.indexOf(logType) * DefaultNumberOfStoragePartitions) +
            Utils.nonNegativeMod(rowHashCode, DefaultNumberOfStoragePartitions)
        }

        /**
         * Internal helper function to retrieve all partition indices for the given key
         * @param key input key
         * @return
         */
        private def getPartitions(key: String): Seq[Int] = {
          val index = schemesToProcess.indexOf(key) * DefaultNumberOfStoragePartitions
          index until (index + DefaultNumberOfStoragePartitions)
        }

        /**
         * Returns an RDD which only traverses the partitions for the given key
         * @param rdd base RDD
         * @param key input key
         * @return
         */
        def filterRDDForKey[T](rdd: RDD[T], key: String): RDD[T] = {
          val partitions = getPartitions(key).toSet
          PartitionPruningRDD.create(rdd, x => partitions.contains(x))
        }
      }

      val partitioner = new CustomPartitioner()
      val input = readRawFromS3(date)
        .map(x => ((x.logType, x.row.hashCode), x.row))
        .partitionBy(partitioner)
        .persist(StorageLevel.MEMORY_AND_DISK_SER)

      // Initial stage: caches the processed data + gets an enumeration of all schemes in this RDD
      val schemesInRdd = input
        .map(_._1._1)
        .distinct()
        .collect()

      // Remaining stages: for each scheme, write it out to S3 as ORC
      schemesInRdd.zipWithIndex.foreach { case (schemeName, index) =>
        val rdd = partitioner.filterRDDForKey(input, schemeName)
          .map(_._2)
          .coalesce(DefaultNumberOfStoragePartitions)
        writeColumnarToS3(rdd, schemeName)
      }

      input.unpersist()
    }
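A minimal usage sketch (assuming Joda-Time's DateTime for the ReadableInstant argument, and that Schemes.all() lists every known schema):

    import org.joda.time.DateTime

    // Process yesterday's raw logs for all known schemas (hypothetical invocation)
    process(DateTime.now().minusDays(1), Schemes.all().keys.toArray)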
0

Given that the input is JSON, you can read it into a dataframe of strings (one line per record). Then you can extract the type from each JSON record (either with a UDF or with a function like get_json_object or json_tuple).

Now you have two columns: the type and the original JSON. You can then use partitionBy on the dataframe writer when writing the data out. This results in a directory for each type, and the contents of each directory are the original JSON records.

Now you can read each type with its own schema.
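A minimal sketch of this dataframe approach, assuming each JSON document carries its type in a top-level logType field (the field name and the S3 paths are assumptions for illustration; Request is the schema from the question):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.get_json_object

    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._

    // Read every line as a single string column named "value"
    val raw = spark.read.text("s3://my-bucket/raw/")

    // Extract the record type from the JSON; "$.logType" is an assumed field name
    val typed = raw.withColumn("logType", get_json_object($"value", "$.logType"))

    // One directory per type; the data files keep only the remaining "value"
    // column, i.e. the original JSON lines
    typed.write.partitionBy("logType").text("s3://my-bucket/by-type/")

    // Later, read a single type back with its own schema
    val requests = spark.read.schema(Request).json("s3://my-bucket/by-type/logType=request/")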

You can also do a similar thing with RDDs, using a map that turns the input RDD into a pair RDD, with the key being the type and the value being the JSON converted to the target schema. Then you can use partitionBy and mapPartitions to save each partition to a file, or write to different files keyed by type (for example, by using the key to set the file name); see the sketch below.
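A sketch of the write-by-key variant using Hadoop's MultipleTextOutputFormat, where the key determines the output path; readRawJsonWithType is a hypothetical helper returning (type, raw JSON) pairs, and the output path is an assumption:

    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

    // Routes each record to a subdirectory named after its key (the log type),
    // following the "write to multiple outputs by key" pattern linked below.
    class KeyAsPathOutputFormat extends MultipleTextOutputFormat[Any, Any] {
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
        s"${key.asInstanceOf[String]}/$name"       // e.g. request/part-00000
      override def generateActualKey(key: Any, value: Any): Any =
        NullWritable.get()                         // keep only the value in the output files
    }

    // Hypothetical helper: RDD[(logType, rawJsonLine)]
    val pairs = readRawJsonWithType(inputPrefix)

    pairs.saveAsHadoopFile(
      "s3://my-bucket/by-type/",
      classOf[String],
      classOf[String],
      classOf[KeyAsPathOutputFormat]
    )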

You can also take a look at Write to multiple outputs by key Spark - one Spark job.

Note that I assumed the goal is splitting into files. Depending on your specific use case, other options might be viable. For example, if your different schemas are close enough, you can create a super-schema that encompasses all of them and create the dataframe directly from that. Then you can either work on that dataframe directly or use the dataframe's partitionBy to write the different subtypes to different directories (but this time already saved as Parquet).
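A sketch of that super-schema variant, assuming the JSON records carry a logType discriminator field, that fields shared between schemas (such as timestamp and requestId) have identical types, and reusing spark and the paths from the earlier sketch:

    import org.apache.spark.sql.types._

    // Hypothetical super-schema: a discriminator column plus the union of all fields
    val SuperSchema = StructType(
      StructField("logType", StringType, nullable = false) +:
        (Request.fields ++ Response.fields).distinct
    )

    // Parse the raw JSON once against the super-schema
    val all = spark.read.schema(SuperSchema).json("s3://my-bucket/raw/")

    // Write once; each subtype lands in its own directory, already columnar
    all.write.partitionBy("logType").parquet("s3://my-bucket/parquet/")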

0

Source: https://habr.com/ru/post/1013155/

