How to transform structured streams with PySpark?


Problem Description

This seems like it should be obvious, but in reviewing the docs and examples, I'm not sure I can find a way to take a structured stream and transform it using PySpark.

For example:

from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName('StreamingWordCount')
    .getOrCreate()
)

raw_records = (
    spark
    .readStream
    .format('socket')
    .option('host', 'localhost')
    .option('port', 9999)
    .load()
)

# I realize there's a SQL function for upper-case, just illustrating a sample
# use of an arbitrary map function
records = raw_records.rdd.map(lambda w: w.upper()).toDF()

counts = (
    records
    .groupBy(records.value)
    .count()
)

query = (
    counts
    .writeStream
    .outputMode('complete')
    .format('console')
    .start()
)
query.awaitTermination()

This throws the following exception:

Queries with streaming sources must be executed with writeStream.start

However, if I remove the call to rdd.map(...).toDF() things seem to work fine.

It seems as though the call to rdd.map branches execution away from the streaming context and causes Spark to warn that it was never started?

Is there a "right" way to apply map or mapPartition style transformations using Structured Streaming and PySpark?

Recommended Answer

Every transformation applied in Structured Streaming has to be fully contained in the Dataset world; in the case of PySpark, that means you can use only DataFrame or SQL operations, and conversions to RDD (or DStream, or local collections) are not supported.
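Staying entirely inside the DataFrame API, the original example can be expressed with the built-in upper column function from pyspark.sql.functions (a minimal sketch, reusing the raw_records stream defined in the question):

from pyspark.sql.functions import upper

# Upper-case each line without ever leaving the streaming DataFrame plan
records = raw_records.select(upper(raw_records.value).alias('value'))

counts = records.groupBy(records.value).count()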

If you want to use plain Python code, you have to use a UserDefinedFunction.

from pyspark.sql.functions import udf

# A UDF wraps arbitrary Python logic so it can be used as a column expression
@udf
def to_upper(s):
    return s.upper()

raw_records.select(to_upper("value"))
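Plugged back into the question's pipeline, the UDF simply replaces the rdd.map(...).toDF() step; the rest of the query is unchanged (a sketch, reusing raw_records from above):

records = raw_records.select(to_upper(raw_records.value).alias('value'))

# groupBy / count / writeStream from the original example then work as before,
# because the whole plan stays inside the streaming DataFrame API.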

See also Spark Structured Streaming and Spark-Ml regression.
