Spark Streaming: Micro-Batch Parallel Execution

We receive data in a spark stream from Kafka. When a launch is launched in Spark Streaming, it runs only one batch, and the remaining batches begin to be queued in Kafka.

Our data is independent and can be processes in Parallel.

We tried several configurations with several executors, cores, back pressure and other configurations, but so far nothing worked. There are a lot of messages in the queue, and only one micropacket is processed at a time, and the rest remain in the queue.

We want to maximize parallelism so that no micro-package fits, because we have enough resources. So, how can we reduce time by maximizing the use of resources.

enter image description here

// Start reading messages from Kafka and get DStream
final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
        getJavaStreamingContext(), LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe("TOPIC_NAME",
                sparkServiceConf.getKafkaConsumeParams()));

ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId());

JavaDStream<byte[]> messagesStream = consumerStream.map(new Function<ConsumerRecord<String, byte[]>, byte[]>() {
    private static final long serialVersionUID = 1L;
    @Override
    public byte[] call(ConsumerRecord<String, byte[]> kafkaRecord) throws Exception {
        return kafkaRecord.value();
    }
});

    // Decode each binary message and generate JSON array
        JavaDStream<String> decodedStream = messagesStream.map(new Function<byte[], String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(byte[] asn1Data) throws Exception {
                if(asn1Data.length > 0) {
                    try (InputStream inputStream = new ByteArrayInputStream(asn1Data);
                            Writer writer = new StringWriter(); ) {


                        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(asn1Data);
                        GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);

                        byte[] buffer = new byte[1024];
                        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();

                        int len;
                        while((len = gzipInputStream.read(buffer)) != -1) {
                            byteArrayOutputStream.write(buffer, 0, len);
                        }


                        return new String(byteArrayOutputStream.toByteArray());


                    } catch (Exception e) {
//                      
                        producer.flush();

                        throw e;
                    }
                } 

                return null;
            }
        });




// publish generated json gzip to kafka 
        cache.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(JavaRDD<String> jsonRdd4DF) throws Exception {
                //Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
                if(!jsonRdd4DF.isEmpty()) {
                    //JavaRDD<String> jsonRddDF = getJavaSparkContext().parallelize(jsonRdd4DF.collect());
                    Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);   

                    SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor();
                    airMainJsonProcessor.processAIRData(json, sparkSession);
                }

            }               
        });

        getJavaStreamingContext().start();
        getJavaStreamingContext().awaitTermination();
        getJavaStreamingContext().stop();

The technology we use:

HDFS  2.7.1.2.5 
YARN + MapReduce2  2.7.1.2.5 
ZooKeeper  3.4.6.2.5 
Ambari Infra  0.1.0 
Ambari Metrics  0.1.0 
Kafka  0.10.0.2.5 
Knox  0.9.0.2.5 
Ranger  0.6.0.2.5 
Ranger KMS  0.6.0.2.5 
SmartSense  1.3.0.0-1
Spark2  2.0.x.2.5 

, :

1

num_executors=6
executor_memory=8g
executor_cores=12

100 48

2

spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12

100 8

3

spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12

100 7

4

spark.default.parallelism=16
num_executors=6
executor_memory=8g
executor_cores=12

100 10

, , , .

+4
4

, :

. , , , , , .

spark.streaming.concurrentjobs, . Kafka. , 4 4 , , , 3- 4-, Kafka. , .

spark.default.parallelism, . . . . . . .

- foreachPartitionAsync foreach RDD. , foreachPartition , foreachPartitionAsync , , . , . 3 .

FAIR spark.scheduler.mode , , , .

+ , .

, , , . Spark Streaming - , , .

, . , - , .

+2

parallelism,

: , . , , , ​​ . , , .

, - 10 , 100 90 .

36 2 70- 17 . , . , 4:18 100 -, .

, , . , , , ( ) . , producer.flush(), ?

+1

, Scala Futures.

, , :

, , Scala Futures:

messages.foreachRDD{ rdd =>
  val f = Future {
  //        sleep(100)
    val newRDD = rdd.map{message => 
                           val req_message = message.value()  
                           (message.value())
                        }

    println("Request messages: " + newRDD.count())         
    var resultrows = newRDD.collect()//.collectAsList() 
    processMessage(resultrows, mlFeatures: MLFeatures, conf)          
    println("Inside scala future")
    1          
  }
  f.onComplete {
    case Success(messages) => println("yay!")
    case Failure(exception) => println("On no!")
  }  
}
+1

, , - "Hello world". . , , , , , . - Spark. , .

-1

Source: https://habr.com/ru/post/1681375/


All Articles