I have a Kafka topic and a KTable that listens to it.
I want to write an HTTP POST request handler that iterates over the current entries in the KTable, performs some action on each of them, and writes the results back to the topic.
So basically I have:
    private val accessTokenTable: KTable[String, String] = builder.table(token_topic_name, tokenStoreString)
    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.cleanUp()
    stream.start()
    ....
    override def refreshTokens = {
      accessTokenTable.mapValues {
        new ValueMapper[String, String] {
          override def apply(value: String) = {
            value
          }
        }
      }.print(token_topic_name)
    }
When I call this method, nothing is printed or written to the topic.
What am I missing? Is my only option to copy the messages from the KTable into a HashMap and read them from there? Doesn't that defeat the whole point of a KTable?
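For context, one direction that avoids a hand-maintained HashMap is to read the KTable's backing state store directly (interactive queries) and send the results with a plain producer. Operators such as mapValues only extend the topology, and the topology is fixed once the KafkaStreams instance has been created from the builder, so calling them from a request handler after start() has no effect on the running application. The following is only a sketch under several assumptions: the Kafka Streams version is one where KafkaStreams#store(String, QueryableStoreType) is available (0.10.1 or later; the method was deprecated in later releases), the store name is the tokenStoreString passed to builder.table, and TokenRefresher, refreshToken and the producer parameter are hypothetical names that do not appear in the code above.

```scala
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.QueryableStoreTypes

// Hypothetical helper object; not part of the original code.
object TokenRefresher {

  // Placeholder for the real per-token action (assumption: identity, as in the question).
  def refreshToken(value: String): String = value

  // Iterates over every entry currently held in the local state store behind the
  // KTable and sends the transformed value back to the given topic.
  // Returns the number of records handed to the producer.
  def refreshTokens(streams: KafkaStreams,
                    storeName: String,
                    topic: String,
                    producer: KafkaProducer[String, String]): Int = {
    // Throws InvalidStateStoreException if the streams instance is not yet running
    // or the store is being rebalanced; callers may want to retry.
    val store = streams.store(storeName, QueryableStoreTypes.keyValueStore[String, String]())
    val entries = store.all()
    var sent = 0
    try {
      while (entries.hasNext) {
        val entry = entries.next()
        producer.send(new ProducerRecord[String, String](topic, entry.key, refreshToken(entry.value)))
        sent += 1
      }
    } finally {
      entries.close() // state store iterators hold resources and must be closed
    }
    producer.flush()
    sent
  }
}
```

The POST handler could then call something like TokenRefresher.refreshTokens(stream, tokenStoreString, token_topic_name, producer), where producer is a KafkaProducer[String, String] configured separately. Note that the store only contains the partitions assigned to the local application instance, so with several instances each one sees only its own share of the table.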