I have two Java applications (App1, App2) to test how to access a KTable from another application in a single-instance environment in Docker.
The first application (App1) writes to a KTable with the following code:
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gateway-service");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, ServiceTransactionSerde.class);

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, ServiceTransaction> source = builder.stream("gateway_request_processed");

        KStream<String, Long> countByApi = source
                .groupBy((key, value) -> value.getApiId().toString())
                .count("Counts")
                .toStream();
        countByApi.to(Serdes.String(), Serdes.Long(), "countByApi");
        countByApi.print();

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        System.out.println(streams.state());
        System.out.println(streams.allMetadata());
        System.out.println(streams.allMetadataForStore("countByApi"));

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(streams.allMetadata());
                streams.close();
            }
        }));
    }
When I launch my producer, App1 prints the following output:
    RUNNING
    []
    []
    [KTABLE-TOSTREAM-0000000006]: c00af5ee-3c2d-4d12-9c4b-3b55c1284dd6, 19
This shows that the state is RUNNING and that both metadata lists, including the one for the store, are empty. However, the request is processed and stored in the KTable (String, Long) successfully.
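For context on the empty metadata lists, here is a minimal sketch of the setting that, as far as I understand, decides whether an instance is reported by allMetadata() at all: the application.server property (the key behind StreamsConfig.APPLICATION_SERVER_CONFIG). The endpoint "localhost:7070" and the class and method names are hypothetical and only there for illustration. Note also that allMetadataForStore() expects a state store name, which in App1 would be "Counts" (the argument of count(...)), not the output topic "countByApi".

```java
import java.util.Properties;

// Sketch only: App1's basic settings plus an advertised endpoint.
final class AdvertisedEndpointConfig {

    // hostAndPort is a hypothetical "host:port" under which this instance
    // would advertise itself to other instances of the same application.
    static Properties withEndpoint(String hostAndPort) {
        Properties props = new Properties();
        props.put("application.id", "gateway-service");
        props.put("bootstrap.servers", "172.18.0.11:9092");
        // String key behind StreamsConfig.APPLICATION_SERVER_CONFIG.
        props.put("application.server", hostAndPort);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical endpoint, chosen only for this example.
        System.out.println(withEndpoint("localhost:7070"));
    }
}
```

The metadata is also only populated once the instance has been assigned its partitions, so a call placed directly after start() may still return an empty list.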
When I run kafka-topics.sh --list --zookeeper zookeeper:2181, I get the following topics:
    bash-4.3# kafka-topics.sh --list --zookeeper zookeeper:2181
    __consumer_offsets
    countByApi
    gateway-Counts-changelog
    gateway-Counts-repartition
    gateway-service-Counts-changelog
    gateway-service-Counts-repartition
    gateway_request_processed
This shows me that the KTable is somehow persisted in new topics.
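The new topic names seem to follow a fixed pattern, which can be sketched as below. The assumption is that Kafka Streams names its internal topics as application.id, then the store (or operation) name, then a "-changelog" or "-repartition" suffix. The class and method names are made up for this illustration and are not part of the Kafka API.

```java
// Sketch of the apparent naming scheme for Kafka Streams internal topics.
// Assumption: "<application.id>-<name>-changelog" and "<application.id>-<name>-repartition".
final class InternalTopicNames {

    // Changelog topic that backs a named state store.
    static String changelog(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Repartition topic created when records are re-keyed before an aggregation.
    static String repartition(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    public static void main(String[] args) {
        // prints "gateway-service-Counts-changelog"
        System.out.println(changelog("gateway-service", "Counts"));
        // prints "gateway-service-Counts-repartition"
        System.out.println(repartition("gateway-service", "Counts"));
    }
}
```

With application.id "gateway-service" and the store name "Counts" from count("Counts"), this reproduces two of the topics in the listing. Under the same scheme, an application with a different application.id would look for differently named topics.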
Then I have a command-line application (App2) with the following code, which tries to access this KTable as a state store (ReadOnlyKeyValueStore):
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "gateway-service-table-client");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "172.18.0.11:9092");

        KStreamBuilder builder = new KStreamBuilder();
        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.cleanUp();
        streams.start();

        System.out.println("Hello World!");
        System.out.println(streams.state());

        ReadOnlyKeyValueStore<String, Long> keyValueStore =
                streams.store("countByApi", QueryableStoreTypes.keyValueStore());
        final KeyValueIterator<String, Long> range = keyValueStore.all();
        while (range.hasNext()) {
            KeyValue<String, Long> next = range.next();
            System.out.println(String.format("key: %s | value: %s", next.key, next.value));
        }

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(streams.allMetadata());
                streams.close();
            }
        }));
    }
When I start App2, I get the following error message:
    RUNNING
    Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, countByApi, may have migrated to another instance.
        at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:60)
        at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:728)
        at com.comp.streamtable.App.main(App.java:37)
However, I only have one instance, and I checked that its state is RUNNING.
Note: I had to choose a different application.id for each application, because using the same one results in a different exception. I just wanted to point this out, as it might be relevant.
What am I missing here to access my KTable from another application?