How do I consume a Kafka topic inside a Spark streaming app?


Problem description

When I create a stream from a Kafka topic and print its content

    import os
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingKafkaWords")
    ssc = StreamingContext(sc, 10)

    lines = KafkaUtils.createDirectStream(ssc, ['sample_topic'], {"bootstrap.servers": 'localhost:9092'})

    lines.pprint()

    ssc.start()
    ssc.awaitTermination()

I get an empty result:

    -------------------------------------------
    Time: 2019-12-07 13:11:50
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:00
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:10
    -------------------------------------------

Meanwhile, it works in the console:

    kafka-console-consumer --topic sample_topic --from-beginning --bootstrap-server localhost:9092

which correctly gives me all the lines of my text in the Kafka topic:

    ham Ok lor... Sony ericsson salesman... I ask shuhui then she say quite gd 2 use so i considering...
    ham Ard 6 like dat lor.
    ham Why don't you wait 'til at least wednesday to see if you get your .
    ham Huh y lei...
    spam    REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode
    spam    This is the 2nd time we have tried 2 contact u. U have won the £750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate.
    ham Will ü b going to esplanade fr home?
    . . . 

What is the proper way to stream data from a Kafka topic into a Spark streaming app?

Answer

The reason you are not seeing any data in the streaming output is that Spark Streaming starts reading from the latest offset by default. So if you start your Spark Streaming application first and then write data to Kafka, you will see output in the streaming job. Refer to the documentation here:

By default, it will start consuming from the latest offset of each Kafka partition
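
As a side note (not from the original answer, so treat it as an untested sketch): with the 0-8 direct stream you can also request the earliest available offsets by setting auto.offset.reset in the Kafka parameters, without listing partitions explicitly. Reusing the question's setup:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingKafkaWords")
    ssc = StreamingContext(sc, 10)

    # "smallest" asks the 0-8 direct stream to start at the earliest available
    # offset of each partition instead of the default ("largest")
    lines = KafkaUtils.createDirectStream(
        ssc, ['sample_topic'],
        {"bootstrap.servers": 'localhost:9092',
         "auto.offset.reset": "smallest"})

    lines.pprint()
    ssc.start()
    ssc.awaitTermination()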

But you can also read data from any specific offset of your topic. Take a look at the createDirectStream method here. It takes a dict parameter fromOffsets where you can specify the starting offset for each partition.

I have tested the code below with Kafka 2.2.0, Spark 2.4.3, and Python 3.7.3:

Start the pyspark shell with the Kafka dependencies:

    pyspark --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.0

Run the following code:

    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
    from pyspark.streaming import StreamingContext

    # 1-second batch interval; `sc` is the SparkContext the pyspark shell provides
    ssc = StreamingContext(sc, 1)

    # Start reading from offset 0 of partition 0 of the 'test' topic
    topicPartition = TopicAndPartition('test', 0)
    fromOffset = {topicPartition: 0}

    lines = KafkaUtils.createDirectStream(
        ssc, ['test'], {"bootstrap.servers": 'localhost:9092'},
        fromOffsets=fromOffset)

    lines.pprint()

    ssc.start()
    ssc.awaitTermination()
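
Note that fromOffsets needs one entry per partition you want to consume. For a topic with more than one partition, the dictionary would look like this (a sketch reusing the imports and ssc from the block above; the two-partition 'test' topic is assumed for illustration):

    # One TopicAndPartition key per partition; each value is the starting offset
    fromOffset = {
        TopicAndPartition('test', 0): 0,
        TopicAndPartition('test', 1): 0,
    }

    lines = KafkaUtils.createDirectStream(
        ssc, ['test'], {"bootstrap.servers": 'localhost:9092'},
        fromOffsets=fromOffset)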

Also, you should consider using Structured Streaming instead of Spark Streaming if you have Kafka broker version 0.10 or higher. Refer to the Structured Streaming documentation here and the Structured Streaming + Kafka integration guide here.

Below is sample code to run in Structured Streaming. Please use the jar version that matches your Kafka and Spark versions. I am using Spark 2.4.3 with Scala 2.11 and Kafka 0.10, so I use the jar spark-sql-kafka-0-10_2.11:2.4.3.

Start the pyspark shell:

    pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3

    # Read the topic from the beginning as a streaming DataFrame
    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "test") \
      .option("startingOffsets", "earliest") \
      .load()

    # Kafka keys and values are binary, so cast them to strings before printing
    query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
      .writeStream \
      .format("console") \
      .start()

    # query.awaitTermination()  # uncomment when running as a standalone script
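
As a quick sanity check (not part of the original answer), the same Kafka source also supports one-off batch reads in Spark 2.4, which is a convenient way to confirm the topic actually contains data before debugging the stream. A sketch, using the spark session the pyspark shell provides:

    # Batch read between two fixed offsets; returns a plain (non-streaming) DataFrame
    batch_df = spark.read \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "test") \
      .option("startingOffsets", "earliest") \
      .option("endingOffsets", "latest") \
      .load()

    batch_df.selectExpr("CAST(value AS STRING)").show(truncate=False)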
