user-defined-functions相关内容
我有一个场景,我在数据框列中有 XML 数据. +-----------+---------+----------+--------------------+----+-----------+--------+---+------------------+-----------+------------+----+|县|created_at|first_name|id|meta|name_cou
..
有没有办法选择整行作为一列输入到 Pyspark 过滤器 udf 中? 我有一个复杂的过滤函数“my_filter",我想将其应用于整个 DataFrame: my_filter_udf = udf(lambda r: my_filter(r), BooleanType())new_df = df.filter(my_filter_udf(col("*")) 但是 col("*")
..
我有一个包含 100 列和 1000 万条记录的输入表 (I).我想得到一个有 50 列的输出表 (O),这些列来自 I 的列,即有 50 个函数将 I 的列映射到 O 的 50 列,即 o1 = f(i1) , o2 = f(i2, i3) ..., o50 = f(i50, i60, i70). 在 spark sql 中,我可以通过两种方式做到这一点: 逐行解析 I 的整行(例如
..
我有一个返回字符串列表的 udf.这应该不会太难.我在执行 udf 时传入数据类型,因为它返回一个字符串数组:ArrayType(StringType). 现在,不知何故这是行不通的: 我正在操作的数据帧是 df_subsets_concat,看起来像这样: df_subsets_concat.show(3,False) +----------------------+|col1
..
我想在用户设计的函数中使用位于另一个类中的方法,但它不起作用. 我有一个方法: def traitementDataFrameEleve(sc:SparkSession, dfRedis:DataFrame, domainMail:String, dir:String):Boolean ={def loginUDF = udf((sn: String, givenName:String)
..
比如我们用Spark引擎执行下面的SQL,我们需要my_udf(row)返回 Spark 中的分区 ID. 添加jar hdfs:///dir/udf/udf.jar;创建临时函数 my_udf 为 'com.my.MyUDF';从表中选择行,my_udf(row); 我已经知道如何在 MR 引擎中执行的 Hive UDF 中获取 taskId:如何在 hive UDF 中获取 taskID
..
我正在使用 Spark 和 Scala 进行一些数据处理.我将 XML 数据映射到数据帧.我将 Row 作为参数传递给 UDF 并尝试将两个复杂类型的对象提取为列表.Spark 给我以下错误: 线程“main"中的异常java.lang.UnsupportedOperationException:不支持类型org.apache.spark.sql.Row的架构 def testUdf =
..
我目前正在使用 Spark 2.1,并且有一个主脚本调用包含我所有转换方法的辅助模块.换句话说: main.py助手文件 在我的 helper.py 文件的顶部,我有几个自定义 UDF,我已按以下方式定义: def 重新格式化:返回 reformat_logic(s)reformat_udf = udf(reformat, StringType()) 在我将所有 UDF 拆分到帮助文件中之
..
我已经使用 lambda 函数尝试了带参数的 Spark UDF 并注册了它.但是我怎么能创建没有参数和注册器的 udf 我已经试过了我的示例代码预计会显示当前时间 从日期时间导入日期时间从 pyspark.sql.functions 导入 udf def getTime():时间值=日期时间.now()返回时间值udfGateTime=udf(getTime,TimestampType(
..
我正在使用 Spark 2.0,并正在寻找一种在 Scala 中实现以下目标的方法: 需要两个 Data-frame 列值之间的时间戳差异(以毫秒为单位). Value_1 = 06/13/2017 16:44:20.044Value_2 = 06/13/2017 16:44:21.067 两者的数据类型都是时间戳. 注意:将函数 unix_timestamp(Column s)
..
我无法一次将每组数据帧发送给执行程序. 我在 company_model_vals_df dataframe 中有如下数据. -------------------------------------------------------------------------------------|model_id |财政年|财政季度|列 1 |col2 |col3 |col4 |col5
..
我是 spark/scala 的新手.我正在尝试从配置单元表中读取一些数据到 spark 数据框,然后根据某些条件添加一列.这是我的代码: val DF = hiveContext.sql("select * from (select * from test_table where partition_date='2017-11-22') a JOIN (select max(id) asbid
..
作为问题,如何在 hive UDF 中获取 taskID 或 mapperID(类似于 Spark 中的 partitionID)? 解决方案 我自己找到了正确答案,我们可以通过以下方式在hive UDF中获取taskID: public class TestUDF extends GenericUDF {私人文本结果=新文本();私人字符串 tmpStr = "";@覆盖公共无效配置(
..
我尝试使用 countDistinct 函数,根据 DataBrick 的博客.但是,我得到了以下异常: 线程“main"org.apache.spark.sql.AnalysisException 中的异常:未定义函数 countDistinct; 我发现在 Spark 开发人员的邮件列表 他们建议使用 count 和 distinct 函数来获得与 countDistinct 应该产生的结
..
我想编写 Spark UDAF,其中列的类型可以是任何定义了 Scala Numeric 的类型.我在 Internet 上进行了搜索,但只找到了具有具体类型的示例,例如 DoubleType、LongType.这不可能吗?但是如何将 UDAF 与其他数值一起使用? 解决方案 为简单起见,我们假设您要定义自定义 sum.您将为输入类型提供一个 TypeTag 并使用 Scala 反射来定义
..
我在 Spark SQL DataFrame 中有两列,每一列中的每个条目都是一个字符串数组. val ngramDataFrame = Seq((Seq("curious", "bought", "20"), Seq("iwa", "was", "asj"))).toDF("filtered_words", "ngrams_array") 我想合并每一行中的数组以在新列中创建一个数组.我的代
..
我有一个 PySpark DataFrame,其中一列作为一个热编码向量.我想在 groupby 之后通过向量加法聚合不同的一个热编码向量 例如df[userid,action] Row1: ["1234","[1,0,0]] Row2: ["1234", [0 1 0]] 我希望输出为行:["1234", [ 1 1 0]] 所以向量是按 userid 分组的所有向量的总和.> 我
..
我必须根据值列表将列添加到 PySpark 数据框. a= spark.createDataFrame([("Dog", "Cat"), ("Cat", "Dog"), ("Mouse", "Cat")],["Animal", "敌人"]) 我有一个名为 rating 的列表,它是对每只宠物的评级. rating = [5,4,1] 我需要在数据框后面附加一个名为 Rating 的列,这样
..
我正在尝试使用 Apache Spark SQL 在 Java 中创建用户定义的聚合函数 (UDAF),该函数在完成时返回多个数组.我在网上搜索过,但找不到有关如何执行此操作的任何示例或建议. 我能够返回单个数组,但无法弄清楚如何在evaluate() 方法中以正确格式获取数据以返回多个数组. UDAF 确实有效,因为我可以在evaluate() 方法中打印出数组,但我不知道如何将这些
..
来自 R,我习惯于轻松地对列进行操作.有什么简单的方法可以使用我用 Scala 编写的这个函数 def round_tenths_place( un_rounded:Double ) : Double = {val rounded = BigDecimal(un_rounded).setScale(1, BigDecimal.RoundingMode.HALF_UP).toDouble返回四舍五
..