I am trying to find a way to reorder messages within a topic partition and send the ordered messages to a new topic.
I have a Kafka publisher that sends String messages in the following format:
{system_timestamp}-{event_name}?{parameters}
For example:
1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
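To make the format concrete, here is a minimal sketch in plain Java of how the timestamp and event name could be pulled out of such a string. The class and method names (`EventTimestamps`, `parseTimestamp`, `parseEventName`) are hypothetical and only for illustration; it assumes the timestamp is always the part before the first `-`.

```java
final class EventTimestamps {
    private EventTimestamps() {}

    /** Returns the epoch-millisecond prefix of "{system_timestamp}-{event_name}?{parameters}". */
    static long parseTimestamp(String message) {
        int dash = message.indexOf('-');
        if (dash <= 0) {
            throw new IllegalArgumentException("No timestamp prefix in: " + message);
        }
        return Long.parseLong(message.substring(0, dash));
    }

    /** Returns the event name, i.e. the text between the first '-' and the first '?' after it. */
    static String parseEventName(String message) {
        int dash = message.indexOf('-');
        if (dash <= 0) {
            throw new IllegalArgumentException("No timestamp prefix in: " + message);
        }
        int question = message.indexOf('?', dash + 1);
        // Assumption: a message without '?' has no parameters, so the name runs to the end.
        int end = question < 0 ? message.length() : question;
        return message.substring(dash + 1, end);
    }
}
```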
In addition, we add a key to each message so that it is sent to the appropriate partition.
What I want to do is reorder events based on the {system_timestamp} part of the message, within a 1-minute window, because our publishers do not guarantee that messages are sent in {system_timestamp} order.
For example, a message with a higher {system_timestamp} value can be delivered to the topic before one with a lower value.
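To illustrate the behaviour I am after, here is a rough sketch in plain Java, without any Kafka classes. It buffers messages and releases them in timestamp order once they are at least one window older than the newest timestamp seen so far. `ReorderBuffer` and its methods are hypothetical names; I assume something like this would have to live inside a Kafka Streams processor with a state store, but that wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

final class ReorderBuffer {
    private final long windowMs;
    private final PriorityQueue<String> pending;
    private long maxSeen = Long.MIN_VALUE;

    ReorderBuffer(long windowMs) {
        this.windowMs = windowMs;
        // Order buffered messages by their {system_timestamp} prefix, smallest first.
        this.pending = new PriorityQueue<>(
                (a, b) -> Long.compare(timestampOf(a), timestampOf(b)));
    }

    /** Assumes the format "{system_timestamp}-{event_name}?{parameters}". */
    static long timestampOf(String message) {
        return Long.parseLong(message.substring(0, message.indexOf('-')));
    }

    /** Buffers a message and returns, in timestamp order, everything now older than the window. */
    List<String> add(String message) {
        pending.add(message);
        maxSeen = Math.max(maxSeen, timestampOf(message));
        return drain(maxSeen - windowMs);
    }

    /** Releases everything still buffered, in timestamp order (e.g. on shutdown). */
    List<String> flush() {
        return drain(Long.MAX_VALUE);
    }

    private List<String> drain(long upToInclusive) {
        List<String> out = new ArrayList<>();
        while (!pending.isEmpty() && timestampOf(pending.peek()) <= upToInclusive) {
            out.add(pending.poll());
        }
        return out;
    }
}
```

With `new ReorderBuffer(60_000)`, the two example messages above would stay buffered until a message at least one minute newer arrives (or `flush()` is called), and would then come out with `chat.started` first. A message that arrives more than one minute late would still be emitted out of order, and messages with equal timestamps come out in no particular order.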
I explored the Kafka Streams API and found some examples of message grouping and aggregation:
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("events");
KGroupedStream<String, String> groupedStream = stream.groupByKey();
But what should I do next with this grouped stream? I do not see any method like sort((e1, e2) -> e1.compareTo(e2)). Windows can be applied to methods such as aggregate(), reduce(), and count(), but I do not think I need any data manipulation.
How can I reorder messages within a 1-minute window and send them to another topic?