So, I am trying to get interactive queries working with Kafka Streams. I have Zookeeper and Kafka running locally (on Windows), using C:\temp as the storage folder for both Zookeeper and Kafka.
I created the topics as follows:
kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-submit-topic

kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic rating-output-topic
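These topic names are referenced from the code via a small constants object, roughly like this (the RATING_OUTPUT_TOPIC identifier is an assumption; only RATING_SUBMIT_TOPIC appears in my code below):

// Topic name constants matching the kafka-topics.bat commands above
object RatingsTopics {
  val RATING_SUBMIT_TOPIC = "rating-submit-topic"
  val RATING_OUTPUT_TOPIC = "rating-output-topic" // assumed identifier
}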
The reading I did around this issue
I read this documentation page: http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application
I also read a Java example here: https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/kafkamusic/KafkaMusicExample.java
And I also read this similar question, which initially sounded like the same problem as mine: Unable to access KTable from another application as a StateStore
So that is my setup. What is the problem?
As I said, I am trying to write my own application that allows interactive queries via a custom Akka HTTP REST API (RPC calls, as recommended) so that I can query my KTable. The actual stream processing seems to happen as expected: I can print the contents of the KTable, and they match what is being produced to the topic.
So the state store side of things seems to be working.
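For context, here is roughly the per-key lookup flow I am working towards inside the RatingRestService (a sketch based on the Confluent example; fetchRemoteRankings is a hypothetical helper I have not written yet):

// Sketch only: find which instance owns the key, then query locally or remotely.
def rankingsForEmail(email: String): List[Ranking] = {
  val hostStoreInfo = metadataService.streamsMetadataForStoreAndKey(
    StateStores.RANKINGS_BY_EMAIL_STORE, email, Serdes.String().serializer())
  if (thisHost(hostStoreInfo)) {
    // the key is owned by this instance: query the local state store
    val store = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE,
      QueryableStoreTypes.keyValueStore[String, List[Ranking]]())
    Option(store.get(email)).getOrElse(List[Ranking]())
  } else {
    // the key is owned by another instance: forward the query over HTTP (RPC)
    fetchRemoteRankings(hostStoreInfo, email)
  }
}

// hypothetical helper: would make an HTTP call to the owning instance
def fetchRemoteRankings(host: HostStoreInfo, email: String): List[Ranking] = ???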
The problem occurs when I try to use the KafkaStreams.allMetadata() method, which returns an empty list.
I am using the following (the matching sbt dependencies are sketched after the list):
- Scala 2.12
- SBT
- Akka HTTP 10.0.9 for the REST API
- Kafka 0.11.0
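For reference, the relevant slice of my build.sbt looks roughly like this (the exact patch versions are an assumption):

// Sketch of the dependencies; exact patch versions are assumptions
scalaVersion := "2.12.2"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http"            % "10.0.9",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.0.9",
  "org.apache.kafka"  %  "kafka-clients"        % "0.11.0.0",
  "org.apache.kafka"  %  "kafka-streams"        % "0.11.0.0"
)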
Producer code
Here is the code for my producer
package Processing.Ratings {

  import java.util.concurrent.TimeUnit
  import Entities.Ranking
  import Serialization.JSONSerde
  import Topics.RatingsTopics
  import scala.util.Random
  import org.apache.kafka.clients.producer.ProducerRecord
  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.common.serialization.Serdes
  import Utils.Settings
  import org.apache.kafka.clients.producer.ProducerConfig

  object RatingsProducerApp extends App {

    run()

    private def run(): Unit = {
      val jSONSerde = new JSONSerde[Ranking]
      val random = new Random
      val producerProps = Settings.createBasicProducerProperties
      val rankingList = List(
        Ranking("jarden@here.com", "sacha@here.com", 1.5f),
        Ranking("miro@here.com", "mary@here.com", 1.5f),
        Ranking("anne@here.com", "margeret@here.com", 3.5f),
        Ranking("frank@here.com", "bert@here.com", 2.5f),
        Ranking("morgan@here.com", "ruth@here.com", 1.5f))

      producerProps.put(ProducerConfig.ACKS_CONFIG, "all")

      System.out.println("Connecting to Kafka cluster via bootstrap servers " +
        s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}")

      // send a random ranking from the list every 100 milliseconds
      val rankingProducer = new KafkaProducer[String, Array[Byte]](
        producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer)

      //while (true) {
      for (i <- 0 to 10) {
        val ranking = rankingList(random.nextInt(rankingList.size))
        val rankingBytes = jSONSerde.serializer().serialize("", ranking)
        System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}")
        rankingProducer.send(new ProducerRecord[String, Array[Byte]](
          RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes))
        Thread.sleep(100)
      }

      Runtime.getRuntime.addShutdownHook(new Thread(() => {
        rankingProducer.close(10, TimeUnit.SECONDS)
      }))
    }
  }
}
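For reference, Ranking is just a plain case class. Only toEmail is visible in the code above, so the other field names here are assumptions:

// Sketch of the Ranking entity (fromEmail/score are assumed names;
// toEmail is real, since the producer keys records by ranking.toEmail)
case class Ranking(fromEmail: String, toEmail: String, score: Float)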
Stream code
Here is the streams configuration code:
def createRatingStreamsProperties() : Properties = {
  val props = createBasicStreamProperties
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ratings-application")
  props.put(StreamsConfig.CLIENT_ID_CONFIG, "ratings-application-client")
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props
}

private def createBasicStreamProperties() : Properties = {
  val props = new Properties()
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
  // Records should be flushed every 10 seconds. This is less than the default
  // in order to keep this example interactive.
  props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000.asInstanceOf[Object])
  // For illustrative purposes we disable record caches
  props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0.asInstanceOf[Object])
  props
}
And here is the actual stream-processing code:
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream._
import Entities.Ranking
import Serialization.JSONSerde
import Topics.RatingsTopics
import Utils.Settings

package Processing.Ratings {

  import Stores.StateStores
  import org.apache.kafka.streams.state.HostInfo

  class DummyRankingReducer extends Reducer[Ranking] {
    override def apply(value1: Ranking, value2: Ranking): Ranking = {
      value2
    }
  }

  class RankingByEmailInitializer extends Initializer[List[Ranking]] {
    override def apply(): List[Ranking] = List[Ranking]()
  }

  class RankingByEmailAggregator extends Aggregator[String, Ranking, List[Ranking]] {
    override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = {
      value :: aggregate
    }
  }

  object RatingStreamProcessingApp extends App {

    run()

    private def run() : Unit = {
      val stringSerde = Serdes.String
      val rankingSerde = new JSONSerde[Ranking]
      val listRankingSerde = new JSONSerde[List[Ranking]]
      val builder: KStreamBuilder = new KStreamBuilder
      val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)

      val rankingTable = rankings.groupByKey(stringSerde, rankingSerde)
        .aggregate(
          new RankingByEmailInitializer(),
          new RankingByEmailAggregator(),
          listRankingSerde,
          StateStores.RANKINGS_BY_EMAIL_STORE
        )

      rankingTable.toStream.print()

      val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)
      val restEndpoint: HostInfo = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
      System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
      System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")

      // start the streams topology and the REST service
      // (the RatingRestService class is shown further down)
      streams.start()

      val restService = new RatingRestService(streams, restEndpoint)
      restService.start()
    }
  }
}
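StateStores is just a holder for the store name constant, roughly like this (the literal string value is an assumption):

// Sketch of the StateStores object; the literal name is an assumption
object StateStores {
  val RANKINGS_BY_EMAIL_STORE = "rankings-by-email-store"
}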
Here is my configuration:
kafka {
  bootStrapServers = "localhost:9092"
  zooKeepers = "zookeeper:2181"
  schemaRegistryUrl = "http://localhost:8081"
  partition = 0,
  restApiDefaultHostName = "localhost",
  restApiDefaultPort = "8080"
}
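Settings (used throughout the code above) is a thin wrapper over this config plus some property builders, roughly like this (a sketch assuming Typesafe Config, not my exact code):

// Sketch of the Settings object, assuming Typesafe Config loads the block above
import java.util.Properties
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerConfig

object Settings {
  private val config = ConfigFactory.load()

  val bootStrapServers = config.getString("kafka.bootStrapServers")
  val restApiDefaultHostName = config.getString("kafka.restApiDefaultHostName")
  val restApiDefaultPort = config.getString("kafka.restApiDefaultPort").toInt

  // basic producer properties shared by the producer app
  def createBasicProducerProperties : Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
    props
  }
}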
REST Service
This is my Scala port of this Java example file: https://github.com/confluentinc/examples/blob/3.3.0-post/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries/MetadataService.java
package Processing.Ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import java.util.stream.Collectors
import Entities.HostStoreInfo
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.connect.errors.NotFoundException
import scala.collection.JavaConverters._

/**
  * Looks up StreamsMetadata from KafkaStreams
  */
class MetadataService(val streams: KafkaStreams) {

  /**
    * Get the metadata for all of the instances of this Kafka Streams application
    *
    * @return List of {@link HostStoreInfo}
    */
  def streamsMetadata() : List[HostStoreInfo] = {
    // Get metadata for all of the instances of this Kafka Streams application
    val metadata = streams.allMetadata
    mapInstancesToHostStoreInfo(metadata)
  }

  /**
    * Get the metadata for all instances of this Kafka Streams application that currently
    * host the provided store.
    *
    * @param store The store to locate
    * @return List of {@link HostStoreInfo}
    */
  def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {
    // Get metadata for all of the instances of this Kafka Streams application hosting the store
    val metadata = streams.allMetadataForStore(store)
    mapInstancesToHostStoreInfo(metadata)
  }

  /**
    * Find the metadata for the instance of this Kafka Streams application that has the given
    * store and would have the given key if it exists.
    *
    * @param store Store to find
    * @param key   The key to find
    * @return {@link HostStoreInfo}
    */
  def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
    // Get metadata for the instances of this Kafka Streams application hosting the store and
    // potentially the value for key
    val metadata = streams.metadataForKey(store, key, serializer)
    if (metadata == null)
      throw new NotFoundException(
        s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")

    new HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
  }

  def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {
    metadatas.stream.map[HostStoreInfo](metadata =>
      HostStoreInfo(
        metadata.host(),
        metadata.port,
        metadata.stateStoreNames.asScala.toList))
      .collect(Collectors.toList())
      .asScala.toList
  }
}
And here is the REST service (at the moment I was only trying to get the "instances" route working).
package Processing.Ratings

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.HostInfo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import Entities.AkkaHttpEntitiesJsonFormats._
import Entities._
import akka.http.scaladsl.marshalling.ToResponseMarshallable

import scala.concurrent.Future

object RestService {
  val DEFAULT_REST_ENDPOINT_HOSTNAME = "localhost"
}

class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {

  val metadataService = new MetadataService(streams)
  var bindingFuture: Future[Http.ServerBinding] = null

  implicit val system = ActorSystem("rating-system")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  def start() : Unit = {
    val emailRegexPattern = """\w+""".r

    val route =
      path("ratingByEmail" / emailRegexPattern) { email =>
        get {
          //TODO : This would come from the Kafka store, either local or remote
          complete(ToResponseMarshallable.apply(List[Ranking](
            Ranking("fred@here.com", "sacha@there.com", 4.0f),
            Ranking("sam@here.com", "sacha@there.com", 2.0f)))
          )
        }
      } ~
      path("instances") {
        get {
          val x = metadataService.streamsMetadata
          complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
        }
      }

    bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
    println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")

    Runtime.getRuntime.addShutdownHook(new Thread(() => {
      bindingFuture
        .flatMap(_.unbind()) // trigger unbinding from the port
        .onComplete(_ => system.terminate()) // and shutdown when done
    }))
  }

  def stop() : Unit = {
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }

  def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
    hostStoreInfo.host.equals(hostInfo.host()) &&
      hostStoreInfo.port == hostInfo.port
  }
}
Here is the evidence that there is data in the state store:
(screenshot: the producer working)

(screenshot: the streams application output)
This is after I launched the producer once, then the streams application, and then the producer again (a second run).
You can see the results coming from the KTable; I then started the producer again and pushed a few more records onto the topic, which the streams application picked up.
But when I hit the REST endpoint to try to get the metadata, using localhost:8080/instances, all I get is an empty list [].

I would expect these lines from the stream code above to return some metadata; there is clearly something in the store, so why is no metadata returned?
val SIZE = streams.allMetadata.size()
val SIZE2 = streams.allMetadataForStore(StateStores.RANKINGS_BY_EMAIL_STORE).size()
Both of these return 0. Meanwhile, iterating over the items in the store using this code
import org.apache.kafka.streams.state.KeyValueIterator
import org.apache.kafka.streams.state.QueryableStoreTypes
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore
import org.apache.kafka.streams.KeyValue

val keyValueStore = streams.store(StateStores.RANKINGS_BY_EMAIL_STORE, QueryableStoreTypes.keyValueStore)

val range = keyValueStore.all
val HASNEXT = range.hasNext
while (range.hasNext) {
  val next = range.next
  System.out.println(String.format("key: %s | value: %s", next.key, next.value))
}
produces data from the store.

I know the REST API is working fine, as the hardcoded test route works fine:

What am I doing wrong?