Restarting Kafka Connect S3 Sink Task Loses Position, Completely Rewrites Everything

Problem Description

After restarting a Kafka Connect S3 sink task, it restarted writing all the way from the beginning of the topic and wrote duplicate copies of older records. In other words, Kafka Connect seemed to lose its place.

So, I imagine that Kafka Connect stores current offset position information in the internal connect-offsets topic. That topic is empty, which I presume is part of the problem.

The other two internal topics, connect-statuses and connect-configs, are not empty. connect-statuses has 52 entries. connect-configs has 6 entries, three for each of the two sink connectors I have configured: connector-<name>, task-<name>-0, and commit-<name>.

I manually created the internal Kafka Connect topics as specified in the docs before running this:

/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
/usr/bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-statuses --replication-factor 3 --partitions 10 --config cleanup.policy=compact

I can verify that the connect-offsets topic seems to be created correctly:

/usr/bin/kafka-topics --zookeeper localhost:2181 --describe --topic connect-offsets
Topic:connect-offsets   PartitionCount:50   ReplicationFactor:3 Configs:cleanup.policy=compact
    Topic: connect-offsets  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
    Topic: connect-offsets  Partition: 1    Leader: 2   Replicas: 2,3,1 Isr: 2,3,1
    Topic: connect-offsets  Partition: 2    Leader: 3   Replicas: 3,1,2 Isr: 3,1,2
  <snip>

This is a three-server cluster running Confluent Platform v3.2.1 with Kafka 0.10.2.1.

Is connect-offsets supposed to be empty? Why else would Kafka Connect restart at the beginning of the topic when restarting a task?
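One way to see where a sink connector actually stands: sink tasks commit progress as an ordinary consumer group, conventionally named connect-<connector-name>, stored in __consumer_offsets rather than connect-offsets. A hedged sketch of inspecting that group (assumes a broker at localhost:9092 and the connector name from the config below):

```shell
# Hypothetical check: sink connector progress is a regular consumer group,
# "connect-" + connector name, tracked in __consumer_offsets.
/usr/bin/kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group connect-my-s3-sink
```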

UPDATE: Response to Randall Hauch's answer.

  • The explanation regarding source connector offsets vs. sink connector offsets explains the empty connect-offsets. Thanks for the explanation!
  • I'm definitely not changing the connector name.
  • If the connector is down for ~five days and restarted afterwards, is there any reason the connector offset position would expire and reset? I see that __consumer_offsets has cleanup.policy=compact.
  • auto.offset.reset should only take effect if there is no position in __consumer_offsets, right?
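On that last point: in a distributed worker, the sink task consumers can be given an explicit reset policy via a consumer.-prefixed override in the worker properties. A minimal sketch (the file name and chosen value are assumptions; the setting is only consulted when the group has no committed offset in __consumer_offsets):

```properties
# connect-distributed.properties (worker config) - hypothetical override.
# Forwarded to sink task consumers; used only when no committed offset exists.
consumer.auto.offset.reset=earliest
```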

I'm using mostly system defaults. My sink config JSON is as follows. I'm using a very simple custom partitioner to partition on an Avro datetime field rather than wall-clock time. That feature seems to have been added in Confluent v3.2.2, so I won't need a custom plugin for that functionality. I'm hoping to skip Confluent v3.2.2 and go straight to v3.3.0 when it is available.

{
  "name": "my-s3-sink",

  "tasks.max": 1,
  "topics": "my-topic",
  "flush.size": 10000,

  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.TimeBasedSchemaGenerator",
  "partitioner.class": "mycompany.partitioner.TimeFieldPartitioner",

  "s3.bucket.name": "my-bucket",
  "s3.region": "us-west-2",

  "partition.field.name": "timestamp",

  "locale": "us",
  "timezone": "UTC",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",

  "schema.compatibility": "NONE",

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://localhost:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://localhost:8081"
}
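For reference, the Connect REST API's POST /connectors endpoint expects a configuration like the one above wrapped in a name/config envelope, with config values as strings. A minimal sketch of building that payload in Python (the subset of keys shown is illustrative):

```python
import json

# A subset of the sink settings above, shaped the way the Connect REST API
# expects them: string values inside a "config" object, name alongside.
sink_config = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "my-topic",
    "flush.size": "10000",
    "s3.bucket.name": "my-bucket",
    "s3.region": "us-west-2",
}

payload = {"name": "my-s3-sink", "config": sink_config}
print(json.dumps(payload, indent=2))
```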

Answer

The default offset retention period for Kafka consumers is 24 hours (1440 minutes). If you stop a connector, and therefore make no new commits for longer than 24 hours, your offsets will expire, and you will start over as a new consumer when you restart. You can modify the retention period for __consumer_offsets using the broker's offsets.retention.minutes parameter.
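As a sanity check on that explanation, the arithmetic for the ~five-day outage described above (the 1440-minute figure is the broker's default offsets.retention.minutes at this Kafka version):

```python
# Default consumer offset retention vs. the ~five-day outage described above.
offsets_retention_minutes = 1440       # broker default: 24 hours
downtime_minutes = 5 * 24 * 60         # connector stopped for about five days

offsets_expired = downtime_minutes > offsets_retention_minutes
print(offsets_expired)  # True: committed offsets age out long before restart
```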
