Restarting Kafka Connect S3 Sink Task Loses Position, completely rewrites everything

After restarting the Kafka Connect S3 sink task, it started over from the beginning of the topic and wrote duplicate copies of the older records. In other words, Kafka Connect seems to have lost its place.

So far, I believe that Kafka Connect stores the current offset position in the internal topic connect-offsets. That topic is empty, which I assume is part of the problem.

The other two internal topics, connect-statuses and connect-configs, are not empty. connect-statuses has 52 entries. connect-configs has 6 entries; three for each of the two sink connectors I have configured: connector-<name>, task-<name>-0, commit-<name>.

I manually created the Kafka Connect internal topics as described in the docs before running any of this:

/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
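
For reference, the distributed worker is also pointed at these topic names. The relevant connect-distributed.properties entries look like the sketch below; the bootstrap servers and group.id here are placeholders, not my actual values:

# connect-distributed.properties (storage-topic settings only)
bootstrap.servers=localhost:9092
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-statuses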

I can verify that the connect-offsets topic was created correctly:

/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets   PartitionCount:50   ReplicationFactor:3 Configs:cleanup.policy=compact
    Topic: connect-offsets  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
    Topic: connect-offsets  Partition: 1    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: connect-offsets  Partition: 2    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
  <snip>

This is a three-server cluster running Confluent Platform v3.2.1 with Kafka 0.10.2.1.

Why is connect-offsets empty? And why does Kafka Connect start over from the beginning of the topic when the task is restarted?

UPDATE: In response to Randall Hauch's answer:

  • The explanation of source connector offsets vs. sink connector offsets explains the empty connect-offsets. Thanks for the explanation!
  • I am definitely not changing the connector name.
  • If the connector is down for, say, five days and is restarted after that, is there any reason its offsets would expire and be reset? I see that __consumer_offsets has cleanup.policy=compact. (A way to check the sink's committed offsets directly is sketched after this list.)
  • auto.offset.reset should only take effect if there is no committed position in __consumer_offsets, right?
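
Since the S3 connector is a sink, its position lives in its consumer group rather than in connect-offsets. Here is a sketch of how I can check whether that position survived the restart; the group name follows the connect-<connector-name> convention, the bootstrap server is a placeholder, and on 0.10.x the --new-consumer flag may also be needed:

/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group connect-my-s3-sink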

Here is my sink connector config JSON. The data is Avro. We are now on Confluent v3.2.2; perhaps upgrading from Confluent v3.2.2 to v3.3.0 would help, but I am not sure.

{
  "name": "my-s3-sink",

  "tasks.max": 1,
  "topics": "my-topic",
  "flush.size": 10000,

  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",

  "s3.bucket.name": "my-bucket",
  "s3.region": "us-west-2",

  "partition.field.name": "timestamp",

  "locale": "us",
  "timezone": "UTC",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",

  "schema.compatibility": "NONE",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}
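
For completeness, this flat config map can be created or updated through the Kafka Connect REST API with a PUT to the connector's config endpoint. A sketch, assuming the worker's REST port is the default 8083 and the JSON above is saved as my-s3-sink.json (neither detail is stated above):

curl -X PUT -H "Content-Type: application/json" \
  --data @my-s3-sink.json \
  http://localhost:8083/connectors/my-s3-sink/config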

By default Kafka only retains committed consumer offsets for 24 hours (1440 minutes). So if the connector is down for more than 24 hours, its committed offsets are removed and it starts over when it comes back. The retention of __consumer_offsets is controlled by the broker setting offsets.retention.minutes.
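
If that 24-hour window is indeed what is expiring the sink's offsets, the retention can be raised on the brokers. A sketch of the relevant server.properties line; the 7-day value is only an example, and changing it requires a broker restart on this Kafka version:

# server.properties on every broker
offsets.retention.minutes=10080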


Kafka Connect uses the connect-offsets topic (or whatever you name it) to store offsets for source connectors only; sink connectors such as the S3 connector track their position through the normal Kafka consumer group mechanism.

So as long as the connector keeps the same name, a restart should pick up from the committed offsets of its consumer group. If the connector is recreated under a different name, it gets a new consumer group with no committed offsets and will re-consume the topic.

Note also that when there are no committed offsets for the group, Kafka Connect's sink consumers default to consumer.auto.offset.reset=earliest, which is why the connector falls back to the beginning of the topic.
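
If falling back to the beginning of the topic is undesirable, that default can be overridden at the worker level (per-connector consumer overrides are not available in this version). A sketch of the worker properties change; whether latest is the right choice depends on whether skipping unconsumed records is acceptable:

# connect-distributed.properties: applies to all sink connectors on this worker
consumer.auto.offset.reset=latest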

Regarding 3.3.0: I would not expect the upgrade by itself to change this behavior; what matters is that the connector keeps the same name and that its consumer group offsets have not expired between restarts.

