How to use the transform higher-order function?


Problem description


It's about the transform higher-order function (https://issues.apache.org/jira/browse/SPARK-23908).

Is there any way to use it as a standard function (in package org.apache.spark.sql.functions._)?

I have an array of strings and I want to apply URI normalization to each of them. For now I did it with a UDF. I just hoped that with Spark 2.4.0 I would be able to skip the UDF.
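For context, a rough sketch of the kind of UDF-based approach described above (the DataFrame df, the column name uris, and the use of java.net.URI for the normalization are assumptions for illustration, not details from the question):

import org.apache.spark.sql.functions.{col, udf}

// Hypothetical per-element normalization; substitute whatever URI normalization is needed.
def normalizeUri(s: String): String =
  java.net.URI.create(s).normalize().toString

// UDF that normalizes every element of an array<string> column (null arrays passed through).
val normalizeUris = udf((xs: Seq[String]) => Option(xs).map(_.map(normalizeUri)).orNull)

df.withColumn("uris_normalized", normalizeUris(col("uris")))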

As far as I can see, it should be used in selectExpr, e.g. df.selectExpr("transform(i, x -> x + 1)"), but is it only meant to be used with selectExpr?

Using it this way, is there any way to provide a custom function for the transformation? Is there any way to achieve it, or should I resort to using good old UDFs?

Solution

Is there any way to use it as a standard function located in package org.apache.spark.sql.functions._?

For now it is intended only for use in SQL expressions, although if you want to return a Column you can use expr:

import org.apache.spark.sql.functions._

expr("transform(i, x -> x + 1)"): Column

Using it this way, is there any way to provide a custom function for the transformation?

It is possible to use a Scala UDF*:

spark.udf.register("f", (x: Int) => x + 1)

Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
  .withColumn("xsinc", expr("transform(xs, x -> f(x))"))
  .show

+---+---------+---------+
| id|       xs|    xsinc|
+---+---------+---------+
|  1|[1, 2, 3]|[2, 3, 4]|
+---+---------+---------+

although it doesn't seem to provide any real benefit over a UDF taking a Seq.
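For reference, a sketch of such a Seq-taking UDF for the same toy example (illustrative names; .toDF relies on spark.implicits._, which is available in spark-shell):

import org.apache.spark.sql.functions.{col, udf}

// Maps over the whole array column at once, no transform involved.
val incAll = udf((xs: Seq[Int]) => xs.map(_ + 1))

Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
  .withColumn("xsinc", incAll(col("xs")))
  .show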


* Partial support for Python UDFs seems to be in place as well (udfs are recognized, types are correctly derived, and calls are dispatched), but as of 2.4.0 the serialization mechanism seems to be broken (all records are passed to the UDF as None):

from typing import Optional
from pyspark.sql.functions import expr

sc.version

'2.4.0'

def f(x: Optional[int]) -> Optional[int]:
    return x + 1 if x is not None else None

spark.udf.register('f', f, "integer")

df = (spark
    .createDataFrame([(1, [1, 2, 3])], ("id", "xs"))
    .withColumn("xsinc", expr("transform(xs, x -> f(x))")))

df.printSchema()

root
 |-- id: long (nullable = true)
 |-- xs: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- xsinc: array (nullable = true)
 |    |-- element: integer (containsNull = true)

df.show()

+---+---------+-----+
| id|       xs|xsinc|
+---+---------+-----+
|  1|[1, 2, 3]| [,,]|
+---+---------+-----+

Of course there is no real potential for a performance boost here; it dispatches to BasePythonRunner, so the overhead should be the same as that of a plain udf.

Related JIRA ticket SPARK-27052 - Using PySpark udf in transform yields NULL values
