Apache Kafka: ordering windowed messages based on their value

I am trying to find a way to reorder the messages within a topic partition and send the ordered messages to a new topic.

I have a Kafka publisher that sends String messages in the following format: {system_timestamp}-{event_name}?{parameters}

e.g.:

1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3

In addition, we add a key to each message so that it is sent to the appropriate partition.

What I want to do is reorder events by the {system_timestamp} part of the message within a 1-minute window, because our publishers do not guarantee that messages are sent in {system_timestamp} order.

For example, a message with a higher {system_timestamp} value may be delivered to the topic first.
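To make the ordering criterion concrete, here is a minimal plain-Java sketch (no Kafka involved) that extracts the leading timestamp from a message in the format above and sorts a batch by it. The class and method names (EventTimestamps, timestampOf, sortByTimestamp) are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class EventTimestamps {
    // Extracts {system_timestamp} from "{system_timestamp}-{event_name}?{parameters}".
    static long timestampOf(String message) {
        int dash = message.indexOf('-');
        if (dash <= 0) {
            throw new IllegalArgumentException("No timestamp prefix in: " + message);
        }
        return Long.parseLong(message.substring(0, dash));
    }

    // Returns a new list ordered by the embedded timestamp; the input is left untouched.
    static List<String> sortByTimestamp(List<String> messages) {
        List<String> sorted = new ArrayList<>(messages);
        sorted.sort(Comparator.comparingLong((String m) -> timestampOf(m)));
        return sorted;
    }

    public static void main(String[] args) {
        // The two sample messages from above, in the (out-of-order) arrival order.
        List<String> received = Arrays.asList(
                "1494002667893-client.message?chatName=1c&messageBody=hello",
                "1494002656558-chat.started?chatName=1c&chatPatricipants=3");
        sortByTimestamp(received).forEach(System.out::println);
    }
}
```

This only shows the comparison itself; buffering a 1-minute window of a live stream is the part that needs Kafka Streams support.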

I explored the Kafka Streams API and found some examples of message grouping and aggregation:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("events");
KGroupedStream<String, String> groupedStream = stream.groupByKey(); // groups events by key within each partition

/* Commented out since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
        () -> "",  // initial value
        (aggKey, value, aggregate) -> aggregate + "",   // aggregating value
        TimeWindows.of(1000), // intervals in milliseconds
        Serdes.String(), // serde for aggregated value
        "test-store"
);*/

But what should I do next with this grouped stream? I don't see any "sort() (e1, e2) -> e1.compareTo(e2)" method available. Windows can be applied to methods such as aggregate(), reduce(), and count(), but I don't think I need any data manipulation.

How can I reorder the messages within a 1-minute window and send them to another topic?

+6
2 answers

Here is a sketch:

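Create a Processor implementation that:

  • In the process() method, for each message:

    • reads the timestamp from the message value
    • inserts the message into a KeyValueStore, using the (timestamp, message-key) pair as the key and the message-value as the value. NB: this also provides de-duplication. You will need a custom Serde that serializes the key with the timestamp first, byte-wise, so that range queries are ordered by timestamp first.
  • In the punctuate() method:

    • reads the store with a range fetch from 0 to timestamp - 60'000 (= 1 minute)
    • sends the fetched messages in order using context.forward() and deletes them from the store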
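The key layout for the KeyValueStore can be sketched in plain Java. This is only an illustration, assuming non-negative timestamps and a store that orders keys by unsigned byte-wise comparison; the class TimestampFirstKey and its methods are hypothetical names, and wrapping this logic in an actual Kafka Serde is left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class TimestampFirstKey {
    // Encodes (timestamp, messageKey) as 8 big-endian timestamp bytes followed by the UTF-8 key,
    // so that byte-wise ordering sorts by timestamp first (for non-negative timestamps).
    static byte[] encode(long timestamp, String messageKey) {
        byte[] keyBytes = messageKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + keyBytes.length)
                .putLong(timestamp)   // ByteBuffer is big-endian by default
                .put(keyBytes)
                .array();
    }

    // Reads the timestamp back from the first 8 bytes.
    static long timestampOf(byte[] encoded) {
        return ByteBuffer.wrap(encoded).getLong();
    }

    // Reads the message key back from the remaining bytes.
    static String messageKeyOf(byte[] encoded) {
        return new String(encoded, Long.BYTES, encoded.length - Long.BYTES, StandardCharsets.UTF_8);
    }

    // Unsigned lexicographic comparison, the ordering this sketch assumes the store uses.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

With this layout, an earlier timestamp always compares lower than a later one, regardless of the message key that follows it.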

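The problem with this approach is that punctuate() is not triggered unless new messages arrive to advance "stream time". If that is a risk in your case, you can create an external scheduler that sends periodic "tick" messages to each (!) partition of your topic; your processor should simply ignore them, but they will cause punctuate() to fire in the absence of "real" messages. KIP-138 addresses this limitation by adding explicit support for system-time punctuation: https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics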
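The buffering flow can be simulated without Kafka to see how it behaves. In this sketch a TreeMap stands in for the ordered KeyValueStore, the returned list stands in for context.forward(), and the caller passes the current time to punctuate() explicitly, where Kafka Streams would supply it. The class ReorderBuffer and the constant WINDOW_MS are hypothetical names; this is not the Processor API itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

class ReorderBuffer {
    static final long WINDOW_MS = 60_000L; // 1 minute

    // Stand-in for the ordered store: sorted by timestamp, then by message key.
    private final TreeMap<Long, TreeMap<String, String>> store = new TreeMap<>();

    // Buffers one message; a repeated (timestamp, key) pair overwrites the earlier entry.
    void process(String messageKey, String value) {
        long timestamp = Long.parseLong(value.substring(0, value.indexOf('-')));
        store.computeIfAbsent(timestamp, t -> new TreeMap<>()).put(messageKey, value);
    }

    // Removes and returns, in timestamp order, every message whose timestamp
    // is at most (now - WINDOW_MS). Newer messages stay buffered.
    List<String> punctuate(long now) {
        List<String> forwarded = new ArrayList<>();
        NavigableMap<Long, TreeMap<String, String>> ready = store.headMap(now - WINDOW_MS, true);
        for (TreeMap<String, String> sameTimestamp : ready.values()) {
            forwarded.addAll(sameTimestamp.values());
        }
        ready.clear(); // headMap is a view, so this deletes the entries from the store
        return forwarded;
    }
}
```

Nothing leaves the buffer until punctuate() is called with a sufficiently late time, which is exactly why a stalled "stream time" leaves messages stuck in the store.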

+5

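Here is how I ordered the stream in my project.

  1. Create a topology with a source, a processor, and a sink.
    1. process(key, value) → add each record to a List (an instance variable).
    2. Init() → schedule(WINDOW_BUFFER_TIME, WALL_CLOCK_TIME) → punctuate(timestamp): sort the List items buffered during the window and forward them in order, then clear the List (the instance variable).

This logic works fine for me.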
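One reading of this answer is a processor that buffers records in a List and, on a wall-clock schedule, sorts and forwards them. A minimal plain-Java sketch of that buffering logic follows, assuming the message format from the question; ListSortingBuffer is a hypothetical name, and the scheduling itself (which Kafka Streams would drive) is replaced by the caller invoking punctuate() directly.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class ListSortingBuffer {
    // Records received since the last flush, in arrival order.
    private final List<String> buffer = new ArrayList<>();

    // Buffers the record value; the key is unused in this simplified sketch.
    void process(String key, String value) {
        buffer.add(value);
    }

    // Intended to be called once per buffer interval: returns everything buffered so far,
    // sorted by the timestamp prefix, and clears the buffer for the next interval.
    List<String> punctuate() {
        List<String> sorted = new ArrayList<>(buffer);
        sorted.sort(Comparator.comparingLong(
                (String v) -> Long.parseLong(v.substring(0, v.indexOf('-')))));
        buffer.clear();
        return sorted;
    }
}
```

Unlike a state store, an in-memory List is lost if the application restarts, so this variant trades durability for simplicity.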

0

Source: https://habr.com/ru/post/1677007/

