Convert Spark Structured Streaming DataFrames to Pandas DataFrame


Question

I have a Spark Streaming app set up that consumes from a Kafka topic, and I need to use some APIs that take in a Pandas DataFrame, but when I try to convert the streaming DataFrame I get this:

: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.completeString(QueryExecution.scala:219)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:202)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:62)
        at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2832)
        at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2809)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:745)

Here is my Python code:

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("sparkDf to pandasDf")\
    .getOrCreate()

sparkDf = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "mytopic")\
    .option("startingOffsets", "earliest")\
    .load()

# This is the line that raises the AnalysisException above:
pandas_df = sparkDf.toPandas()

query = sparkDf.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()

Now I am aware I am creating another instance of a streaming DataFrame, but no matter where I try to use start() and awaitTermination(), I get the same error.

Any ideas?

Answer

TL;DR Such an operation just cannot work.

Now I am aware I am creating another instance of a streaming DataFrame

Well, the problem is that you really don't. toPandas, called on a DataFrame, creates a simple, local, non-distributed Pandas DataFrame in the memory of the driver node.
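
For contrast, here is a minimal sketch of the batch case, where toPandas() is legal because the result is finite. It reuses the connection options from the question; the endingOffsets option is what bounds the read:

batchDf = spark.read\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "mytopic")\
    .option("startingOffsets", "earliest")\
    .option("endingOffsets", "latest")\
    .load()

# Legal here: a batch query returns a fixed set of rows that can be
# collected to the driver.
pandas_df = batchDf.toPandas()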

It not only has nothing to do with Spark, but as an abstraction it is inherently incompatible with Structured Streaming: a Pandas DataFrame represents a fixed set of tuples, while a structured stream represents an infinite stream of tuples.

It is not exactly clear what you're trying to achieve here, and it might be an XY problem, but if you really need to use Pandas with Structured Streaming, you can try using pandas_udf - the SCALAR and GROUPED_MAP variants are compatible with at least basic time-based triggers (other variants might be supported as well, though some combinations clearly don't make any sense, and I am not aware of any official compatibility matrix). See the sketch below.
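
A minimal sketch of the SCALAR variant applied to the stream above, assuming Spark 2.3+ with PyArrow installed; the cast of the Kafka value column and the uppercasing logic are illustrative placeholders:

from pyspark.sql.functions import col, pandas_udf, PandasUDFType

# SCALAR pandas_udf: Spark hands each batch of column values to the
# function as a pandas.Series, so Pandas APIs are usable inside it.
@pandas_udf("string", PandasUDFType.SCALAR)
def to_upper(s):
    return s.str.upper()

query = sparkDf\
    .select(col("value").cast("string").alias("value"))\
    .withColumn("value_upper", to_upper(col("value")))\
    .writeStream\
    .outputMode("append")\
    .format("console")\
    .start()

query.awaitTermination()

This way the Pandas code runs on bounded chunks of the stream inside the executors, instead of trying to collect the unbounded stream to the driver the way toPandas() does.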

