We receive data from Kafka in a Spark stream. When the Spark Streaming application starts, it processes only one batch, and the remaining batches begin queuing up in Kafka.
Our data is independent and can be processed in parallel.
We tried several configurations with different numbers of executors and cores, back pressure, and other settings, but so far nothing has worked. Many messages are queued, only one micro-batch is processed at a time, and the rest remain in the queue.
We want to maximize parallelism so that no micro-batch sits in the queue, since we have enough resources. How can we reduce processing time by making maximum use of those resources?
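For reference, this is roughly how we set the streaming-related knobs (a minimal sketch; the app name, batch interval, and rate values are illustrative, not our exact production values, and spark.streaming.concurrentJobs is an undocumented setting we have only read about, not yet verified for our workload):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Sketch of the streaming settings we have been experimenting with.
SparkConf sparkConf = new SparkConf()
        .setAppName("kafka-stream-processor") // illustrative name
        // Cap the ingest rate per Kafka partition so one batch cannot grow unbounded.
        .set("spark.streaming.kafka.maxRatePerPartition", "1000")
        // Let Spark adapt the ingest rate to the observed processing rate (back pressure).
        .set("spark.streaming.backpressure.enabled", "true")
        // Undocumented setting that lets jobs from more than one batch run at once;
        // we have read about it but have not verified it for our case.
        .set("spark.streaming.concurrentJobs", "2");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

The full pipeline is below.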

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.zip.GZIPInputStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.logging.log4j.ThreadContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

// Direct stream from Kafka (spark-streaming-kafka-0-10).
// Subscribe expects a collection of topic names.
final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
        getJavaStreamingContext(),
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe(
                Collections.singletonList("TOPIC_NAME"),
                sparkServiceConf.getKafkaConsumeParams()));

ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId());

// Extract the raw byte payload from each Kafka record.
JavaDStream<byte[]> messagesStream = consumerStream.map(ConsumerRecord::value);

// GZIP-decompress each payload into a JSON string.
JavaDStream<String> decodedStream = messagesStream.map(asn1Data -> {
    if (asn1Data.length == 0) {
        return null;
    }
    try (GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(asn1Data));
         ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
        byte[] buffer = new byte[1024];
        int len;
        while ((len = gzipInputStream.read(buffer)) != -1) {
            byteArrayOutputStream.write(buffer, 0, len);
        }
        return new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8);
    } catch (Exception e) {
        producer.flush(); // Kafka producer created elsewhere in this class
        throw e;
    }
});

// Cache the decoded stream and process each micro-batch as a DataFrame.
JavaDStream<String> cache = decodedStream.cache();
cache.foreachRDD(jsonRdd4DF -> {
    if (!jsonRdd4DF.isEmpty()) {
        Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
        SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor();
        airMainJsonProcessor.processAIRData(json, sparkSession);
    }
});

getJavaStreamingContext().start();
getJavaStreamingContext().awaitTermination();
getJavaStreamingContext().stop();
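One variant we are considering, sketched below under the assumption that the bottleneck is too few partitions per batch: repartition the decoded stream so each micro-batch is spread across all executor cores before the JSON processing. The factor 72 is just num_executors (6) * executor_cores (12) from our setup; this is a hypothetical replacement for the foreachRDD stage above, not what we run today.

// Hypothetical variant: spread each micro-batch across all cores.
JavaDStream<String> repartitioned = decodedStream.repartition(72).cache(); // 6 executors * 12 cores
repartitioned.foreachRDD(jsonRdd4DF -> {
    if (!jsonRdd4DF.isEmpty()) {
        Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
        new SparkAIRMainJsonProcessor().processAIRData(json, sparkSession);
    }
});

Would this help, or does repartitioning just add shuffle overhead in this scenario?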
The technologies we use:
HDFS 2.7.1.2.5
YARN + MapReduce2 2.7.1.2.5
ZooKeeper 3.4.6.2.5
Ambari Infra 0.1.0
Ambari Metrics 0.1.0
Kafka 0.10.0.2.5
Knox 0.9.0.2.5
Ranger 0.6.0.2.5
Ranger KMS 0.6.0.2.5
SmartSense 1.3.0.0-1
Spark2 2.0.x.2.5
We tried the following configurations:

Attempt 1:
num_executors=6
executor_memory=8g
executor_cores=12
Result: 100 files processed in 48 minutes

Attempt 2:
spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12
Result: 100 files processed in 8 minutes

Attempt 3:
spark.default.parallelism=12
num_executors=6
executor_memory=8g
executor_cores=12
Result: 100 files processed in 7 minutes

Attempt 4:
spark.default.parallelism=16
num_executors=6
executor_memory=8g
executor_cores=12
Result: 100 files processed in 10 minutes
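For completeness, here is how one trial's numbers map to settings in code (a sketch of attempt 2; we normally pass the same values via the equivalent spark-submit flags):

import org.apache.spark.SparkConf;

// Settings from attempt 2, expressed as a SparkConf (equivalent to the
// --num-executors / --executor-memory / --executor-cores submit flags).
SparkConf trialConf = new SparkConf()
        .set("spark.default.parallelism", "12")
        .set("spark.executor.instances", "6")  // num_executors
        .set("spark.executor.memory", "8g")    // executor_memory
        .set("spark.executor.cores", "12");    // executor_cores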
Please suggest how we can improve this, as we are stuck.