Kafka with multiple partitions

I know that it is not possible to order messages across multiple partitions in Kafka, and that ordering is guaranteed only within a single partition, for one consumer within the group. However, with Kafka Streams 0.10 is this now possible? If we use the timestamp feature so that each message in each partition carries its order, is this now possible on the consumer side, say with Kafka Streams 0.10? Assuming we receive all the messages, could we sort across all the partitions based on the consumed timestamp and possibly forward them to a separate topic for consumption?

Right now I need to maintain ordering, but that means a single partition with a single consumer thread. I wanted to change this to multiple partitions to increase parallelism, but somehow keep the messages in order.

Any thoughts? Thanks.

+4
2 answers

There are two problems in this situation:

  • A Kafka topic with multiple partitions, and the fact that Kafka does not guarantee global (topic-wide) ordering for such multi-partition topics.
  • The possibility of late-arriving / out-of-order messages for a topic and its partitions, which is related to time and timestamps.

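"I know that it is not possible to order messages across multiple partitions in Kafka, and that ordering is guaranteed only within a single partition (for one consumer within the group). However, with Kafka Streams 0.10 is this now possible?"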

: , , Kafka .

, " " " ". .

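On the producer side, to avoid reordering, consider setting max.in.flight.requests.per.connection == 1:

Apache Kafka documentation: max.in.flight.requests.per.connection (default: 5): The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).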
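As a rough illustration of the producer setting mentioned above, here is a minimal sketch that only builds the configuration. The class name `OrderedProducerConfig`, the `build` helper, and the `localhost:9092` address are assumptions for the example; the property keys are the standard producer configuration names.

```java
import java.util.Properties;

public class OrderedProducerConfig {

    // Builds producer settings that favour per-partition ordering over throughput.
    // The bootstrap address is a placeholder supplied by the caller.
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // At most one unacknowledged request per connection, so a retried
        // batch cannot be overtaken by a later one.
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        System.out.println(props.getProperty("max.in.flight.requests.per.connection"));
    }
}
```

The resulting `Properties` object would normally be passed to a `KafkaProducer` constructor. Note that this only protects ordering within a partition; it does nothing for ordering across partitions.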

, ( ) .

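"If we use the timestamp feature so that each message in each partition carries its order, is this now possible on the consumer side, say with Kafka Streams 0.10?"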

timestamp " ". ? - / .

, . ( - ):

Partition offsets     0    1    2    3    4    5    6    7    8
Timestamps            15   16   16   17   15   18   18   19   17
                                          ^^
                                         oops, late-arriving data!
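
To make the example above concrete, here is a small sketch that flags late-arriving records in a single partition. It assumes one possible definition of "late": a record whose timestamp is lower than the highest timestamp seen at an earlier offset. The class and method names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class LateArrivalDetector {

    // Returns the offsets whose timestamp is lower than the highest
    // timestamp seen at any earlier offset in the same partition.
    public static List<Integer> lateOffsets(long[] timestamps) {
        List<Integer> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (int offset = 0; offset < timestamps.length; offset++) {
            if (timestamps[offset] < maxSeen) {
                late.add(offset);
            } else {
                maxSeen = timestamps[offset];
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Timestamps from the table above, indexed by partition offset.
        long[] timestamps = {15, 16, 16, 17, 15, 18, 18, 19, 17};
        System.out.println(lateOffsets(timestamps)); // prints [4, 8]
    }
}
```

Under this definition offset 8 (timestamp 17, after 19 was already seen) is flagged as well as offset 4, which shows that offset order and timestamp order can diverge even inside one partition.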

/ ? , , , . , , . , "". : /, , , , ..

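"Assuming we receive all the messages, could we sort across all the partitions based on the consumed timestamp and possibly forward them to a separate topic for consumption?"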

, . " " ( , , -, ). , " " - - . , ? / "" ( ) ( , )? - , " ", , .

+8

Kafka, .

- , , , .

private List<List<ConsumerRecord<String, String>>> orderPartitions(ConsumerRecords<String, String> events) {

    Set<TopicPartition> pollPartitions = events.partitions();
    List<List<ConsumerRecord<String, String>>> orderEvents = new ArrayList<>();
    for (TopicPartition tp : pollPartitions) {
        orderEvents.add(events.records(tp));
    }
    // order the list by the first event, each list is ordered internally also
    orderEvents.sort(new PartitionEventListComparator());
    return orderEvents;
}

/**
 * Used to sort the topic partition event lists so we get them in order
 */
private class PartitionEventListComparator implements Comparator<List<ConsumerRecord<String, String>>> {

    @Override
    public int compare(List<ConsumerRecord<String, String>> list1, List<ConsumerRecord<String, String>> list2) {
        // Compare the lists by the timestamp of their first record
        return Long.compare(list1.get(0).timestamp(), list2.get(0).timestamp());
    }
}

, - , .

                ConsumerRecords<String, String> events = consumer.poll(500);
                int totalEvents = events.count();
                log.debug("Polling topic - received " + totalEvents + " events");
                if (totalEvents == 0) {
                    break;  // no more events
                }

                List<List<ConsumerRecord<String, String>>> orderEvents = orderPartitions(events);

                int cnt = 0;
                // Each list is removed when it is no longer needed
                while (!orderEvents.isEmpty() && sent < max) {
                    for (int j = 0; j < orderEvents.size(); j++) {
                        List<ConsumerRecord<String, String>> subList = orderEvents.get(j);
                        // The list contains no more events, or none in our time range, remove it
                        if (subList.size() < cnt + 1) {
                            orderEvents.remove(j);
                            log.debug("exhausted partition - removed");
                            j--;
                            continue;
                        }
                        ConsumerRecord<String, String> event = subList.get(cnt);
                        cnt++;
}
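
For comparison, here is a different way to interleave the per-partition lists from one poll: a priority-queue (k-way) merge by timestamp. This is a sketch of a swapped-in technique, not the code from the answer above. `Event` is a hypothetical stand-in for `ConsumerRecord` so the example runs without the Kafka client, and it assumes each partition's list is already ordered by timestamp, which late-arriving data can violate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

public class PartitionMerge {

    // Hypothetical stand-in for ConsumerRecord: only what the merge needs.
    public static final class Event {
        public final int partition;
        public final long timestamp;
        public final String value;

        public Event(int partition, long timestamp, String value) {
            this.partition = partition;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Read position inside one partition's list.
    private static final class Cursor {
        final List<Event> events;
        int index = 0;

        Cursor(List<Event> events) { this.events = events; }

        Event head() { return events.get(index); }
    }

    // Merges the per-partition lists into one list ordered by timestamp.
    public static List<Event> mergeByTimestamp(List<List<Event>> partitions) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>(
                (a, b) -> Long.compare(a.head().timestamp, b.head().timestamp));
        for (List<Event> p : partitions) {
            if (!p.isEmpty()) {
                heap.add(new Cursor(p));
            }
        }
        List<Event> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();          // partition with the oldest head record
            merged.add(c.head());
            c.index++;
            if (c.index < c.events.size()) {
                heap.add(c);                 // re-queue with its next record as head
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Event>> partitions = Arrays.asList(
                Arrays.asList(new Event(0, 10L, "a"), new Event(0, 30L, "c")),
                Arrays.asList(new Event(1, 20L, "b"), new Event(1, 40L, "d")));
        for (Event e : mergeByTimestamp(partitions)) {
            System.out.println(e.timestamp + " " + e.value);
        }
    }
}
```

This orders only the records returned by a single poll. A later poll can still return records with older timestamps, so it does not provide global ordering across partitions.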
0

Source: https://habr.com/ru/post/1655074/

