pySpark Kafka Direct Streaming update Zookeeper / Kafka Offset


Problem description

Currently I'm working with Kafka / Zookeeper and pySpark (1.6.0). I have successfully created a Kafka consumer that uses KafkaUtils.createDirectStream().

The streaming itself works fine, but I noticed that my Kafka topics are not updated to the current offset after I have consumed some messages.

Since we need the topics to be updated in order to have monitoring in place here, this is somewhat odd.

In the Spark documentation I found this comment:

offsetRanges = []

def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

directKafkaStream \
    .transform(storeOffsetRanges) \
    .foreachRDD(printOffsetRanges)

You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.

Here is the documentation: http://spark.apache.org/docs/1.6.0/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers

I found a solution in Scala, but I can't find an equivalent for Python. Here is the Scala example: http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/

But the question is: how am I able to update Zookeeper from that point on?

Solution

I wrote some functions to save and read Kafka offsets with the python kazoo library.

The first function gets a singleton of the Kazoo client:

ZOOKEEPER_SERVERS = "127.0.0.1:2181"

def get_zookeeper_instance():
    from kazoo.client import KazooClient

    # Keep a single started KazooClient per Python process (stored in
    # globals()), so repeated calls reuse the same connection.
    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']
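
A quick way to sanity-check the connection (my own snippet, not part of the original answer) is to grab the client and list the root znodes:

zk = get_zookeeper_instance()
print(zk.get_children('/'))  # e.g. ['zookeeper', 'consumers', ...] once offsets have been stored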

Then functions to read and write the offsets:

def read_offsets(zk, topics):
    from pyspark.streaming.kafka import TopicAndPartition

    from_offsets = {}
    for topic in topics:
        # Each child of /consumers/<topic> is a partition number whose data
        # is the next offset to consume for that partition.
        for partition in zk.get_children(f'/consumers/{topic}'):
            topic_partition = TopicAndPartition(topic, int(partition))
            offset = int(zk.get(f'/consumers/{topic}/{partition}')[0])
            from_offsets[topic_partition] = offset
    return from_offsets

def save_offsets(rdd):
    zk = get_zookeeper_instance()
    for offset in rdd.offsetRanges():
        # Store untilOffset (the next offset to consume) per topic/partition.
        path = f"/consumers/{offset.topic}/{offset.partition}"
        zk.ensure_path(path)
        zk.set(path, str(offset.untilOffset).encode())
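
One caveat worth noting (my own addition, not from the original answer): on the very first run the /consumers/<topic> nodes do not exist yet, so zk.get_children() inside read_offsets raises a kazoo NoNodeError. A minimal defensive variant, assuming an empty fromOffsets dict is acceptable when nothing has been stored yet, could look like this:

def read_offsets_or_empty(zk, topics):
    # Hypothetical helper: same as read_offsets above, but skips topics for
    # which no offsets have been stored in Zookeeper yet (e.g. the first run).
    from pyspark.streaming.kafka import TopicAndPartition

    from_offsets = {}
    for topic in topics:
        path = f'/consumers/{topic}'
        if zk.exists(path) is None:
            continue  # nothing stored for this topic yet
        for partition in zk.get_children(path):
            topic_partition = TopicAndPartition(topic, int(partition))
            offset = int(zk.get(f'{path}/{partition}')[0])
            from_offsets[topic_partition] = offset
    return from_offsets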

Then, before starting the streaming, you can read the offsets from Zookeeper and pass them to createDirectStream as the fromOffsets argument:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def main(brokers="127.0.0.1:9092", topics=['test1', 'test2']):
    sc = SparkContext(appName="PythonStreamingSaveOffsets")
    ssc = StreamingContext(sc, 2)

    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topics)

    directKafkaStream = KafkaUtils.createDirectStream(
        ssc, topics, {"metadata.broker.list": brokers},
        fromOffsets=from_offsets)

    # Persist the consumed offsets back to Zookeeper after each batch.
    directKafkaStream.foreachRDD(save_offsets)

    # Without these two calls the streaming job never actually starts.
    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
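
To verify that the offsets are actually being committed, you can read them back from Zookeeper directly. This is just a quick check of my own (not part of the original answer); 'test1' and 'test2' are the example topics used above:

zk = get_zookeeper_instance()
for topic in ['test1', 'test2']:
    for partition in zk.get_children(f'/consumers/{topic}'):
        offset = zk.get(f'/consumers/{topic}/{partition}')[0].decode()
        print(f'{topic} partition {partition}: next offset {offset}')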
