Spark Streaming kafka 偏移管理 [英] Spark Streaming kafka offset manage

查看:48
本文介绍了Spark Streaming kafka 偏移管理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在做火花流作业,通过 kafka 消费和生产数据.我用的是directDstream,所以必须自己管理offset,我们采用redis来写和读offset.现在有一个问题,当我启动我的客户端时,我的客户端需要从redis中获取offset,而不是kafka中存在的offset它自己.如何显示我编写的代码?现在我已经在下面编写了代码:

I had been doing spark streaming jobs which consumer and produce data through kafka. I used directDstream,so I had to manage offset by myself,we adopted redis to write and read offsets.Now there is one problem,when I launched my client,my client need to get the offset from redis,not offset which exists in kafka itself.how show I write my code?Now I had written my code below:

   kafka_stream = KafkaUtils.createDirectStream(
    ssc,
    topics=[config.CONSUME_TOPIC, ],
    kafkaParams={"bootstrap.servers": config.CONSUME_BROKERS,
                 "auto.offset.reset": "largest"},
    fromOffsets=read_offset_range(config.OFFSET_KEY))

但我认为 fromOffsets 是 spark-streaming 客户端启动时的值(来自 redis),而不是运行期间.谢谢你的帮助.

But I think the fromOffsets is the value(from redis) when the spark-streaming client lauched,not during its running.thank you for helpinp.

推荐答案

如果我理解正确,您需要手动设置偏移量.我是这样做的:

If I understand you correctly you need to set your offset manually. This is how I do it:

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import TopicAndPartition

stream = StreamingContext(sc, 120) # 120 second window

kafkaParams = {"metadata.broker.list":"1:667,2:6667,3:6667"}
kafkaParams["auto.offset.reset"] = "smallest"
kafkaParams["enable.auto.commit"] = "false"

topic = "xyz"
topicPartion = TopicAndPartition(topic, 0)
fromOffset = {topicPartion: long(PUT NUMERIC OFFSET HERE)}

kafka_stream = KafkaUtils.createDirectStream(stream, [topic], kafkaParams, fromOffsets = fromOffset)

这篇关于Spark Streaming kafka 偏移管理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆