user-defined-functions相关内容

在 pyspark 中使用 Scala 类作为 UDF

我正在尝试在使用 Apache Spark 时将一些计算从 Python 卸载到 Scala.我想使用 Java 的类接口来使用持久变量,就像这样(这是一个基于我更复杂用例的无意义的 MWE): 包mwe导入 org.apache.spark.sql.api.java.UDF1class SomeFun 扩展 UDF1[Int, Int] {私有变量道具:Int = 0覆盖定义调用(输入:Int ..

使用 UDF 处理多列时堆栈溢出

我有一个 DataFrame,其中包含许多 str 类型的列,我想对所有这些列应用一个函数,而不重命名它们的名称或添加更多列,我尝试使用 for-in 循环执行 withColumn(参见下面的示例),但通常当我运行代码时,它会显示 Stack Overflow(它很少工作),这个 DataFrame 一点也不大,它只有 ~15000 条记录. # df 是一个 DataFrame定义小写(字符 ..

如何将常量值传递给 Python UDF?

我在想是否有可能创建一个 UDF 接收两个参数一个 Column 和另一个变量 (Object,字典,或任何其他类型),然后执行一些操作并返回结果. 实际上,我试图这样做,但我得到了一个例外.所以,我想知道有没有什么办法可以避免这个问题. df = sqlContext.createDataFrame([("Bonsanto", 20, 2000.00),(“哈耶克", 60, 3000. ..

Spark/Scala 在多列上使用相同的函数重复调用 withColumn()

我目前有一些代码,其中我通过多个 .withColumn 链将相同的过程重复应用于多个 DataFrame 列,并且我想创建一个函数来简化该过程.就我而言,我正在查找按键聚合的列的累积总和: val newDF = oldDF.withColumn("cumA", sum("A").over(Window.partitionBy("ID").orderBy("time"))).withColum ..

Apache Spark -- 将 UDF 的结果分配给多个数据框列

我正在使用 pyspark,使用 spark-csv 将大型 csv 文件加载到数据框中,作为预处理步骤,我需要对其中一列(包含json 字符串).这将返回 X 个值,每个值都需要存储在自己单独的列中. 该功能将在 UDF 中实现.但是,我不确定如何从该 UDF 返回值列表并将这些值输入到各个列中.下面是一个简单的例子: (...)从 pyspark.sql.functions 导入 ud ..

如何使用 JAVA 在 Spark DataFrame 上调用 UDF?

与此处的类似问题,但没有足够的分数在那里发表评论. 根据Spark最新documentation 可以以两种不同的方式使用 udf,一种使用 SQL,另一种使用 DataFrame.我找到了多个关于如何在 sql 中使用 udf 的示例,但没有找到任何关于如何直接在 DataFrame 上使用 udf 的示例. o.p. 提供的解决方案在上面链接的问题上使用 __callUDF()__ ..

从 Spark DataFrame 中的单个列派生多个列

我有一个 DF,它有一个巨大的可解析元数据作为 Dataframe 中的单个字符串列,我们称之为 DFA,使用 ColmnA. 我想通过函数 ClassXYZ = Func1(ColmnA) 将该列 ColmnA 分成多列.该函数返回一个 ClassXYZ 类,其中包含多个变量,现在必须将这些变量中的每一个映射到新的 Column,例如 ColmnA1、ColmnA2 等. 我将如何通 ..

Spark UDAF 与 ArrayType 作为 bufferSchema 性能问题

我正在研究一个返回元素数组的 UDAF. 每次更新的输入是索引和值的元组. UDAF 的作用是对同一索引下的所有值求和. 示例: 对于 input(index,value) : (2,1), (3,1), (2,3) 应该返回 (0,0,4,1,...,0) 逻辑工作正常,但我的更新方法有问题,我的实现仅每行更新 1 个单元格,但该方法中的最后一个分配实际上复制 ..

关于如何在 Scala 中使用随机值向现有 DataFrame 添加新列

我有一个带有镶木地板文件的数据框,我必须添加一个包含一些随机数据的新列,但我需要这些随机数据彼此不同.这是我的实际代码,spark 的当前版本是 1.5.1-cdh-5.5.2: val mydf = sqlContext.read.parquet("some.parquet")//mydf.count()//63385686mydf.cacheval r = scala.util.Random ..