Spark 2.1 + Kafka 0.10 + Spark Streaming.
Batch duration is 30 s.
I have 13 nodes and 2 brokers, and I use 1 core per executor per topic/partition.
LocationStrategy is PreferConsistent.
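For context, the stream is created with the standard spark-streaming-kafka-0-10 direct stream API. A minimal sketch of the setup, assuming placeholder topic names, group id and deserializers (only the batch duration, the brokers and the LocationStrategy are the real values):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val conf = new SparkConf().setAppName("streaming-app") // placeholder name
val ssc = new StreamingContext(conf, Seconds(30))      // 30 s batch duration

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker001.domain.loc:9092,broker002.domain.loc:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group" // placeholder
)

// PreferConsistent distributes partitions evenly across available executors
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("topic1"), kafkaParams)
)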
When consuming a single topic there is no problem: the executors always process the same topic/partition (tested with up to 24 partitions).
When I add a second topic, some executors switch topic/partition from one batch to the next.
When an executor processes the same topic/partition again (for example 3 batches later, i.e. 1:30 after the previous processing), I get a disconnection of my KafkaConsumer by the broker due to request timeout (the request.timeout.ms parameter), and then my new Kafka fetch request is blocked for 40 seconds (request.timeout.ms again). Here is a DEBUG log extract from the executor showing the disconnect and the blocked fetch:
2017-10-09 16:51:30.336 DEBUG [Executor task launch worker for task 315]:org.apache.spark.internal.Logging$class - Seeking to topic2-7 136136613
2017-10-09 16:51:30.336 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.KafkaConsumer - Seeking to offset 136136613 for partition topic2-7
2017-10-09 16:51:30.337 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.NetworkClient - Disconnecting from node 1005 due to request timeout.
2017-10-09 16:51:30.337 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler - Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@30ea3352, request=RequestSend(header={api_key=1,api_version=2,correlation_id=25,client_id=consumer-1}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=topic2,partitions=[{partition=7,fetch_offset=136125064,max_bytes=1048576}]}]}), createdTimeMs=1507557031875, sendTimeMs=1507557031875) with correlation id 25 due to node 1005 being disconnected
2017-10-09 16:51:30.338 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.internals.Fetcher$1 - Fetch failed
org.apache.kafka.common.errors.DisconnectException
2017-10-09 16:51:30.341 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater - Initialize connection to node 1006 for sending metadata request
2017-10-09 16:51:30.341 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.NetworkClient - Initiating connection to node 1006 at broker001.domain.loc:9092.
2017-10-09 16:51:30.342 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.common.metrics.Metrics - Added sensor with name node-1006.bytes-sent
2017-10-09 16:51:30.342 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.common.metrics.Metrics - Added sensor with name node-1006.bytes-received
2017-10-09 16:51:30.342 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.common.metrics.Metrics - Added sensor with name node-1006.latency
2017-10-09 16:51:30.343 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.NetworkClient - Completed connection to node 1006
2017-10-09 16:51:30.343 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler - Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@7d9e82c8, request=RequestSend(header={api_key=1,api_version=2,correlation_id=26,client_id=consumer-1}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=topic2,partitions=[{partition=7,fetch_offset=136136613,max_bytes=1048576}]}]}), createdTimeMs=1507557090341, sendTimeMs=0) with correlation id 26 due to node 1005 being disconnected
2017-10-09 16:51:30.343 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.internals.Fetcher$1 - Fetch failed
org.apache.kafka.common.errors.DisconnectException
2017-10-09 16:51:30.343 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater - Sending metadata request {topics=[topic2]} to node 1006
2017-10-09 16:51:30.344 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler - Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@4512b012, request=RequestSend(header={api_key=1,api_version=2,correlation_id=27,client_id=consumer-1}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=topic2,partitions=[{partition=7,fetch_offset=136136613,max_bytes=1048576}]}]}), createdTimeMs=1507557090343, sendTimeMs=0) with correlation id 27 due to node 1005 being disconnected
2017-10-09 16:51:30.344 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.internals.Fetcher$1 - Fetch failed
org.apache.kafka.common.errors.DisconnectException
2017-10-09 16:51:30.344 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.Metadata - Updated cluster metadata version 3 to Cluster(nodes = [broker002.domain.loc:9092 (id: 1005 rack: null), broker001.domain.loc:9092 (id: 1006 rack: null)], partitions = [Partition(topic = topic2, partition = 14, leader = 1006, replicas = [1005,1006,], isr = [1006,1005,], Partition(topic = topic2, partition = 13, leader = 1005, replicas = [1005,1006,], isr = [1005,1006,], Partition(topic = topic2, partition = 12, leader = 1006, replicas = [1005,1006,], isr = [1006,1005,], Partition(topic = topic2, partition = 11, leader = 1005, replicas = [1005,1006,], isr = [1005,1006,], Partition(topic = topic2, partition = 10, leader = 1006, replicas = [1005,1006,], isr = [1006,1005,], Partition(topic = topic2, partition = 9, leader = 1005, replicas = [1005,1006,], isr = [1005,1006,], Partition(topic = topic2, partition = 8, leader = 1006, replicas = [1005,1006,], isr = [1006,1005,], Partition(topic = topic2, partition = 7, leader = 1005, replicas = [1005,1006,], isr = [1005,1006,], Partition(topic = topic2, partition = 6, leader = 1006, replicas = [1005,1006,], isr = [1006,1005,], Partition(topic = topic2, partition = 5, leader = 1005, replicas = [1005,1006,], isr = [1005,1006,], Partition(topic = topic2, partition = 4, leader = 1006, replicas = [1005,1006,], isr = [1006,1005,], Partition(topic = topic2, partition = 3, leader = 1005, replicas = [1005,1006,], isr = [1005,1006,], Partition(topic = topic2, partition = 2, leader = 1006, replicas = [1005,1006,], isr = [1006,1005,], Partition(topic = topic2, partition = 1, leader = 1005, replicas = [1005,1006,], isr = [1005,1006,], Partition(topic = topic2, partition = 0, leader = 1006, replicas = [1005,1006,], isr = [1006,1005,]])
2017-10-09 16:51:30.345 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler - Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@4214186f, request=RequestSend(header={api_key=1,api_version=2,correlation_id=29,client_id=consumer-1}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=topic2,partitions=[{partition=7,fetch_offset=136136613,max_bytes=1048576}]}]}), createdTimeMs=1507557090344, sendTimeMs=0) with correlation id 29 due to node 1005 being disconnected
2017-10-09 16:51:30.345 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.internals.Fetcher$1 - Fetch failed
org.apache.kafka.common.errors.DisconnectException
2017-10-09 16:51:42.942 DEBUG [LeaseRenewer: hdfs_user@master001.domain.loc:8020]:org.apache.hadoop.hdfs.LeaseRenewer - Lease renewer daemon for [] with renew id 1 executed
2017-10-09 16:52:00.293 DEBUG [IPC Client (1926664485) connection to master001.domain.loc/10.0.10.1:8020 from hdfs_user]:org.apache.hadoop.ipc.Client$Connection - IPC Client (1926664485) connection to master001.domain.loc/10.0.10.1:8020 from hdfs_user: closed
2017-10-09 16:52:00.293 DEBUG [IPC Client (1926664485) connection to master001.domain.loc/10.0.10.1:8020 from hdfs_user]:org.apache.hadoop.ipc.Client$Connection - IPC Client (1926664485) connection to master001.domain.loc/10.0.10.1:8020 from hdfs_user: stopped, remaining connections 0
2017-10-09 16:52:10.388 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler - Cancelled FETCH request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@4b954a27, request=RequestSend(header={api_key=1,api_version=2,correlation_id=30,client_id=consumer-1}, body={replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=topic2,partitions=[{partition=7,fetch_offset=136136613,max_bytes=1048576}]}]}), createdTimeMs=1507557090345, sendTimeMs=0) with correlation id 30 due to node 1005 being disconnected
2017-10-09 16:52:10.389 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.consumer.internals.Fetcher$1 - Fetch failed
org.apache.kafka.common.errors.DisconnectException
2017-10-09 16:52:10.389 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.NetworkClient - Initiating connection to node 1005 at broker002.domain.loc:9092.
2017-10-09 16:52:10.390 DEBUG [Executor task launch worker for task 315]:org.apache.kafka.clients.NetworkClient - Completed connection to node 1005
2017-10-09 16:52:10.397 DEBUG [Executor task launch worker for task 315]:org.apache.spark.internal.Logging$class - Polled [topic2-7] 2603
2017-10-09 16:52:10.398 DEBUG [Executor task launch worker for task 315]:org.apache.spark.internal.Logging$class - Getting local block broadcast_13
2017-10-09 16:52:10.398 DEBUG [Executor task launch worker for task 315]:org.apache.spark.internal.Logging$class - Level for block broadcast_13 is StorageLevel(disk, memory, deserialized, 1 replicas)
What can I do to overcome this problem? Increasing the request.timeout.ms parameter does not seem like a good solution to me.
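For completeness, raising it would only mean adding the consumer property to the Kafka params map. A sketch, where the 120000 value is purely illustrative (40 s is the client default being hit here):

// Illustrative only: a bigger request timeout just hides the
// idle-consumer disconnects behind a larger number.
val kafkaParams = Map[String, Object](
  // ... same parameters as above ...
  "request.timeout.ms" -> (120000: java.lang.Integer)
)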
I saw an option to disable the cache of Kafka consumers, which could solve this problem, but it is only available from Spark 2.2, and I cannot move to Spark 2.2.
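For reference, on Spark 2.2+ this is the spark.streaming.kafka.consumer.cache.enabled property; a sketch of how it would be set (not applicable on my Spark 2.1):

import org.apache.spark.SparkConf

// Spark 2.2+ only: disables the executor-side cache of Kafka consumers,
// so tasks no longer reuse an idle consumer that the broker may have
// disconnected in the meantime.
val conf = new SparkConf()
  .setAppName("streaming-app") // placeholder
  .set("spark.streaming.kafka.consumer.cache.enabled", "false")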
The only solution I can see for now would be to go back to single-threaded processing...
Thank you for your help!
2017/10/18: update on this issue
The executor switches between topic/partitions were caused by data-locality issues. For a given topic/partition, the executor that was supposed to process the data locally (PROCESS_LOCAL locality level) was not available, so the task was scheduled on another executor (RACK_LOCAL locality level), and that executor could differ from one batch to the next.
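For context, this matches how the Spark scheduler assigns locality levels: it waits spark.locality.wait (3 s by default) for a free slot at the preferred level before falling back to a less local level. A sketch of that knob, with an illustrative value I have not actually tested:

import org.apache.spark.SparkConf

// How long the scheduler waits for a PROCESS_LOCAL slot before falling
// back to NODE_LOCAL, then RACK_LOCAL; "10s" here is illustrative only.
val conf = new SparkConf()
  .set("spark.locality.wait", "10s")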
My configuration was 1 core per executor.
I changed my configuration to 2 cores per executor, and everything is fine: all tasks are processed locally.
If I want to process 3 topics, I have to change my configuration to 3 cores per executor (the partition counts per topic are uneven: for example, with 3 topics, 15 partitions for topic1, 3 for topic2 and 6 for topic3). What I tested (see the configuration sketch after this list):
1 topic, 24 topic/partitions, 24 executors, 1 core per executor: OK
2 topics, 24 topic/partitions, 12 executors, 2 cores per executor: OK
3 topics, 24 topic/partitions, 8 executors, 3 cores per executor: OK
4 topics, 24 topic/partitions, 6 executors, 4 cores per executor: OK
6 topics, 24 topic/partitions, 4 executors, 6 cores per executor: KO
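For illustration, the "2 topics, 12 executors, 2 cores" row above corresponds to settings like these (equivalent to --num-executors 12 --executor-cores 2 on spark-submit; a YARN deployment is assumed):

import org.apache.spark.SparkConf

// 12 executors x 2 cores = 24 cores, one per topic/partition
val conf = new SparkConf()
  .set("spark.executor.instances", "12")
  .set("spark.executor.cores", "2")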
With 6 topics, I run into data-locality issues again. What can I do to scale the Spark processing with the number of topics?