Spark set to read from earliest offset - throws error on attempting to consume an offset no longer available on Kafka

Problem Description

I am currently running a Spark job on Dataproc and am getting errors when trying to re-join a group and read data from a Kafka topic. I have done some digging but am not sure what the issue is. I have auto.offset.reset set to earliest, so it should be reading from the earliest available uncommitted offset, and initially my Spark logs look like this:
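For context, here is a minimal sketch of how such a consumer is typically wired up with the spark-streaming-kafka-0-10 integration; the broker address, application name, and batch interval are illustrative assumptions, not details taken from the question:

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object DemoJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("demo-job") // placeholder name
    val ssc  = new StreamingContext(conf, Seconds(10)) // batch interval assumed

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG        -> "localhost:9092", // placeholder broker
      ConsumerConfig.GROUP_ID_CONFIG                 -> "demo-group",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG   -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      // Only consulted when Kafka has no committed offset for this group/partition.
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG        -> "earliest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("demo.topic"), kafkaParams)
    )

    stream.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))
    ssc.start()
    ssc.awaitTermination()
  }
}
```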

```
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-11 to offset 5553330.
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-2 to offset 5555553.
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-3 to offset 5555484.
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-4 to offset 5555586.
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-5 to offset 5555502.
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-6 to offset 5555561.
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-7 to offset 5555542.
```

But then on the very next line I get an error for trying to read from a nonexistent offset on the server (you can see that the offset for the partition differs from the one listed above, so I have no idea why it would be attempting to read from that offset). Here is the error on the next line:

```
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {demo.topic-11=4544296}
```

Any idea why my Spark job keeps going back to this offset (4544296) instead of the one it originally logged (5553330)?

It seems to be contradicting itself with respect to a) the offset it says it is on versus the one it attempts to read, and b) the claim that there is no configured reset policy.

Recommended Answer

One year too late with this answer, but hoping it helps others facing a similar issue.

Typically, this behavior shows up when the consumer tries to read an offset in a Kafka topic that no longer exists. The offset is usually gone because it has been removed by the Kafka log cleaner (e.g. due to retention or compaction policies). However, the consumer group is still known to Kafka, and Kafka has kept the information on the latest consumed message of the group "demo-group" for the topic "demo.topic" and all its partitions.

Therefore, the auto.offset.reset configuration does not have any impact, because from Kafka's point of view there is no need for a reset: the consumer group is known, so its committed offsets take precedence.
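You can observe this mismatch directly with a plain Kafka consumer. The following sketch (the broker address is a placeholder; the partition number is taken from the error message above) compares the offset Kafka has committed for the group against the earliest offset still available on the broker:

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object OffsetInspector {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    try {
      val tp = new TopicPartition("demo.topic", 11)
      // Offset Kafka has stored for this group; it can point at data that
      // the log cleaner has since deleted.
      val committed = Option(consumer.committed(tp)).map(_.offset())
      // Earliest offset still physically present on the broker.
      val earliest = consumer.beginningOffsets(Collections.singletonList(tp)).get(tp)
      println(s"committed=$committed, earliest available=$earliest")
      // If committed < earliest, the next fetch at the committed position fails
      // with OffsetOutOfRangeException instead of triggering auto.offset.reset.
    } finally consumer.close()
  }
}
```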

In addition, the Fetcher only tells you the latest available offset within each partition of the topic. That does not automatically mean Spark will actually poll all messages up to this offset: Spark decides how many messages it consumes and processes for each partition (based on, for example, the configuration maxRatePerPartition).
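For reference, rate limiting for the DStream-based Kafka integration is controlled through Spark configuration; the values below are illustrative, not recommendations:

```scala
import org.apache.spark.SparkConf

// Caps how many records Spark pulls per partition per second, so a batch can
// stop well short of the latest offset the Fetcher reported.
val conf = new SparkConf()
  .setAppName("demo-job") // placeholder name
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // illustrative value
  // Optionally let backpressure adapt the rate to the processing speed.
  .set("spark.streaming.backpressure.enabled", "true")
```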

To solve this issue you could either change the consumer group (which is probably not what you want in this particular case) or manually reset the offsets for the consumer group "demo-group" by using

```
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group demo-group --topic demo.topic:11 --reset-offsets --to-latest --execute
```

Depending on your requirements you can reset the offsets for each partition of the topic with that tool (partitions are specified as topic:partition; without --execute the tool only prints the planned reset as a dry run). The help output or the Kafka documentation explains all available options.
