Can I move elements in KTable from an external method

I have a kafka and KTable theme that listens to it.

I want to write an HTTP POST request that will traverse the current elements in ktable, perform some actions on them and write back to the topic

so basically i have:

private val accessTokenTable: KTable[String, String] = builder.table(token_topic_name, tokenStoreString)
    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.cleanUp()
    stream.start()

....

override def refreshTokens = {

    accessTokenTable.mapValues {
        new ValueMapper[String, String] {
            override def apply(value: String) = {
                value
            }
        }
    }.print(token_topic_name)
}

and when I try to call this method, nothing is printed / written to the topic

What am I missing? my only choice is to write messages from ktable to hashmap and read it from there? does it skip the whole ktables point?

+4
source share
2 answers

GlobalKTable, " , ", .

, , , , , .

  • KTable ( = "" KTable ). , . : try-catch-retry.
  • GlobalKTable, , GlobalKTable .

. , KTable GlobalKTable, " ", , . , KTable, GlobalKTable, - , , ( , / , , ).

, !

+1

, (rocksDB), .

: confluent

GlobalKTable, " , ", .

kafka 0.10.2.1:

    private val accessTokenTable: GlobalKTable[String, String] = builder.globalTable(token_topic_name, token_store_string)

    private val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    stream.cleanUp()
    stream.start()
    val store: ReadOnlyKeyValueStore[String,String] = stream.store(token_store_string,QueryableStoreTypes.keyValueStore[String,String]())

....

    override def refreshTokensFlow = {

       store.all.asScala.map( tuple => {
       // logic goes here
           System.out.println(tuple.key + ": " + tuple.value)
       }
    }
0

Source: https://habr.com/ru/post/1685940/


All Articles