How to transform structured streams with PySpark?


Problem description

This seems like it should be obvious, but in reviewing the docs and examples, I'm not sure I can find a way to take a structured stream and transform it using PySpark.

For example:

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName('StreamingWordCount')
    .getOrCreate()
)

raw_records = (
    spark
    .readStream
    .format('socket')
    .option('host', 'localhost')
    .option('port', 9999)
    .load()
)

# I realize there's a SQL function for upper-case, just illustrating a sample
# use of an arbitrary map function
records = raw_records.rdd.map(lambda w: w.upper()).toDF()

counts = (
    records
    .groupBy(records.value)
    .count()
)

query = (
    counts
    .writeStream
    .outputMode('complete')
    .format('console')
    .start()
)
query.awaitTermination()

This throws the following exception:

Queries with streaming sources must be executed with writeStream.start

However, if I remove the call to rdd.map(...).toDF(), things seem to work fine.

It seems as though the call to rdd.map branches execution away from the streaming context, causing Spark to warn that it was never started?

Is there a "right" way to apply map or mapPartitions style transformations using Structured Streaming and PySpark?

Recommended answer

Every transformation applied in Structured Streaming has to be fully contained in the Dataset world - in the case of PySpark, this means you can use only the DataFrame API or SQL; conversion to RDD (or DStream, or local collections) is not supported.
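
For example, the upper-casing from the question can stay entirely in the DataFrame API using the built-in upper function (a minimal sketch, reusing raw_records from the question):

from pyspark.sql.functions import upper

# Stays in the DataFrame world, so it is valid inside a streaming query
records = raw_records.select(upper(raw_records.value).alias('value'))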

If you want to use plain Python code, you have to use a UserDefinedFunction.

from pyspark.sql.functions import udf

@udf  # default return type is StringType
def to_upper(s):
    return s.upper()

raw_records.select(to_upper("value"))
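
Wired into the rest of the pipeline from the question, this might look like the following (a sketch; the aggregation and console sink are unchanged):

# The UDF replaces the rdd.map(...).toDF() step, so the plan stays streaming
records = raw_records.select(to_upper('value').alias('value'))

counts = records.groupBy('value').count()

query = (
    counts
    .writeStream
    .outputMode('complete')
    .format('console')
    .start()
)
query.awaitTermination()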

See also Spark Structured Streaming and Spark-Ml Regression

