How do I consume Kafka topic inside spark streaming app?

Problem Description

When I create a stream from a Kafka topic and print its content:

    import os
    # Pull in the Kafka 0.8 connector before the JVM starts
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell'
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingKafkaWords")
    ssc = StreamingContext(sc, 10)  # 10-second batch interval

    # Direct stream against the local broker
    lines = KafkaUtils.createDirectStream(ssc, ['sample_topic'], {"bootstrap.servers": 'localhost:9092'})

    lines.pprint()  # print each micro-batch to the console

    ssc.start()
    ssc.awaitTermination()

I get an empty result:

    -------------------------------------------
    Time: 2019-12-07 13:11:50
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:00
    -------------------------------------------

    -------------------------------------------
    Time: 2019-12-07 13:12:10
    -------------------------------------------

Meanwhile, it works in the console:

    kafka-console-consumer --topic sample_topic --from-beginning --bootstrap-server localhost:9092

and correctly gives me all the lines of my text in the Kafka topic:

    ham Ok lor... Sony ericsson salesman... I ask shuhui then she say quite gd 2 use so i considering...
    ham Ard 6 like dat lor.
    ham Why don't you wait 'til at least wednesday to see if you get your .
    ham Huh y lei...
    spam    REMINDER FROM O2: To get 2.50 pounds free call credit and details of great offers pls reply 2 this text with your valid name, house no and postcode
    spam    This is the 2nd time we have tried 2 contact u. U have won the £750 Pound prize. 2 claim is easy, call 087187272008 NOW1! Only 10p per minute. BT-national-rate.
    ham Will ü b going to esplanade fr home?
    . . . 

What is the proper way to stream data from a Kafka topic into a Spark streaming app?

Recommended Answer

The reason you are not seeing any data in the streaming output is that Spark Streaming starts reading from the latest offset by default. So if you start your Spark Streaming application first and then write data to Kafka, you will see output in the streaming job. From the documentation:

By default, it will start consuming from the latest offset of each Kafka partition
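
With the 0-8 direct stream you can also simply ask Spark to begin at the earliest offsets still retained by the broker, by setting auto.offset.reset in the Kafka parameters. A minimal sketch, reusing the ssc, topic, and broker address from the question:

    from pyspark.streaming.kafka import KafkaUtils

    # "smallest" is the old consumer's name for "earliest": the direct stream
    # will start from the earliest offsets the broker still retains.
    lines = KafkaUtils.createDirectStream(
        ssc,
        ['sample_topic'],
        {"bootstrap.servers": 'localhost:9092',
         "auto.offset.reset": "smallest"})

    lines.pprint()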

But you can also read data from any specific offset of your topic. Take a look at the createDirectStream method: it takes a fromOffsets parameter where you can specify the starting offset per partition as a dictionary.

I have tested the code below with Kafka 2.2.0, Spark 2.4.3 and Python 3.7.3.

Start a pyspark shell with the Kafka dependencies:

    pyspark --packages org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.4.0

Run the following code:

    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
    from pyspark.streaming import StreamingContext

    # `sc` is already defined by the pyspark shell
    ssc = StreamingContext(sc, 1)  # 1-second batch interval

    # Start reading partition 0 of topic 'test' from offset 0
    topicPartition = TopicAndPartition('test', 0)
    fromOffset = {topicPartition: 0}

    lines = KafkaUtils.createDirectStream(
        ssc, ['test'], {"bootstrap.servers": 'localhost:9092'},
        fromOffsets=fromOffset)

    lines.pprint()

    ssc.start()
    ssc.awaitTermination()
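
Note that fromOffsets needs one TopicAndPartition entry per partition you want to read from; the snippet above assumes topic test has a single partition 0, so add one entry per partition if your topic has more.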

Also, you should consider using Structured Streaming instead of Spark Streaming if your Kafka broker version is 0.10 or higher. See the Structured Streaming documentation and the Structured Streaming + Kafka integration guide.

Below is sample code to run with Structured Streaming. Use the jar version that matches your Kafka and Spark versions. I am using Spark 2.4.3 with Scala 2.11 and Kafka 0.10, so the jar is spark-sql-kafka-0-10_2.11:2.4.3.

Start the pyspark shell:

    pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3

    # `spark` (a SparkSession) is already defined by the pyspark shell
    df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "test") \
      .option("startingOffsets", "earliest") \
      .load()

    # Kafka keys/values arrive as binary; cast them to strings and
    # write every micro-batch to the console
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
      .writeStream \
      .format("console") \
      .start()
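
If you run this as a standalone script instead of in the pyspark shell (submitted via spark-submit with the same --packages option), you have to create the session yourself and block on the query, or the driver will exit before any micro-batch is processed. A minimal sketch, where the app name is an arbitrary placeholder:

    from pyspark.sql import SparkSession

    # App name is a placeholder; everything else mirrors the shell example above
    spark = SparkSession.builder.appName("StructuredKafkaConsole").getOrCreate()

    df = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test") \
        .option("startingOffsets", "earliest") \
        .load()

    query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream.format("console").start()

    query.awaitTermination()  # block until the query is stopped or fails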
