How to display a streaming DataFrame (as show fails with AnalysisException)?


Question

So I have some data streaming in a Kafka topic. I'm taking this streaming data and placing it into a DataFrame, and I want to display the data inside of the DataFrame:

import os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, DataFrame
import time
from datetime import datetime, timedelta

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 pyspark-shell'

topic_name = "my-topic"
kafka_broker = "localhost:9092"

producer = KafkaProducer(bootstrap_servers = kafka_broker)
spark = SparkSession.builder.getOrCreate()
terminate = datetime.now() + timedelta(seconds=30)

while datetime.now() < terminate:
    producer.send(topic = topic_name, value = str(datetime.now()).encode('utf-8'))
    time.sleep(1)

readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", topic_name) \
    .load()
readDF = readDF.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

readDF.writeStream.format("console").start()
readDF.show()

producer.close()

However, I keep getting this error:

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/spark/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.showString.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
...
Traceback (most recent call last):
      File "test2.py", line 30, in <module>
        readDF.show()
      File "/home/spark/spark/python/pyspark/sql/dataframe.py", line 336, in show
        print(self._jdf.showString(n, 20))
      File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
      File "/home/spark/spark/python/pyspark/sql/utils.py", line 69, in deco
        raise AnalysisException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

I don't understand why the exception is happening: I'm calling writeStream.start() right before show(). I tried getting rid of selectExpr(), but that made no difference. Does anyone know how to display a stream-sourced DataFrame? I'm using Python 3.6.1, Kafka 0.10.2.1, and Spark 2.2.0.

Answer

A streaming DataFrame doesn't support the show() method. When you call the start() method, it starts a background thread that streams the input data to the sink, and since you are using ConsoleSink, it will output the data to the console. You don't need to call show().

Remove readDF.show() and add a sleep after starting the query; then you should be able to see the data in the console, for example:

query = readDF.writeStream.format("console").start()
import time
time.sleep(10) # sleep 10 seconds
query.stop()
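
If you'd rather not hard-code a sleep, you can block on the query itself instead. A minimal sketch using StreamingQuery.awaitTermination (the 10-second timeout is an arbitrary choice for this demo, not something from the original answer):

query = readDF.writeStream.format("console").start()
query.awaitTermination(10)  # block for up to 10 seconds while micro-batches print
query.stop()                # shut the query down cleanly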

You also need to set startingOffsets to earliest; otherwise, the Kafka source will just start from the latest offset and fetch nothing in your case.

readDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("startingOffsets", "earliest") \
    .option("subscribe", topic_name) \
    .load()
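
As an aside (not part of the original answer): if the goal is only to inspect the topic's contents with show(), a non-streaming batch read of Kafka also works in Spark 2.2, since it produces a static DataFrame. A minimal sketch, reusing the kafka_broker and topic_name variables from above:

# Batch (non-streaming) read of the topic; show() is allowed on the result
batchDF = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_broker) \
    .option("subscribe", topic_name) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

batchDF.selectExpr("CAST(value AS STRING)").show(truncate=False)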

