Spark set to read from earliest offset - throws error on attempting to consume an offset no longer available on Kafka


Problem description

I am currently running a Spark job on Dataproc and am getting errors trying to re-join a group and read data from a Kafka topic. I have done some digging and am not sure what the issue is. I have auto.offset.reset set to earliest, so it should be reading from the earliest available non-committed offset, and initially my Spark logs look like this:
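For reference, a minimal sketch of the consumer configuration being described. The property names are standard Kafka consumer settings and the group id/topic come from the logs below; the broker address is a placeholder, and the actual stream creation is omitted since it needs a running cluster:

```python
# Hypothetical consumer configuration matching the setup described above.
# In a Spark Streaming job this dict would be passed to the Kafka direct
# stream (e.g. KafkaUtils.createDirectStream); stream creation is not shown
# here because it requires a running Spark/Kafka cluster.
kafka_params = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "demo-group",               # group id seen in the logs
    "auto.offset.reset": "earliest",        # only consulted when NO committed offset exists
    "enable.auto.commit": "false",
}

# Note: auto.offset.reset only applies when the group has no committed
# offset for a partition -- which turns out to be the crux of this question.
print(kafka_params["auto.offset.reset"])  # earliest
```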

```
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-11 to offset 5553330.
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-2 to offset 5555553.
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-3 to offset 5555484.
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-4 to offset 5555586.
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-5 to offset 5555502.
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-6 to offset 5555561.
19/04/29 16:30:30 INFO org.apache.kafka.clients.consumer.internals.Fetcher: [Consumer clientId=consumer-1, groupId=demo-group] Resetting offset for partition demo.topic-7 to offset 5555542.
```

But then on the very next line I get an error trying to read from a nonexistent offset on the server (you can see that the offset for the partition differs from the one listed above, so I have no idea why it would be attempting to read from that offset). Here is the error on the next line:

```
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {demo.topic-11=4544296}
```

Any ideas as to why my Spark job is constantly going back to this offset (4544296), and not the one it outputs originally (5553330)?

It seems to be contradicting itself with a) the actual offset it says it is on versus the one it attempts to read, and b) saying there is no configured reset policy.

Recommended answer

One year too late with this answer, but hoping to help others facing a similar issue.

Typically, this behavior shows up when the consumer tries to read an offset that no longer exists in a Kafka topic. The offset is usually gone because it has been removed by the Kafka log cleaner (e.g. due to retention or compaction policies). However, the consumer group is still known to Kafka, which has kept the latest committed offsets of the group "demo-group" for the topic "demo.topic" and all its partitions.

Therefore, the auto.offset.reset configuration has no effect, because no reset is needed: Kafka already knows the consumer group and its committed offsets.
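The interplay just described can be sketched as a small simulation. To be clear, this is hypothetical code illustrating the decision logic, not Kafka's actual implementation:

```python
# Illustrative simulation of how a fetch offset is resolved for one
# partition. NOT Kafka's real code -- just the decision logic described
# above, in runnable form.

def resolve_fetch_offset(committed, log_start, log_end, auto_offset_reset):
    """Return the offset a consumer group would fetch from, or raise when
    the stored offset is gone and no reset policy applies."""
    if committed is not None and log_start <= committed <= log_end:
        # A valid committed offset exists: auto.offset.reset is NOT consulted.
        return committed
    # No committed offset, or it points at data the log cleaner has deleted.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    # Spark's executors consume with the reset policy effectively disabled,
    # which is why the question sees this exception instead of a silent reset.
    raise Exception(
        "Offsets out of range with no configured reset policy for "
        f"partitions: {{demo.topic-11={committed}}}"
    )

# The situation from the question: the group committed 4544296 long ago,
# but retention has since deleted everything below 5553330.
try:
    resolve_fetch_offset(committed=4544296, log_start=5553330,
                         log_end=5555600, auto_offset_reset="none")
except Exception as exc:
    print(exc)
```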

In addition, the Fetcher log lines only tell you the latest available offset within each partition of the topic. They do not mean that all messages up to this offset are actually polled. Spark decides how many messages it actually consumes and processes for each partition (based on, e.g., the configuration spark.streaming.kafka.maxRatePerPartition).
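As a rough illustration of that rate cap (the real batch-sizing logic in Spark's DirectKafkaInputDStream is more involved, e.g. it also accounts for backpressure), the per-batch upper bound works roughly like this:

```python
# Simplified sketch of how spark.streaming.kafka.maxRatePerPartition bounds
# the offset range one micro-batch reads from one partition. The real logic
# in Spark is more elaborate; this only shows the basic cap.

def batch_end_offset(current_offset, latest_offset,
                     max_rate_per_partition, batch_interval_secs):
    """Exclusive end offset for the next micro-batch of one partition."""
    max_messages = max_rate_per_partition * batch_interval_secs
    return min(latest_offset, current_offset + max_messages)

# With a 10 s batch interval and a 1000 msg/s/partition cap, a partition
# that is ~1 million messages behind still reads only 10 000 per batch.
end = batch_end_offset(current_offset=4_544_296, latest_offset=5_553_330,
                       max_rate_per_partition=1000, batch_interval_secs=10)
print(end)  # 4554296
```

This is why a log line reporting the latest offset (5553330) says nothing about which offsets the job will actually try to consume next.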

To solve this issue you could either change the consumer group (which is probably not what you want in this particular case) or manually reset the offsets for the consumer group "demo-group" by using:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group demo-group --reset-offsets --topic demo.topic:11 --to-latest --execute

Depending on your requirements, you can reset the offsets for each partition of the topic with that tool (partitions are given as `--topic topic:partitions`; without `--execute` the command only does a dry run and prints the planned offsets). The tool's help output and the Kafka documentation explain all available options, such as --to-earliest, --to-datetime, and --shift-by.

