How to manually commit offset in Spark Kafka direct streaming?


Question

I looked around hard but didn't find a satisfactory answer to this. Maybe I'm missing something. Please help.

We have a Spark streaming application consuming a Kafka topic, which needs to ensure end-to-end processing before advancing Kafka offsets, e.g. updating a database. This is much like building transaction support within the streaming system, and guaranteeing that each message is processed (transformed) and, more importantly, output.

I have read about Kafka DirectStreams. It says that for robust failure recovery in DirectStreaming mode, Spark checkpointing should be enabled, which stores the offsets along with the checkpoints. But offset management is done internally (by setting Kafka config params like "auto.offset.reset", "auto.commit.enable", and "auto.offset.interval.ms"). It does not say how (or whether) we can customize when offsets are committed (e.g., once we've written to the database). In other words, can we set "auto.commit.enable" to false and manage the offsets ourselves (much like a DB connection)?
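For reference, disabling auto-commit amounts to passing the corresponding consumer parameter when creating the direct stream. A minimal sketch, assuming the spark-streaming-kafka-0-10 integration (where the new-consumer key is "enable.auto.commit" rather than the old "auto.commit.enable"); the broker address and group id are placeholders:

```scala
import org.apache.kafka.common.serialization.StringDeserializer

// Kafka parameters for a direct stream with auto-commit disabled.
// With enable.auto.commit = false, offsets advance only when the
// application commits them itself (to Kafka, ZooKeeper, or a DB).
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",            // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",                      // placeholder group id
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)  // manage offsets ourselves
)
```

This map is then passed to KafkaUtils.createDirectStream when constructing the stream.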

Any guidance/help is greatly appreciated.

Answer

The article below could be a good start to understand the approach.

spark-kafka-achieving-zero-data-loss

Moreover,

The article suggests using the ZooKeeper client directly, which could also be replaced by something like KafkaSimpleConsumer. The advantage of using ZooKeeper/KafkaSimpleConsumer is that monitoring tools depend on the offsets saved in ZooKeeper. The offsets can also be saved to HDFS or any other reliable store.
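For completeness: the newer spark-streaming-kafka-0-10 integration exposes exactly this pattern through the `HasOffsetRanges` and `CanCommitOffsets` interfaces, so offsets can be committed back to Kafka only after the output action has succeeded. A sketch, assuming `stream` was created with `KafkaUtils.createDirectStream` and auto-commit disabled; `saveToDatabase` is a hypothetical placeholder for the application's output step:

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// stream: the InputDStream[ConsumerRecord[String, String]] returned by
// KafkaUtils.createDirectStream, with enable.auto.commit set to false.
stream.foreachRDD { rdd =>
  // Capture the offset ranges before any transformation, while the RDD
  // still carries them (only the RDD produced by the direct stream does).
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Hypothetical placeholder: transform the batch and write it out.
  // Offsets are advanced only after this side effect succeeds.
  saveToDatabase(rdd)

  // Commit back to Kafka asynchronously. If the application fails before
  // this point, the batch is replayed, giving at-least-once semantics.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```

If exactly-once output is required, the same `offsetRanges` can instead be stored in the database inside the same transaction as the results, which is the approach the linked article builds on.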
