How to use transform higher-order function?


Question

It's about the transform higher-order function (https://issues.apache.org/jira/browse/SPARK-23908).

Is there any way to use it as a standard function (in the package org.apache.spark.sql.functions._)?

I have an array of strings and I want to apply URI normalization to each of them. For now I did it with a UDF. I just hoped that with Spark 2.4.0 I would be able to skip the UDF.
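
For reference, a minimal sketch of the pre-2.4 UDF approach described above, assuming a hypothetical normalizeUri helper and a DataFrame with an array<string> column named uris (both names are illustrative, not from the original question):

import org.apache.spark.sql.functions.{col, udf}

// Hypothetical per-element helper; the real normalization logic is not shown here.
def normalizeUri(s: String): String = s.trim.toLowerCase

// Pre-2.4 style: a UDF that maps over the whole array column.
val normalizeUris = udf((xs: Seq[String]) => xs.map(normalizeUri))

val df = Seq((1, Seq(" HTTP://Example.com ", "https://SPARK.apache.org"))).toDF("id", "uris")
df.withColumn("uris_norm", normalizeUris(col("uris"))).show(false)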

As I see it, it should be used in selectExpr, like df.selectExpr("transform(i, x -> x + 1)"), but is it only meant to be used with selectExpr?

Using it this way, is there any way to provide a custom function for the transformation? Is there any way to achieve it, or should I resort to using good old UDFs?

Answer

Is there any way to use it as a standard function located in the package org.apache.spark.sql.functions._?

For now it is intended only for use with SQL expressions, although if you want to return a Column you can use expr:

import org.apache.spark.sql.functions._

expr("transform(i, x -> x + 1)"): Column

Using it this way, is there any way to provide a custom function for the transformation?

It is possible to use a Scala UDF*:

spark.udf.register("f", (x: Int) => x + 1)

Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
  .withColumn("xsinc", expr("transform(xs, x -> f(x))"))
  .show

+---+---------+---------+
| id|       xs|    xsinc|
+---+---------+---------+
|  1|[1, 2, 3]|[2, 3, 4]|
+---+---------+---------+

although it doesn't seem to provide any real benefit over a UDF taking a Seq.
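
For comparison, a sketch of what such a "UDF taking a Seq" looks like, producing the same result as the transform expression above:

import org.apache.spark.sql.functions.{col, udf}

// Same output as transform(xs, x -> f(x)), but the mapping happens inside one UDF call.
val fSeq = udf((xs: Seq[Int]) => xs.map(_ + 1))

Seq((1, Seq(1, 2, 3))).toDF("id", "xs")
  .withColumn("xsinc", fSeq(col("xs")))
  .show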

* Partial support for Python UDFs seems to be in place as well (udfs are recognized, types are correctly derived, and calls are dispatched), but as of 2.4.0 the serialization mechanism seems to be broken (all records are passed to the UDF as None):

from typing import Optional
from pyspark.sql.functions import expr

sc.version

'2.4.0'

def f(x: Optional[int]) -> Optional[int]:
    return x + 1 if x is not None else None

spark.udf.register('f', f, "integer")

df = (spark
    .createDataFrame([(1, [1, 2, 3])], ("id", "xs"))
    .withColumn("xsinc", expr("transform(xs, x -> f(x))")))

df.printSchema()

root
 |-- id: long (nullable = true)
 |-- xs: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- xsinc: array (nullable = true)
 |    |-- element: integer (containsNull = true)

df.show()

+---+---------+-----+
| id|       xs|xsinc|
+---+---------+-----+
|  1|[1, 2, 3]| [,,]|
+---+---------+-----+

Of course there is no real potential for a performance boost here - it dispatches to BasePythonRunner, so the overhead should be the same as that of a plain udf.

Related JIRA ticket: SPARK-27052 - Using PySpark udf in transform yields NULL values

