Kafka Streams

I have 2 Kafka topics: recommendations and clicks. The first topic holds recommendation objects keyed by a unique ID (called recommendationsId). Each recommended product has a URL that the user can click.

The clicks topic receives messages generated when the user clicks on these recommended product URLs. It is set up so that these click messages are also keyed by recommendationId.

Note that:

  • the relationship between recommendations and clicks is one-to-many: a recommendation can lead to multiple clicks, but a click is always associated with exactly one recommendation.

  • each click object has a corresponding recommendation object.

  • a click object always has a later timestamp than its recommendation object.

  • the gap between a recommendation and its clicks can range from a few seconds to several days (say, at most 7 days).

My goal is to join these two topics using Kafka Streams. What I don't quite understand is whether I should use a KStream x KStream join or a KStream x KTable join.

I implemented a KStream x KTable join by joining the clicks stream to the recommendations table. However, if a recommendation was generated before the joiner was started and its click arrives after the joiner starts, I don't see the joined click-recommendation pair.

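Am I using the right join? Should I be using a KStream x KStream join instead? If so, to join a click with a recommendation that is up to 7 days older, should I set the window size to a fixed 7 days? Do I also need to set the retention period?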
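To make the KStream x KStream option concrete, here is a minimal plain-Java model of a windowed stream-stream inner join under the constraints listed above. It is not Kafka Streams code and not the library's implementation: the class `WindowedJoinModel` and its methods are hypothetical, both sides are simply buffered in lists, and nothing is evicted (a real window store expires records after its retention period). It assumes clicks are the left stream, so a recommendation matches a click when `click.ts - 7 days <= rec.ts <= click.ts`; in Kafka Streams such bounds are expressed with `JoinWindows` and its `before`/`after` settings.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model of a windowed KStream x KStream inner join.
// Not Kafka Streams code: each side buffers its records in a list, and every
// new record probes the other side's buffer for matching keys inside the window.
// Assumption: no eviction here; a real window store drops expired records.
class WindowedJoinModel {
    static final long DAY_MS = 24L * 60 * 60 * 1000;
    // Clicks are the left stream: a recommendation matches a click when
    // click.ts - BEFORE_MS <= rec.ts <= click.ts + AFTER_MS.
    static final long BEFORE_MS = 7 * DAY_MS;
    static final long AFTER_MS = 0L;

    static final class Rec {
        final String key;
        final long ts;
        final String payload;

        Rec(String key, long ts, String payload) {
            this.key = key;
            this.ts = ts;
            this.payload = payload;
        }
    }

    private final List<Rec> clicks = new ArrayList<>();
    private final List<Rec> recs = new ArrayList<>();
    // Joined results as "url|recommendation" strings, in emission order.
    final List<String> output = new ArrayList<>();

    static boolean inWindow(long clickTs, long recTs) {
        return clickTs - BEFORE_MS <= recTs && recTs <= clickTs + AFTER_MS;
    }

    // A click probes the buffered recommendations, then is buffered itself.
    void onClick(String key, long ts, String url) {
        for (Rec r : recs) {
            if (r.key.equals(key) && inWindow(ts, r.ts)) {
                output.add(url + "|" + r.payload);
            }
        }
        clicks.add(new Rec(key, ts, url));
    }

    // A recommendation probes the buffered clicks, then is buffered itself.
    void onRecommendation(String key, long ts, String recommendation) {
        for (Rec c : clicks) {
            if (c.key.equals(key) && inWindow(c.ts, ts)) {
                output.add(c.payload + "|" + recommendation);
            }
        }
        recs.add(new Rec(key, ts, recommendation));
    }
}
```

Because both sides are buffered, the join is symmetric: a recommendation processed after its click still produces a result, as long as the timestamps fall inside the window. For instance, a recommendation at t = 0 joins a click at day 2 but not a click at day 8, while a click at day 10 still joins a recommendation for the same key that is processed later with a day-9 timestamp.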

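My code for the KStream x KTable join is below. Note that I have defined Recommendations and Click classes and their corresponding serdes. The click messages are just plain Strings (the url). Using the URL String and the Recommendations object, a Click object is created inside the joiner and emitted to jointTopic.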

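import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

// Recommendations, Click and JoinSerdes are the author's own classes (not shown).
// The enclosing class name is illustrative.
public class ClickRecsJoiner {

    public static void main(String[] args) {
        if (args.length != 4) {
            throw new IllegalArgumentException(
                "Expected 4 params: bootstrapList clicksTopic recsTopic jointTopic");
        }

        final String bootstrapList = args[0];
        final String clicksTopic = args[1];
        final String recsTopic = args[2];
        final String jointTopic = args[3];

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my_joiner_id");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapList);
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, JoinSerdes.CLICK_SERDE.getClass().getName());

        // KStreamBuilder is the pre-1.0 API (replaced by StreamsBuilder in 1.0)
        KStreamBuilder builder = new KStreamBuilder();

        // load clicks as a KStream: key = recommendationId, value = clicked URL
        KStream<String, String> clicksStream = builder.stream(Serdes.String(), Serdes.String(), clicksTopic);

        // load recommendations as a KTable: latest Recommendations per recommendationId
        KTable<String, Recommendations> recsTable = builder.table(Serdes.String(), JoinSerdes.RECS_SERDE, recsTopic);

        // left join: every click is emitted; recs is null if the table has no entry for the key yet
        KStream<String, Click> join = clicksStream.leftJoin(recsTable, (click, recs) -> new Click(click, recs));

        // emit the join to the jointTopic
        join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);

        // let the action begin, and close the application cleanly on shutdown
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}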
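The `leftJoin` above looks up the table only at the moment each click is processed. The following minimal plain-Java model (hypothetical class `StreamTableJoinModel`, not Kafka code) sketches those semantics under that simplification: a click processed before its recommendation has reached the table is emitted with a `null` recommendation, and a later table update does not re-trigger the join. This is one way the symptom described in the question can arise, for example if clicks are processed before the recommendations topic has been loaded into the table.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified in-memory model of KStream x KTable leftJoin semantics (not Kafka code).
// The table holds only the latest value per key processed so far; each click looks
// up the table once, when it is processed, and is never re-joined later.
class StreamTableJoinModel {
    private final Map<String, String> recsTable = new HashMap<>();
    // Join results as "url|recommendation" strings ("null" when no match was found).
    final List<String> output = new ArrayList<>();

    // Table side: upsert the latest recommendation; produces no join output.
    void onRecommendation(String key, String recs) {
        recsTable.put(key, recs);
    }

    // Stream side: left join against the table state at this moment.
    void onClick(String key, String url) {
        String recs = recsTable.get(key); // null if not (yet) in the table
        output.add(url + "|" + recs);
    }
}
```

Running a click for key `r1` before `r1` is in the table, then the recommendation, then a second click yields one unmatched result and one joined result, which is the asymmetry that a windowed stream-stream join avoids.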

, ( ). , , , , , - . ?

KStream x KStream join, , , , .


. . stream-table, ( Kafka)

  • , , (inner-) . , , , , click , null (.. ) - , , .
  • KTable , , . , " ", <recommendationsId, null> , , .
  • , / , ( ).
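A KTable keeps the latest value for every key it has seen, so without deletes it grows with every recommendation. The `<recommendationsId, null>` record mentioned above is a tombstone: when a KTable reads a record whose value is `null`, it deletes that key. A minimal plain-Java model of that update rule (hypothetical class `TombstoneModel`, not Kafka code):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of how a KTable applies records from its source topic
// (not Kafka code): a non-null value upserts the key, a null value deletes it.
class TombstoneModel {
    final Map<String, String> table = new HashMap<>();

    void apply(String key, String value) {
        if (value == null) {
            table.remove(key);      // tombstone, e.g. <recommendationsId, null>
        } else {
            table.put(key, value);  // insert or overwrite the latest value
        }
    }
}
```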

, 7 , 7 - .

  • , /, 7 .
  • , (.. ) (.. , ).
  • , , , " ".

. 7 , 7 . " ". , " ". , (5 7- ), 1 . 7 , ( ). , , 8 , . / , .

: stream-stream , stream-table. / .


Source: https://habr.com/ru/post/1686362/

