How to transform structured streams with PySpark?
Question
This seems like it should be obvious, but in reviewing the docs and examples, I'm not sure I can find a way to take a structured stream and transform it using PySpark.
For example:
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName('StreamingWordCount')
    .getOrCreate()
)

raw_records = (
    spark
    .readStream
    .format('socket')
    .option('host', 'localhost')
    .option('port', 9999)
    .load()
)

# I realize there's a SQL function for upper-case, just illustrating a sample
# use of an arbitrary map function
records = raw_records.rdd.map(lambda w: w.upper()).toDF()

counts = (
    records
    .groupBy(records.value)
    .count()
)

query = (
    counts
    .writeStream
    .outputMode('complete')
    .format('console')
    .start()
)

query.awaitTermination()
This raises the following exception:
Queries with streaming sources must be executed with writeStream.start
However, if I remove the call to rdd.map(...).toDF(), things seem to work fine.
It seems as though the call to rdd.map branches execution away from the streaming context and causes Spark to warn that it was never started?
Is there a "right" way to apply map- or mapPartitions-style transformations using Structured Streaming and PySpark?
Answer
Every transformation applied in Structured Streaming has to be fully contained in the Dataset world: in the case of PySpark this means you can use only DataFrame or SQL, and conversion to RDD (or DStream or local collections) is not supported.
If you want to use plain Python code, you have to use a UserDefinedFunction.
from pyspark.sql.functions import udf

@udf
def to_upper(s):
    return s.upper()

raw_records.select(to_upper("value"))