Kafka streams.allMetadata() method returns an empty list

So, I am trying to get interactive queries working with Kafka Streams. I have Zookeeper and Kafka running locally (on Windows), where I use C:\temp as the storage folder for both Zookeeper and Kafka.

I set up the topics as follows:

    kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-submit-topic
    kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-output-topic

The reading I did around this issue

I read this documentation page: http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application

I also read a Java example here: https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java

I also read this similar post, which initially sounded like the same problem as mine: Unable to access KTable from another application as a StateStore

So that is my setup. So what is the issue?

So, as I said, I am trying to create my own application that allows interactive queries using a custom Akka Http REST API (RPC calls, as recommended) so that I can query my KTable. The actual stream processing seems to work as expected: I can print the results of the KTable, and they match what is being produced on the topic.

So the storage side seems to work

The problem occurs when I try to use the streams.allMetadata() method, which returns an empty list.

I use

  • Scala 2.12
  • SBT
  • Akka.Http 10.9 for REST Api
  • Kafka 0.11.0

Producer code

Here is the code for my producer

    package Processing.Ratings {

      import java.util.concurrent.TimeUnit

      import Entities.Ranking
      import Serialization.JSONSerde
      import Topics.RatingsTopics

      import scala.util.Random

      import org.apache.kafka.clients.producer.ProducerRecord
      import org.apache.kafka.clients.producer.KafkaProducer
      import org.apache.kafka.common.serialization.Serdes
      import Utils.Settings
      import org.apache.kafka.clients.producer.ProducerConfig

      object RatingsProducerApp extends App {

        run()

        private def run(): Unit = {

          val jSONSerde = new JSONSerde[Ranking]
          val random = new Random
          val producerProps = Settings.createBasicProducerProperties
          val rankingList = List(
            Ranking("jarden@here.com", "sacha@here.com", 1.5f),
            Ranking("miro@here.com", "mary@here.com", 1.5f),
            Ranking("anne@here.com", "margeret@here.com", 3.5f),
            Ranking("frank@here.com", "bert@here.com", 2.5f),
            Ranking("morgan@here.com", "ruth@here.com", 1.5f))

          producerProps.put(ProducerConfig.ACKS_CONFIG, "all")

          System.out.println("Connecting to Kafka cluster via bootstrap servers " +
            s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}")

          // send a random string from List event every 100 milliseconds
          val rankingProducer = new KafkaProducer[String, Array[Byte]](
            producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer)

          //while (true) {
          for (i <- 0 to 10) {
            val ranking = rankingList(random.nextInt(rankingList.size))
            val rankingBytes = jSONSerde.serializer().serialize("", ranking)
            System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}")
            rankingProducer.send(new ProducerRecord[String, Array[Byte]](
              RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes))
            Thread.sleep(100)
          }

          Runtime.getRuntime.addShutdownHook(new Thread(() => {
            rankingProducer.close(10, TimeUnit.SECONDS)
          }))
        }
      }
    }

Stream code

Here is the stream code

    def createRatingStreamsProperties() : Properties = {
      val props = createBasicStreamProperties
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ratings-application")
      props.put(StreamsConfig.CLIENT_ID_CONFIG, "ratings-application-client")
      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      props
    }

    private def createBasicStreamProperties() : Properties = {
      val props = new Properties()
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
      // Records should be flushed every 10 seconds. This is less than the default
      // in order to keep this example interactive.
      props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
      // For illustrative purposes we disable record caches
      props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])
      props
    }

And the actual code

    import java.util.Properties
    import java.util.concurrent.TimeUnit
    import org.apache.kafka.common.serialization._
    import org.apache.kafka.streams._
    import org.apache.kafka.streams.kstream._
    import Entities.Ranking
    import Serialization.JSONSerde
    import Topics.RatingsTopics
    import Utils.Settings

    package Processing.Ratings {

      import Stores.StateStores
      import org.apache.kafka.streams.state.HostInfo

      class DummyRankingReducer extends Reducer[Ranking] {
        override def apply(value1: Ranking, value2: Ranking): Ranking = {
          value2
        }
      }

      class RankingByEmailInitializer extends Initializer[List[Ranking]] {
        override def apply(): List[Ranking] = List[Ranking]()
      }

      class RankingByEmailAggregator extends Aggregator[String, Ranking, List[Ranking]] {
        override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = {
          value :: aggregate
        }
      }

      object RatingStreamProcessingApp extends App {

        run()

        private def run() : Unit = {
          val stringSerde = Serdes.String
          val rankingSerde = new JSONSerde[Ranking]
          val listRankingSerde = new JSONSerde[List[Ranking]]
          val builder: KStreamBuilder = new KStreamBuilder
          val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)

          val rankingTable = rankings.groupByKey(stringSerde, rankingSerde)
            .aggregate(
              new RankingByEmailInitializer(),
              new RankingByEmailAggregator(),
              listRankingSerde,
              StateStores.RANKINGS_BY_EMAIL_STORE
            )

          rankingTable.toStream.print()

          val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)
          val restEndpoint: HostInfo = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
          System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
          System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

          // Always (and unconditionally) clean local state prior to starting the processing topology.
          // We opt for this unconditional call here because this will make it easier for you to play around with the example
          // when resetting the application for doing a re-run (via the Application Reset Tool,
          // http://docs.confluent.io/current/streams/developer-guide.html#application-reset-tool).
          //
          // The drawback of cleaning up local state prior is that your app must rebuild its local state from scratch, which
          // will take time and will require reading all the state-relevant data from the Kafka cluster over the network.
          // Thus in a production scenario you typically do not want to clean up always as we do here but rather only when it
          // is truly needed, i.e., only under certain conditions (e.g., the presence of a command line flag for your app).
          // See `ApplicationResetExample.java` for a production-like example.
          //streams.cleanUp();
          streams.start()

          val restService = new RatingRestService(streams, restEndpoint)
          restService.start()

          //****************************************************************
          // WHY DOES METADATA NOT WORK WHEN THERE IS CLEARLY A STORE IN USE
          //****************************************************************
          val SIZE = streams.allMetadata.size()
          val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

          import org.apache.kafka.streams.state.KeyValueIterator
          import org.apache.kafka.streams.state.QueryableStoreTypes
          import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
          val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

          val range = keyValueStore.all
          val HASNEXT = range.hasNext
          import org.apache.kafka.streams.KeyValue
          while (range.hasNext) {
            val next = range.next
            System.out.println(String.format("key: %s | value: %s", next.key, next.value))
          }

          Runtime.getRuntime.addShutdownHook(new Thread(() => {
            streams.close(10, TimeUnit.SECONDS)
            restService.stop
          }))

          //return unit
          ()
        }
      }
    }

Where this is my configuration:

    kafka {
      bootStrapServers = "localhost:9092"
      zooKeepers = "zookeeper:2181"
      schemaRegistryUrl = "http://localhost:8081"
      partition = 0,
      restApiDefaultHostName = "localhost",
      restApiDefaultPort = "8080"
    }

REST Service

Scala port of the example file: https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/MetadataService.java

    package Processing.Ratings

    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.StreamsMetadata
    import java.util.stream.Collectors
    import Entities.HostStoreInfo
    import org.apache.kafka.common.serialization.Serializer
    import org.apache.kafka.connect.errors.NotFoundException
    import scala.collection.JavaConverters._

    /**
      * Looks up StreamsMetadata from KafkaStreams
      */
    class MetadataService(val streams: KafkaStreams) {

      /**
        * Get the metadata for all of the instances of this Kafka Streams application
        *
        * @return List of {@link HostStoreInfo}
        */
      def streamsMetadata() : List[HostStoreInfo] = {
        // Get metadata for all of the instances of this Kafka Streams application
        val metadata = streams.allMetadata
        return mapInstancesToHostStoreInfo(metadata)
      }

      /**
        * Get the metadata for all instances of this Kafka Streams application that currently
        * has the provided store.
        *
        * @param store The store to locate
        * @return List of {@link HostStoreInfo}
        */
      def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {
        // Get metadata for all of the instances of this Kafka Streams application hosting the store
        val metadata = streams.allMetadataForStore(store)
        return mapInstancesToHostStoreInfo(metadata)
      }

      /**
        * Find the metadata for the instance of this Kafka Streams Application that has the given
        * store and would have the given key if it exists.
        *
        * @param store Store to find
        * @param key   The key to find
        * @return {@link HostStoreInfo}
        */
      def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
        // Get metadata for the instances of this Kafka Streams application hosting the store and
        // potentially the value for key
        val metadata = streams.metadataForKey(store, key, serializer)
        if (metadata == null)
          throw new NotFoundException(
            s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")

        return new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
      }

      def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {
        metadatas.stream.map[HostStoreInfo](metadata =>
          HostStoreInfo(
            metadata.host(),
            metadata.port,
            metadata.stateStoreNames.asScala.toList))
          .collect(Collectors.toList())
          .asScala.toList
      }
    }

And here is the REST service (for now I am only trying to get the "instances" route working).

    package Processing.Ratings

    import org.apache.kafka.streams.KafkaStreams
    import org.apache.kafka.streams.state.HostInfo
    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model._
    import akka.http.scaladsl.server.Directives._
    import akka.stream.ActorMaterializer
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
    import spray.json.DefaultJsonProtocol._
    import Entities.AkkaHttpEntitiesJsonFormats._
    import Entities._
    import akka.http.scaladsl.marshalling.ToResponseMarshallable

    import scala.concurrent.Future

    object RestService {
      val DEFAULT_REST_ENDPOINT_HOSTNAME = "localhost"
    }

    class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

      val metadataService = new MetadataService(streams)
      var bindingFuture: Future[Http.ServerBinding] = null

      implicit val system = ActorSystem("rating-system")
      implicit val materializer = ActorMaterializer()
      implicit val executionContext = system.dispatcher

      def start() : Unit = {
        val emailRegexPattern = """\w+""".r

        val route =
          path("ratingByEmail" / emailRegexPattern) { email =>
            get {
              //TODO : This would come from Kafka store, either local or remote
              complete(ToResponseMarshallable.apply(List[Ranking](
                Ranking("fred@here.com", "sacha@there.com", 4.0f),
                Ranking("sam@here.com", "sacha@there.com", 2.0f)))
              )
            }
          } ~
          path("instances") {
            get {
              val x = metadataService.streamsMetadata
              complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
            }
          }

        bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
        println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

        Runtime.getRuntime.addShutdownHook(new Thread(() => {
          bindingFuture
            .flatMap(_.unbind()) // trigger unbinding from the port
            .onComplete(_ => system.terminate()) // and shutdown when done
        }))
      }

      def stop() : Unit = {
        bindingFuture
          .flatMap(_.unbind()) // trigger unbinding from the port
          .onComplete(_ => system.terminate()) // and shutdown when done
      }

      def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
        hostStoreInfo.host.equals(hostInfo.host()) &&
          hostStoreInfo.port == hostInfo.port
      }
    }

Here is the evidence that there is data in the store

Producer running: [screenshot]

Streams running: [screenshot]

Here I ran the producer first, then the streams app, and then the producer again (a second run).

See how the results from the KTable are shown; then I started the producer and pushed a few more messages through the topic, which the streams app picked up.

But when I query the REST endpoint to try to get the metadata using localhost:8080/instances, all I get is an empty list []


I would expect these lines from the streams code above to return some metadata. There is clearly something in the store, so why is there no metadata?

    val SIZE = streams.allMetadata.size()
    val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()

Both of these return 0, while iterating through the items in the store using this code

    import org.apache.kafka.streams.state.KeyValueIterator
    import org.apache.kafka.streams.state.QueryableStoreTypes
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

    val range = keyValueStore.all
    val HASNEXT = range.hasNext
    import org.apache.kafka.streams.KeyValue
    while (range.hasNext) {
      val next = range.next
      System.out.println(String.format("key: %s | value: %s", next.key, next.value))
    }

produces data from the store.


I know that the REST API itself is working fine, as the hard-coded test route works.


What am I doing wrong?

1 answer

So, I figured it out: this happens because of this missing configuration value

    props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080")
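As far as I understand it, the metadata calls can only report instances that advertise a host:port endpoint through this setting, which would explain why the list stayed empty even though the local store had data. Rather than hard-coding the value, it can be derived from the same host and port the REST service binds to. Below is a minimal sketch using only java.util.Properties; the object and method names are made up for illustration, and the literal "application.server" is the key that StreamsConfig.APPLICATION_SERVER_CONFIG stands for.

```scala
import java.util.Properties

// Hypothetical helper, not part of the original project.
object ApplicationServerConfigSketch {

  // Literal key behind StreamsConfig.APPLICATION_SERVER_CONFIG.
  val ApplicationServerKey = "application.server"

  /** Stores the host:port endpoint that this instance advertises to the others. */
  def withApplicationServer(props: Properties, host: String, port: Int): Properties = {
    props.put(ApplicationServerKey, s"$host:$port")
    props
  }
}
```

In the setup from the question this would be called with the restApiDefaultHostName and restApiDefaultPort settings, so the advertised endpoint and the Akka Http binding cannot drift apart.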

After I added that setting, the Akka Http REST API at http://localhost:8080/instances started working. But then I started getting this weird exception

    org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-key-value-store, may have migrated to another instance.
        at org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:49)
        at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:55)
        at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:699)

So, reading about it here: http://docs.confluent.io/current/streams/faq.html#handling-invalidstatestoreexception-the-state-store-may-have-migrated-to-another-instance

I decided that what I needed was some retry logic, which I did like this:

Retry

Which I borrowed from here: https://gist.github.com/Mortimerp9/5430595

    package Utils

    import scala.concurrent._
    import scala.concurrent.duration._

    object Retry {

      /**
        * exponential back off for retry
        */
      def exponentialBackoff(r: Int): Duration = (scala.math.pow(2, r).round * 500).milliseconds

      def noIgnore(t: Throwable): Boolean = false

      /**
        * retry a particular block that can fail
        *
        * @param maxRetry        how many times to retry before to giveup
        * @param deadline        how long to retry before giving up; default None
        * @param backoff         a back-off function that returns a Duration after which to retry.
        *                        default is an exponential backoff at 500 milliseconds steps
        * @param ignoreThrowable if you want to stop retrying on a particular exception
        * @param block           a block of code to retry
        * @param ctx             an execution context where to execute the block
        * @return an eventual Future succeeded with the value computed or failed with one of:
        *         `TooManyRetriesException` if there were too many retries without an exception being caught.
        *         Probably impossible if you pass decent parameters
        *         `DeadlineExceededException` if the retry didn't succeed before the provided deadline
        *         `TimeoutException` if you provide a deadline and the block takes too long to execute
        *         `Throwable` the last encountered exception
        */
      def retry[T](maxRetry: Int,
                   deadline: Option[Deadline] = None,
                   backoff: (Int) => Duration = exponentialBackoff,
                   ignoreThrowable: Throwable => Boolean = noIgnore)(block: => T)(implicit ctx: ExecutionContext): Future[T] = {

        class TooManyRetriesException extends Exception("too many retries without exception")
        class DeadlineExceededException extends Exception("deadline exceeded")

        val p = Promise[T]

        def recursiveRetry(retryCnt: Int, exception: Option[Throwable])(f: () => T): Option[T] = {
          if (maxRetry == retryCnt || deadline.isDefined && deadline.get.isOverdue) {
            exception match {
              case Some(t) => p failure t
              case None if deadline.isDefined && deadline.get.isOverdue => p failure (new DeadlineExceededException)
              case None => p failure (new TooManyRetriesException)
            }
            None
          } else {
            val success = try {
              val rez = if (deadline.isDefined) {
                // Future.apply replaces the lowercase `future` helper, which newer Scala versions no longer provide
                Await.result(Future(f()), deadline.get.timeLeft)
              } else {
                f()
              }
              Some(rez)
            } catch {
              case t: Throwable if !ignoreThrowable(t) =>
                blocking {
                  val interval = backoff(retryCnt).toMillis
                  Thread.sleep(interval)
                }
                recursiveRetry(retryCnt + 1, Some(t))(f)
              case t: Throwable =>
                p failure t
                None
            }
            success match {
              case Some(v) =>
                p success v
                Some(v)
              case None => None
            }
          }
        }

        def doBlock() = block

        Future {
          recursiveRetry(0, None)(doBlock)
        }

        p.future
      }
    }

Which I call like this:

    def printStoreMetaData(streams: KafkaStreams) : Unit = {

      import org.apache.kafka.streams.state.KeyValueIterator
      import org.apache.kafka.streams.state.QueryableStoreTypes
      import org.apache.kafka.streams.state.ReadOnlyKeyValueStore

      val keyValueStoreTry = waitUntilStoreIsQueryable(
        StateStores.RANKINGS_BY_EMAIL_STORE,
        QueryableStoreTypes.keyValueStore[String, List[Ranking]](),
        streams
      ) match {
        case Success(keyValueStore) => {
          val SIZE = streams.allMetadata.size()
          val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()
          val range = keyValueStore.all
          val HASNEXT = range.hasNext
          import org.apache.kafka.streams.KeyValue
          while (range.hasNext) {
            val next = range.next
            System.out.println(String.format("key: %s | value: %s", next.key, next.value))
          }
        }
        case Failure(f) => println(f)
      }
    }
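The waitUntilStoreIsQueryable helper called above is not shown in this answer. Since its result is matched against Success and Failure, it presumably returns a scala.util.Try. The following is only a guess at its shape, written against the standard library so it runs on its own: the object name, the method name and the parameters are assumptions, and the store lookup is passed in as a function instead of calling KafkaStreams directly.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical stand-in for the helper that the answer calls but does not show.
object StoreRetrySketch {

  /**
    * Calls `lookup` until it succeeds or `maxAttempts` attempts have been made,
    * sleeping `pauseMillis` between attempts. Returns the last failure if every
    * attempt throws.
    */
  def waitUntilQueryable[T](maxAttempts: Int, pauseMillis: Long)(lookup: () => T): Try[T] = {
    @tailrec
    def attempt(n: Int): Try[T] = Try(lookup()) match {
      case success @ Success(_) => success
      case Failure(_) if n < maxAttempts =>
        Thread.sleep(pauseMillis)
        attempt(n + 1)
      case failure => failure
    }
    attempt(1)
  }
}
```

With Kafka Streams on the classpath, the lookup would be something like `() => streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore[String, List[Ranking]]())`, so that an InvalidStateStoreException thrown while the store is still initialising just triggers another attempt.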

After that, it was happy days for me.


Source: https://habr.com/ru/post/1271534/

