pySpark Kafka Direct Streaming: updating Zookeeper / Kafka offsets
Question
I'm currently working with Kafka / Zookeeper and pySpark (1.6.0). I have successfully created a Kafka consumer that uses KafkaUtils.createDirectStream().
The streaming itself works without problems, but I noticed that the offsets of my Kafka topics are not updated after I have consumed some messages. That is a problem for us, because our monitoring relies on those offsets being up to date.
In the Spark documentation I found this example and comment:
offsetRanges = []

def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

directKafkaStream\
    .transform(storeOffsetRanges)\
    .foreachRDD(printOffsetRanges)
You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.
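To make "update Zookeeper yourself" a bit more concrete, here is a minimal sketch of turning a batch's offset ranges into the znode paths and payloads that would be written. The `OffsetRange` namedtuple is only a stand-in with the same field names as pySpark's class, the group name `my-streaming-app` is hypothetical, and the path layout `/consumers/<group>/offsets/<topic>/<partition>` is an assumption based on the layout used by Kafka's old Zookeeper-based consumer; check what your monitoring tool actually reads.

```python
from collections import namedtuple

# Stand-in for pyspark.streaming.kafka.OffsetRange (same field names); illustration only.
OffsetRange = namedtuple("OffsetRange", "topic partition fromOffset untilOffset")

def offsets_to_znodes(offset_ranges, group="my-streaming-app"):
    """Map each offset range to the znode path and payload that would be written."""
    znodes = {}
    for o in offset_ranges:
        # Assumed layout: /consumers/<group>/offsets/<topic>/<partition>
        path = "/consumers/%s/offsets/%s/%d" % (group, o.topic, o.partition)
        # untilOffset is exclusive, so it is the next offset to consume
        znodes[path] = str(o.untilOffset).encode()
    return znodes

ranges = [OffsetRange("test1", 0, 100, 120), OffsetRange("test1", 1, 40, 55)]
for path, payload in sorted(offsets_to_znodes(ranges).items()):
    print(path, payload)
```

Writing `untilOffset` rather than `fromOffset` means a restart resumes with the first message that has not been read yet.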
I found a solution in Scala, but I can't find an equivalent for Python. Here is the Scala example: http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
So the question is: how can I update Zookeeper from that point on?
Recommended answer
I wrote some functions to save and read Kafka offsets with the Python kazoo library.
The first function returns a singleton Kazoo client:
ZOOKEEPER_SERVERS = "127.0.0.1:2181"

def get_zookeeper_instance():
    from kazoo.client import KazooClient
    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']
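The point of the singleton is that the function passed to foreachRDD runs for every batch, so the connection should be opened once and reused. The same idea can be sketched without touching `globals()`, here with `functools.lru_cache` (Python 3). `StubClient` is a hypothetical stand-in so the example runs without a Zookeeper server; in real code it would be `KazooClient`.

```python
import functools

class StubClient:
    """Hypothetical stand-in for KazooClient; counts how many clients get created."""
    instances = 0

    def __init__(self, hosts):
        StubClient.instances += 1
        self.hosts = hosts
        self.started = False

    def start(self):
        self.started = True

@functools.lru_cache(maxsize=None)
def get_client(hosts="127.0.0.1:2181"):
    # The body runs once per distinct `hosts` value; later calls return the cached client
    client = StubClient(hosts)
    client.start()
    return client

first = get_client()
second = get_client()
print(first is second, StubClient.instances)
```

Both calls return the same started client, so only one connection would be opened per process.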
Then the functions to read and write offsets:
def read_offsets(zk, topics):
    from pyspark.streaming.kafka import TopicAndPartition

    from_offsets = {}
    for topic in topics:
        topic_path = f'/consumers/{topic}'
        # Nothing stored yet (e.g. first run): get_children would raise NoNodeError
        if not zk.exists(topic_path):
            continue
        for partition in zk.get_children(topic_path):
            topic_partition = TopicAndPartition(topic, int(partition))
            # zk.get returns (data, stat); the data is the offset as bytes
            offset = int(zk.get(f'{topic_path}/{partition}')[0])
            from_offsets[topic_partition] = offset
    return from_offsets

def save_offsets(rdd):
    zk = get_zookeeper_instance()
    for offset in rdd.offsetRanges():
        path = f"/consumers/{offset.topic}/{offset.partition}"
        zk.ensure_path(path)
        # Store the end of the batch, i.e. the next offset to read
        zk.set(path, str(offset.untilOffset).encode())
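If you want to check the save/read round trip without a running Zookeeper, something like the following sketch may help. `FakeZk` is a hypothetical in-memory stand-in that mimics only the few client methods used here, and plain tuples replace `OffsetRange` and `TopicAndPartition`, so none of this is the real kazoo or pySpark API.

```python
class FakeZk:
    """Hypothetical in-memory stand-in for the client methods used above."""

    def __init__(self):
        self.nodes = {}

    def ensure_path(self, path):
        # Create the node and all of its ancestors if they are missing
        parts = path.strip("/").split("/")
        for i in range(1, len(parts) + 1):
            self.nodes.setdefault("/" + "/".join(parts[:i]), b"")

    def set(self, path, value):
        self.nodes[path] = value

    def get(self, path):
        return self.nodes[path], None  # (data, stat); stat is not modelled

    def exists(self, path):
        return path in self.nodes

    def get_children(self, path):
        prefix = path.rstrip("/") + "/"
        return sorted({p[len(prefix):].split("/")[0]
                       for p in self.nodes if p.startswith(prefix)})

def save(zk, ranges):
    # ranges: iterable of (topic, partition, until_offset) tuples
    for topic, partition, until_offset in ranges:
        path = "/consumers/%s/%d" % (topic, partition)
        zk.ensure_path(path)
        zk.set(path, str(until_offset).encode())

def read(zk, topics):
    offsets = {}
    for topic in topics:
        base = "/consumers/%s" % topic
        if not zk.exists(base):
            continue  # no offsets stored for this topic yet
        for partition in zk.get_children(base):
            data = zk.get("%s/%s" % (base, partition))[0]
            offsets[(topic, int(partition))] = int(data)
    return offsets

zk = FakeZk()
save(zk, [("test1", 0, 120), ("test1", 1, 55)])
print(read(zk, ["test1", "test2"]))
```

Here `test2` has no stored offsets, so it is simply missing from the result instead of raising an error.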
Then, before starting the stream, you can read the offsets from Zookeeper and pass them to createDirectStream as the fromOffsets argument:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def main(brokers="127.0.0.1:9092", topics=['test1', 'test2']):
    sc = SparkContext(appName="PythonStreamingSaveOffsets")
    ssc = StreamingContext(sc, 2)  # 2-second batches
    zk = get_zookeeper_instance()
    # Resume from the offsets stored in Zookeeper
    from_offsets = read_offsets(zk, topics)
    directKafkaStream = KafkaUtils.createDirectStream(
        ssc, topics, {"metadata.broker.list": brokers},
        fromOffsets=from_offsets)
    # Write the new offsets back after each batch
    directKafkaStream.foreachRDD(save_offsets)
    # Start the streaming job and block until it is stopped
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()
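One design point that may be worth keeping in mind once you add real processing: if the offsets are written only after a batch has been processed, a failure in between means the batch is replayed on restart rather than skipped (at-least-once). A minimal sketch of that ordering, using a plain list in place of an RDD and hypothetical `process` and `commit` callables:

```python
def handle_batch(records, offset_ranges, process, commit):
    """Process the batch first, then record its offsets.

    If process() raises, commit() is never reached, so the stored
    offsets still point at the start of the failed batch.
    """
    process(records)
    commit(offset_ranges)

committed = []

def failing_process(records):
    raise RuntimeError("sink unavailable")

# First batch succeeds, so its offsets are committed
handle_batch(["a", "b"], [("test1", 0, 2)],
             process=lambda records: None, commit=committed.extend)

# Second batch fails during processing, so nothing is committed for it
try:
    handle_batch(["c"], [("test1", 0, 3)],
                 process=failing_process, commit=committed.extend)
except RuntimeError:
    pass

print(committed)
```

In the streaming job, the function given to foreachRDD would play the role of `handle_batch`, with the offset write as its last step.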