Failed to create a state store in Kafka flows

I get this error Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1when creating a state store in kafka streams application. Here is a complete application stack trace

[2016-08-30 12:43:09,408] ERROR [StreamThread-1] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group string-monitor failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.io.IOException: Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 32 more
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
    ... 1 more
Caused by: java.io.IOException: Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 32 more

And create a state store below

 StateStoreSupplier avgStore = Stores.create("avgStore")
          .withKeys(Serdes.String())
          .withValues(Serdes.String())
          .persistent()
          .build();

Any idea how to fix this?

+4
source share
2 answers

? , Kafka, , Kafka Streams ( ), , ( , ), .

, , :

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager

Apache Kafka:

https://issues.apache.org/jira/browse/KAFKA-3758

Kafka :

https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread

Kafka Apache Kafka trunk. Kafka trunk, , .

+3

, state.dir

dir w/good permissions, :

.put(StreamsConfig.STATE_DIR_CONFIG, "{goodDir}" )

0.10.2

+2

Source: https://habr.com/ru/post/1652997/


All Articles