user-defined-functions相关内容

spark sql - 是否使用行转换或 UDF

我有一个包含 100 列和 1000 万条记录的输入表 (I).我想得到一个有 50 列的输出表 (O),这些列来自 I 的列,即有 50 个函数将 I 的列映射到 O 的 50 列,即 o1 = f(i1) , o2 = f(i2, i3) ..., o50 = f(i50, i60, i70). 在 spark sql 中,我可以通过两种方式做到这一点: 逐行解析 I 的整行(例如 ..

如何在 PySpark 中创建一个返回字符串数组的 udf?

我有一个返回字符串列表的 udf.这应该不会太难.我在执行 udf 时传入数据类型,因为它返回一个字符串数组:ArrayType(StringType). 现在,不知何故这是行不通的: 我正在操作的数据帧是 df_subsets_concat,看起来像这样: df_subsets_concat.show(3,False) +----------------------+|col1 ..

使用 Spark 引擎执行 SQL 时,如何在 hive UDF 中获取 Spark 的 partitionId 或 taskContext?

比如我们用Spark引擎执行下面的SQL,我们需要my_udf(row)返回 Spark 中的分区 ID. 添加jar hdfs:///dir/udf/udf.jar;创建临时函数 my_udf 为 'com.my.MyUDF';从表中选择行,my_udf(row); 我已经知道如何在 MR 引擎中执行的 Hive UDF 中获取 taskId:如何在 hive UDF 中获取 taskID ..

PySpark 2.1:使用 UDF 导入模块会破坏 Hive 连接

我目前正在使用 Spark 2.1,并且有一个主脚本调用包含我所有转换方法的辅助模块.换句话说: main.py助手文件 在我的 helper.py 文件的顶部,我有几个自定义 UDF,我已按以下方式定义: def 重新格式化:返回 reformat_logic(s)reformat_udf = udf(reformat, StringType()) 在我将所有 UDF 拆分到帮助文件中之 ..

如何在 Pyspark 中注册没有参数的 UDF

我已经使用 lambda 函数尝试了带参数的 Spark UDF 并注册了它.但是我怎么能创建没有参数和注册器的 udf 我已经试过了我的示例代码预计会显示当前时间 从日期时间导入日期时间从 pyspark.sql.functions 导入 udf def getTime():时间值=日期时间.now()返回时间值udfGateTime=udf(getTime,TimestampType( ..

如何在 hive UDF 中获取 taskID 或 mapperID(类似于 Spark 中的 partitionID)?

作为问题,如何在 hive UDF 中获取 taskID 或 mapperID(类似于 Spark 中的 partitionID)? 解决方案 我自己找到了正确答案,我们可以通过以下方式在hive UDF中获取taskID: public class TestUDF extends GenericUDF {私人文本结果=新文本();私人字符串 tmpStr = "";@覆盖公共无效配置( ..

Spark UDAF - 使用泛型作为输入类型?

我想编写 Spark UDAF,其中列的类型可以是任何定义了 Scala Numeric 的类型.我在 Internet 上进行了搜索,但只找到了具有具体类型的示例,例如 DoubleType、LongType.这不可能吗?但是如何将 UDAF 与其他数值一起使用? 解决方案 为简单起见,我们假设您要定义自定义 sum.您将为输入类型提供一个 TypeTag 并使用 Scala 反射来定义 ..

在 Apache Spark SQL 中从用户定义的聚合函数 (UDAF) 返回多个数组

我正在尝试使用 Apache Spark SQL 在 Java 中创建用户定义的聚合函数 (UDAF),该函数在完成时返回多个数组.我在网上搜索过,但找不到有关如何执行此操作的任何示例或建议. 我能够返回单个数组,但无法弄清楚如何在evaluate() 方法中以正确格式获取数据以返回多个数组. UDAF 确实有效,因为我可以在evaluate() 方法中打印出数组,但我不知道如何将这些 ..