Apache Spark -- Assign the result of UDF to multiple dataframe columns


Problem Description

I'm using pyspark, loading a large CSV file into a dataframe with spark-csv, and as a pre-processing step I need to apply a variety of operations to the data available in one of the columns (which contains a JSON string). That will return X values, each of which needs to be stored in its own separate column.

That functionality will be implemented in a UDF. However, I am not sure how to return a list of values from that UDF and feed them into individual columns. Below is a simple example:

(...)
from pyspark.sql.functions import udf

def udf_test(n):
    return [n / 2, n % 2]

# No returnType is specified here, which is the root of the problem below
test_udf = udf(udf_test)

df.select('amount', 'trans_date').withColumn("test", test_udf("amount")).show(5)

This produces the following result:

+------+----------+--------------------+
|amount|trans_date|                test|
+------+----------+--------------------+
|  28.0|2016-02-07|         [14.0, 0.0]|
| 31.01|2016-02-07|[15.5050001144409...|
| 13.41|2016-02-04|[6.70499992370605...|
| 307.7|2015-02-17|[153.850006103515...|
| 22.09|2016-02-05|[11.0450000762939...|
+------+----------+--------------------+
only showing top 5 rows

What would be the best way to store the two (in this example) values returned by the UDF in separate columns? Right now they are being typed as strings:

df.select('amount','trans_date').withColumn("test", test_udf("amount")).printSchema()

root
 |-- amount: float (nullable = true)
 |-- trans_date: string (nullable = true)
 |-- test: string (nullable = true)
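
The reason is that when no returnType is passed to udf, it defaults to StringType, so the returned list is serialized into a string. A minimal sketch of a partial workaround: declaring an ArrayType keeps the values numeric, though they still share a single column:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# Declaring the element type yields array<float> instead of a
# serialized string; the two values still live in one column.
test_udf = udf(udf_test, ArrayType(FloatType()))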

Recommended Answer

It is not possible to create multiple top-level columns from a single UDF call, but you can create a new struct. It requires a UDF with a specified returnType:

from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, FloatType

# One struct field per value returned by the UDF
schema = StructType([
    StructField("foo", FloatType(), False),
    StructField("bar", FloatType(), False)
])

def udf_test(n):
    # Guard against None / 0.0 inputs since the fields are non-nullable
    return (n / 2, n % 2) if n and n != 0.0 else (float('nan'), float('nan'))

test_udf = udf(udf_test, schema)
df = sc.parallelize([(1, 2.0), (2, 3.0)]).toDF(["x", "y"])

foobars = df.select(test_udf("y").alias("foobar"))
foobars.printSchema()
## root
##  |-- foobar: struct (nullable = true)
##  |    |-- foo: float (nullable = false)
##  |    |-- bar: float (nullable = false)
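
The example builds the dataframe through the RDD-based sc.parallelize(...).toDF(...) route; on Spark 2.x+ an equivalent construction (just an alternative, not part of the original answer) goes through the SparkSession:

# Equivalent dataframe construction via SparkSession (Spark 2.x+)
df = spark.createDataFrame([(1, 2.0), (2, 3.0)], ["x", "y"])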

You can further flatten the schema with a simple select:

foobars.select("foobar.foo", "foobar.bar").show()
## +---+---+
## |foo|bar|
## +---+---+
## |1.0|0.0|
## |1.5|1.0|
## +---+---+
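
If you prefer not to list the struct fields by hand, the star-expansion shorthand selects them all at once; the same pattern also attaches the new columns next to the originals (a usage sketch, reusing the df and test_udf defined above):

# Expand every field of the struct in one go
foobars.select("foobar.*").show()

# Or keep the original columns alongside the flattened ones
df.withColumn("foobar", test_udf("y")).select("x", "y", "foobar.*").show()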

See also: Derive multiple columns from a single column in a Spark DataFrame.
