I have 2 Kafka topics, recommendations and clicks. The first topic contains recommendation objects keyed by a unique Id (called recommendationsId). Each product has a URL that the user can click.
The clicks topic receives the messages generated by clicks on these product URLs recommended to the user. It has been set up so that these click messages are also keyed by recommendationId.
Note that:
- the relationship between recommendations and clicks is one-to-many: a recommendation can lead to multiple clicks, but a click is always associated with exactly one recommendation;
- each click object has a corresponding recommendation object;
- a click object has a later timestamp than its recommendation object;
- the gap between a recommendation and its clicks can range from a few seconds to several days (say, 7 days at most).
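The constraints above can be written down as a plain-Java check, which makes the matching rule any join has to implement explicit: a click belongs to exactly one recommendation (the map key) and its timestamp must fall in [recommendation timestamp, recommendation timestamp + 7 days]. This is a minimal sketch, not Kafka Streams code; the class name `ClickConstraints`, the `ClickEvent` record and the millisecond timestamps are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;

// Plain-Java model of the constraints in the question (hypothetical names, not Kafka Streams API).
public class ClickConstraints {
    // Maximum gap between a recommendation and its clicks, assumed to be 7 days.
    public static final long MAX_GAP_MS = Duration.ofDays(7).toMillis();

    // A click, keyed by the recommendationId it belongs to (hypothetical record type).
    public record ClickEvent(String recommendationId, long timestampMs) {}

    // A click may join a recommendation only if it is not earlier than the
    // recommendation and at most MAX_GAP_MS later.
    public static boolean inWindow(long recTs, long clickTs) {
        return clickTs >= recTs && clickTs - recTs <= MAX_GAP_MS;
    }

    // Every click must have exactly one recommendation (the map has one entry per id)
    // and must fall inside that recommendation's window.
    public static boolean allClicksValid(Map<String, Long> recTimestamps, List<ClickEvent> clicks) {
        for (ClickEvent c : clicks) {
            Long recTs = recTimestamps.get(c.recommendationId());
            if (recTs == null || !inWindow(recTs, c.timestampMs())) {
                return false;
            }
        }
        return true;
    }
}
```

Because one recommendation id maps to a single timestamp while many clicks can share that id, the one-to-many relationship falls out of the keying rather than needing extra logic.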
My goal is to join these two topics using Kafka Streams. What I don't quite understand is whether I should use a KStream x KStream join or a KStream x KTable join.
I implemented a KStream x KTable join by joining the clicks stream with the recommendations table. However, I don't see any joined click-recommendation pairs when the recommendation was generated before the joiner was started and the click arrives after the joiner has started.
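One way to reason about this symptom: in a KStream x KTable join, each click looks up the table only once, at the moment the click is processed, and later updates to the table do not re-trigger the join for clicks that were already processed. So a click can only be matched with a recommendation that has already been read into the table. The sketch below is a plain-Java model of that lookup rule, not the Kafka Streams API; the `Event` record and the `"rec"`/`"click"` encoding are hypothetical. It also shows the difference between an inner join and a left join: with `leftJoin` (as in the code in this question), a miss still produces an output with a null recommendation instead of dropping the click.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of KStream x KTable join semantics (not the Kafka Streams API).
// The "table" holds the latest recommendation per key consumed so far.
public class StreamTableJoinModel {
    // Hypothetical event: kind is "rec" or "click"; value is the recommendation or the URL.
    public record Event(String kind, String key, String value) {}

    // Processes events in the given order and returns the join output.
    // leftJoin = true mimics a left join (emit with null on a miss);
    // leftJoin = false mimics an inner join (drop the click on a miss).
    public static List<String> run(List<Event> events, boolean leftJoin) {
        Map<String, String> table = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (e.kind().equals("rec")) {
                // Table updates only change state; they never emit join results.
                table.put(e.key(), e.value());
            } else {
                // A click sees only what is in the table right now.
                String rec = table.get(e.key());
                if (rec != null || leftJoin) {
                    out.add(e.value() + "|" + rec); // rec prints as "null" on a miss
                }
            }
        }
        return out;
    }
}
```

For example, if a click on key `r1` is processed before the recommendation for `r1` reaches the table, the left join emits `"u1|null"` and the inner join emits nothing, even though the recommendation arrives a moment later.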
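Am I using the right join? Should I use a KStream x KStream join instead? If so, to be able to join a click with a recommendation that is at most 7 days old, should I set the join window to a fixed 7 days? Do I also need to set a retention period in that case?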
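To compare with the table lookup, here is a plain-Java model of a windowed KStream x KStream inner join. Both sides are buffered per key, so a click and its recommendation are matched whichever one is processed first, as long as the click timestamp lies in [recTs, recTs + window]. This is a sketch under stated assumptions, not Kafka Streams code: the `Event` record is hypothetical, and the model never evicts buffered records, whereas Kafka Streams keeps each side in a windowed state store only for the window plus grace period. In recent Kafka Streams versions (3.0+), a comparable window for `recommendationsStream.join(clicksStream, ...)` could be expressed as `JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofDays(7)).before(Duration.ZERO)`, which forbids clicks earlier than the recommendation; treat that as a starting point to check against the Javadoc for your version.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a windowed stream-stream inner join (not the Kafka Streams API).
// Both sides are buffered per key; buffers are never evicted in this simplified model.
public class StreamStreamJoinModel {
    // Hypothetical event: isRec distinguishes recommendations from clicks; ts is in ms.
    public record Event(boolean isRec, String key, long ts, String value) {}

    public static List<String> run(List<Event> events, Duration window) {
        long w = window.toMillis();
        Map<String, List<Event>> recs = new HashMap<>();
        Map<String, List<Event>> clicks = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Event e : events) {
            if (e.isRec()) {
                recs.computeIfAbsent(e.key(), k -> new ArrayList<>()).add(e);
                // A recommendation processed late still joins clicks already buffered.
                for (Event c : clicks.getOrDefault(e.key(), List.of())) {
                    if (inWindow(e.ts(), c.ts(), w)) out.add(c.value() + "|" + e.value());
                }
            } else {
                clicks.computeIfAbsent(e.key(), k -> new ArrayList<>()).add(e);
                // A click joins every buffered recommendation whose window covers it.
                for (Event r : recs.getOrDefault(e.key(), List.of())) {
                    if (inWindow(r.ts(), e.ts(), w)) out.add(e.value() + "|" + r.value());
                }
            }
        }
        return out;
    }

    // The click must not precede the recommendation and may trail it by at most w ms.
    static boolean inWindow(long recTs, long clickTs, long w) {
        return clickTs >= recTs && clickTs - recTs <= w;
    }
}
```

The price of this approach is state: to honour a 7-day window, the join has to keep up to 7 days of both recommendations and clicks, while a KStream x KTable join keeps only the latest recommendation per key.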
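My code for the KStream x KTable join is below. Note that I have defined Recommendations and Click classes and their corresponding serdes. The click messages are plain Strings (the URL). Each URL String is joined with its Recommendations object to create a new Click object, which is written to jointTopic.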
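import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
// Pre-1.0 KStreamBuilder API, as in the original; the class name is illustrative.
public class ClicksRecommendationsJoiner {
    public static void main(String[] args) {
        if (args.length != 4) {
            throw new IllegalArgumentException("Expected 4 params: bootstrapList clicksTopic recsTopic jointTopic");
        }
        final String bootstrapList = args[0];
        final String clicksTopic = args[1];
        final String recsTopic = args[2];
        final String jointTopic = args[3];
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapList);
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName());
        KStreamBuilder builder = new KStreamBuilder();
        // Clicks: key = recommendationId, value = clicked URL.
        KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic);
        // Recommendations table: latest Recommendations object per recommendationId.
        KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic);
        // Left join: recs is null when no recommendation is in the table yet for this key.
        KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs));
        join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
        // Close the Streams instance cleanly on JVM shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}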