Effective strategy to avoid duplicate messages in apache kafka consumer


Problem Description

I have been studying Apache Kafka for a month now, but I am stuck at this point. My use case: I have two or more consumer processes running on different machines. I ran a few tests in which I published 10,000 messages to the Kafka server. Then, while those messages were being processed, I killed one of the consumer processes and restarted it. The consumers were writing processed messages to a file. After consumption finished, the file contained more than 10,000 messages, so some messages were duplicated.

In the consumer processes I have disabled auto-commit. The consumers commit offsets manually, batch-wise: for example, once 100 messages have been written to the file, the consumer commits the offsets. When a single consumer process is running and it crashes and recovers, duplication is avoided in this manner. But when more than one consumer is running and one of them crashes and recovers, duplicate messages are written to the file.
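For reference, a minimal sketch of this batch-wise manual-commit setup, written against the modern KafkaConsumer API; the topic name, output file, and batch size of 100 are placeholder assumptions:

```java
// Batch-wise manual commit: auto-commit disabled, offsets committed
// only after every 100 messages have been written to the output file.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.FileWriter;
import java.io.PrintWriter;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class BatchCommitConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "file-writer");
        props.put("enable.auto.commit", "false"); // commit manually, batch-wise
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             PrintWriter out = new PrintWriter(new FileWriter("messages.txt", true))) {
            consumer.subscribe(Collections.singletonList("events")); // hypothetical topic
            int sinceCommit = 0;
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    out.println(record.value());
                    if (++sinceCommit >= 100) {
                        out.flush();
                        consumer.commitSync(); // everything written so far is now committed
                        sinceCommit = 0;
                    }
                }
            }
        }
    }
}
```

Any messages written to the file after the last commitSync() are re-delivered when a crashed consumer's partitions are reassigned, which is exactly the duplication observed here.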

Is there any effective strategy to avoid these duplicate messages?

Recommended Answer

The short answer is: no.

What you're looking for is exactly-once processing. While it may often seem feasible, it should never be relied upon, because there are always caveats.

Even to attempt to prevent duplicates you would need to use the simple consumer. This approach works as follows: for each consumer, when a message is consumed from some partition, write the partition and offset of the consumed message to disk. When the consumer restarts after a failure, read the last consumed offset for each partition from disk and resume from there.
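A minimal sketch of that pattern, assuming the modern KafkaConsumer API, where manual assign() plus seek() plays the role of the legacy SimpleConsumer; the topic, partition, and offset-file path are placeholders:

```java
// Track the consumed offset on local disk and seek back to it on restart.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class DiskOffsetConsumer {
    static final Path OFFSET_FILE = Paths.get("offsets-p0.txt"); // hypothetical path

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("enable.auto.commit", "false"); // offsets live on disk, not in Kafka
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("events", 0);
            consumer.assign(Collections.singletonList(tp)); // manual assignment, no group

            // Resume after the last offset recorded on disk, if any.
            if (Files.exists(OFFSET_FILE)) {
                long last = Long.parseLong(new String(Files.readAllBytes(OFFSET_FILE)).trim());
                consumer.seek(tp, last + 1);
            }

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);
                    // Recorded AFTER processing: a crash between the two steps
                    // still reprocesses one message, as the answer explains next.
                    Files.write(OFFSET_FILE, Long.toString(record.offset()).getBytes());
                }
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) { /* application logic */ }
}
```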

But even with this pattern the consumer can't guarantee it won't reprocess a message after a failure. What if the consumer consumes a message and then fails before the offset is flushed to disk? And if you instead write to disk before processing the message, what if you write the offset and then fail before actually processing it? The same problem would exist even if you committed offsets to ZooKeeper after every message.

There are some use cases, though, where exactly-once processing is more attainable. It simply requires that your offset be stored in the same location as your application's output. For instance, if you write a consumer that counts messages, then by storing the last counted offset with each count you can guarantee that the offset is stored at the same time as the consumer's state. Of course, to guarantee exactly-once processing this would require that you consume exactly one message and update the state exactly once for each message, and that's completely impractical for most Kafka consumer applications. By its nature Kafka consumes messages in batches for performance reasons.
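To illustrate the counting example, here is one possible sketch in which the count and the last processed offset are persisted together in a single atomic file rename, so state and offset can never diverge; the file names are hypothetical:

```java
// Store the consumer's state (a count) and the last processed offset together,
// so that either both survive a crash or neither does.
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public class CountState {
    static final Path STATE = Paths.get("count-state.txt");
    static final Path TMP = Paths.get("count-state.txt.tmp");

    // Persist count and offset in one atomic rename (a POSIX rename replaces
    // the target atomically; Windows may need a different strategy).
    static void save(long count, long offset) throws Exception {
        Files.write(TMP, (count + "," + offset).getBytes(StandardCharsets.UTF_8));
        Files.move(TMP, STATE, StandardCopyOption.ATOMIC_MOVE);
    }

    // On restart, read both values back and resume consuming at offset + 1.
    static long[] load() throws Exception {
        if (!Files.exists(STATE)) return new long[] {0L, -1L};
        String[] parts = new String(Files.readAllBytes(STATE), StandardCharsets.UTF_8).split(",");
        return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
    }
}
```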

Usually your time will be better spent, and your application will be much more reliable, if you simply design it to be idempotent.
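For the file-writing use case in the question, one hedged sketch of idempotence is to key every output record by its partition and offset and skip anything already written, so redelivered messages leave no duplicates; persisting lastWritten alongside the output is left out for brevity:

```java
// Idempotent sink: a redelivered message (same partition + offset) is
// detected and skipped instead of being written twice.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.HashMap;
import java.util.Map;

public class IdempotentWriter {
    // Highest offset already written, per partition. A real application would
    // persist this alongside the output file; in-memory is enough for a sketch.
    private final Map<Integer, Long> lastWritten = new HashMap<>();

    public void write(ConsumerRecord<String, String> record) {
        long prev = lastWritten.getOrDefault(record.partition(), -1L);
        if (record.offset() <= prev) {
            return; // already processed once: this is a post-crash redelivery
        }
        appendToFile(record.value());
        lastWritten.put(record.partition(), record.offset());
    }

    private void appendToFile(String value) { /* write one line to the file */ }
}
```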
