After I restarted the Kafka Connect S3 sink task, it started reading from the beginning of the topic again and wrote duplicate copies of old records. In other words, Kafka Connect seems to have lost its place.
As I understand it, Kafka Connect stores the current offset position in the internal topic connect-offsets. This topic is empty, which I assume is part of the problem.
The other two internal topics, connect-statuses and connect-configs, are not empty. connect-statuses has 52 entries. connect-configs has 6 entries: three for each of the two sink connectors I have configured (connector-<name>, task-<name>-0, commit-<name>).
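To make the count of 6 concrete, here is a minimal Python sketch that lists the keys connect-configs would hold, assuming the key scheme quoted above (connector-<name>, task-<name>-<n>, commit-<name>) and tasks.max = 1 for both connectors. The connector names "sink-a" and "sink-b" are hypothetical.

```python
def config_topic_keys(connector: str, num_tasks: int) -> list[str]:
    """Record keys for one connector in connect-configs (assumed key scheme from above)."""
    keys = [f"connector-{connector}"]                          # the connector's own config
    keys += [f"task-{connector}-{i}" for i in range(num_tasks)]  # one entry per task
    keys.append(f"commit-{connector}")                         # marks the task configs as committed
    return keys


# Two hypothetical sink connectors, each with tasks.max = 1.
connectors = {"sink-a": 1, "sink-b": 1}
all_keys = [key for name, tasks in connectors.items() for key in config_topic_keys(name, tasks)]
print(len(all_keys))  # 6
```

Raising tasks.max adds one task-<name>-<n> key per extra task, so the expected count grows accordingly.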
Before running this, I manually created the Kafka Connect internal topics as described in the docs:
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact
I can verify that the connect-offsets topic was created correctly:
/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
<snip>
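Rather than checking all 50 partitions by eye, the --describe output can be checked mechanically. This is a minimal Python sketch that assumes the plain-text layout shown above (it may differ between Kafka versions). It parses the header line and flags any partition whose in-sync replica set differs from its replica list. Only the three partitions pasted above are used as sample input.

```python
import re

# Sample input: the header and the first three partitions from the output above.
DESCRIBE_OUTPUT = """\
Topic:connect-offsets PartitionCount:50 ReplicationFactor:3 Configs:cleanup.policy=compact
Topic: connect-offsets Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: connect-offsets Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: connect-offsets Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
"""

HEADER_RE = re.compile(r"PartitionCount:\s*(\d+)\s+ReplicationFactor:\s*(\d+)\s+Configs:\s*(\S*)")
# "Partition:" does not match "PartitionCount:", so only per-partition lines are picked up.
PARTITION_RE = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(-?\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)"
)


def topic_header(describe_text: str) -> dict:
    """Partition count, replication factor and configs from the header line."""
    match = HEADER_RE.search(describe_text)
    if match is None:
        raise ValueError("no topic header line found")
    return {
        "partitions": int(match.group(1)),
        "replication_factor": int(match.group(2)),
        "configs": match.group(3),
    }


def under_replicated(describe_text: str) -> list[int]:
    """Partition ids whose ISR is not the full replica set."""
    bad = []
    for match in PARTITION_RE.finditer(describe_text):
        replicas = set(match.group(3).split(","))
        isr = set(filter(None, match.group(4).split(",")))
        if isr != replicas:
            bad.append(int(match.group(1)))
    return bad


print(topic_header(DESCRIBE_OUTPUT))
print(under_replicated(DESCRIBE_OUTPUT))  # []
```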
This is on a three-server cluster running Confluent Platform v3.2.1, which ships Kafka 0.10.2.1.
Why is connect-offsets empty? And why does Kafka Connect start over from the beginning of the topic when the task is restarted?
UPDATE: Response to Randall Howe's answer.
- The explanation of source connector offsets versus sink connector offsets accounts for the empty connect-offsets topic. Thanks for the explanation!
- I am definitely not changing the name of the connector.
- If the connector is down for about five days and then restarted, is there any reason its committed offset position would expire and be reset? I see that __consumer_offsets has cleanup.policy=compact. Also, auto.offset.reset should only take effect when there is no committed position in __consumer_offsets, right?
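The scenario in the last bullet can be modeled in a few lines. This is a Python sketch, not Kafka's actual code. It assumes Kafka 0.10.x defaults that are worth double-checking on your brokers and workers. The broker setting offsets.retention.minutes=1440 drops a committed offset roughly one day after it was last committed, and compaction on __consumer_offsets does not prevent that. A sink connector commits under the consumer group connect-<connector name>. The Connect worker configures its sink consumers with auto.offset.reset=earliest. The PartitionState type and the offsets in the usage lines are made up for illustration.

```python
from dataclasses import dataclass
from typing import Optional

# Assumed defaults (Kafka 0.10.x broker; Connect worker's sink consumers).
DEFAULT_RETENTION_MINUTES = 1440
CONNECT_SINK_AUTO_OFFSET_RESET = "earliest"


def sink_group_id(connector_name: str) -> str:
    """Consumer group a Kafka Connect sink connector commits its offsets under."""
    return f"connect-{connector_name}"


@dataclass
class PartitionState:  # hypothetical type, for illustration only
    log_start_offset: int     # earliest offset still in the topic partition
    log_end_offset: int       # next offset to be written
    committed: Optional[int]  # last committed offset, None if never committed
    committed_at_ms: int      # time of that commit


def resume_offset(p: PartitionState, restart_at_ms: int,
                  retention_minutes: int = DEFAULT_RETENTION_MINUTES,
                  reset: str = CONNECT_SINK_AUTO_OFFSET_RESET) -> int:
    """Approximate position the sink task's consumer resumes from after a restart."""
    if p.committed is not None:
        expired = restart_at_ms - p.committed_at_ms > retention_minutes * 60_000
        in_range = p.log_start_offset <= p.committed <= p.log_end_offset
        if not expired and in_range:
            return p.committed  # a valid committed position exists: auto.offset.reset is not used
    # No usable committed position (never committed, expired, or out of range).
    if reset == "earliest":
        return p.log_start_offset
    if reset == "latest":
        return p.log_end_offset
    raise RuntimeError("auto.offset.reset=none: the consumer fails instead of resetting")


DAY_MS = 24 * 60 * 60 * 1000
p = PartitionState(log_start_offset=0, log_end_offset=500_000, committed=480_000, committed_at_ms=0)
print(sink_group_id("my-s3-sink"))                 # connect-my-s3-sink
print(resume_offset(p, restart_at_ms=5 * DAY_MS))  # 0: offset expired, reset to earliest
print(resume_offset(p, restart_at_ms=DAY_MS // 2)) # 480000: commit still retained
```

The broker only purges expired offsets on a periodic check, so the real cutoff is approximate.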
My sink connector's JSON configuration is below. It uses Avro. … Confluent v3.2.2 … Confluent v3.2.2 … v3.3.0 …
{
"name": "my-s3-sink",
"tasks.max": 1,
"topics": "my-topic",
"flush.size": 10000,
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
"partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",
"s3.bucket.name": "my-bucket",
"s3.region": "us-west-2",
"partition.field.name": "timestamp",
"locale": "us",
"timezone": "UTC",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"schema.compatibility": "NONE",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081"
}
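The path.format above decides which S3 prefix each record is written under. As a rough illustration, this Python sketch renders that pattern for a record timestamp. It assumes the custom mycompany.partitioner.TimeFieldPartitioner formats paths like Confluent's stock time-based partitioner: a Joda-style pattern with quoted literals, evaluated in the configured timezone (UTC). The function names are hypothetical, and only the tokens used in this config are supported.

```python
from datetime import datetime, timezone

PATH_FORMAT = "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"  # copied from the config above

# Assumption: only the Joda tokens this config uses, mapped to strftime equivalents.
JODA_TO_STRFTIME = {"YYYY": "%Y", "yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H"}


def joda_to_strftime(pattern: str) -> str:
    """Translate a small subset of Joda pattern syntax into a strftime format."""
    out, i = [], 0
    while i < len(pattern):
        ch = pattern[i]
        if ch == "'":  # quoted literal, copied verbatim
            end = pattern.index("'", i + 1)
            out.append(pattern[i + 1:end].replace("%", "%%"))
            i = end + 1
        elif ch.isalpha():  # a run of the same letter forms one token
            j = i
            while j < len(pattern) and pattern[j] == ch:
                j += 1
            token = pattern[i:j]
            if token not in JODA_TO_STRFTIME:
                raise ValueError(f"unsupported Joda token: {token}")
            out.append(JODA_TO_STRFTIME[token])
            i = j
        else:  # separators such as '=' and '/'
            out.append("%%" if ch == "%" else ch)
            i += 1
    return "".join(out)


def encoded_partition(path_format: str, timestamp_ms: int) -> str:
    """Directory for a record whose timestamp field is timestamp_ms (UTC assumed)."""
    dt = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return dt.strftime(joda_to_strftime(path_format))


print(encoded_partition(PATH_FORMAT, 0))  # year=1970/month=01/day=01/hour=00
```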