如何通过数组[序列[字符串]到Apache的火花UDF? (错误:不适用) [英] How to pass Array[Seq[String]] to apache spark udf? (Error: Not Applicable)

查看:203
本文介绍了如何通过数组[序列[字符串]到Apache的火花UDF? (错误:不适用)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在阶下的Apache火花UDF:

  VAL myFunc的UDF = {
  (userBias:浮动,otherBiases:地图[长,浮法]
    userFactors:序号[浮点],语境:序号[字符串])=>
    VAR的结果=的Float.NaN    如果(userFactors!= NULL){
      VAR contexBias = 0F      对于(CC< - 上下文){
       contexBias + = otherBiases(contextMapping(CC))
      }      //结果的定义
      // ...
    }
    结果
}

现在我想传递参数给这个函数,但是我总是得到的消息不适用由于参数背景。我知道,用户自定义函数按行采取的投入,这个功能如果我删除运行背景 ...如何解决这个问题?为什么它不读阵行[SEQ [字符串] ,即从背景?或者,也可以接受通过背景数据帧或类似的东西。

  //背景是数组[序列[字符串]
VAL一个= sc.parallelize(序列((1,2),(3,4)))。toDF(一,B)
VAL上下文= a.collect.map(_。toSeq.map(_。的toString))// userBias(偏见),otherBias(偏见),并userFactors(特征)
//有一个类型列,而userBias ...是DataFrames
myDataframe.select(数据集(*),
                   myFunc的(u​​serBias(偏见),
                          otherBias(偏置),
                          userFactors(特征),
                          上下文)
                   。至于($(NEWCOL)))

更新:

我试过的答案显示的解决方案 zero323 ,但是仍然有一个小问题与背景:数组[序列[字符串] 。特别是,问题是循环于此Array 为(CC< - 上下文){contexBias + = otherBiases(contextMapping(CC))} 。我要传递一个字符串 contextMapping ,而不是序列[字符串]

 高清myFunc的(背景:数组[序号[字符串]])= {UDF
    (userBias:浮动,otherBiases:地图[长,浮法]
     userFactors:序号[浮点])=>
      VAR的结果=的Float.NaN      如果(userFactors!= NULL){
        VAR contexBias = 0F
        对于(CC< - 上下文){
          contexBias + = otherBiases(contextMapping(CC))
        }        //结果估计      }
      结果
  }

现在我把它叫做如下:

  myDataframe.select(数据集(*),
                   myFunc的(上下文)(userBias(偏见),
                                   otherBias(偏置),
                                   userFactors(特征))
           。至于($(NEWCOL)))


解决方案

这是直接传递给UDF的任何参数必须是一个,所以如果你想传递常量数组你必须把它转换为文字列:

 进口org.apache.spark.sql.functions {阵列,点燃}VAL myFunc的:org.apache.spark.sql.UserDefinedFunction =?myFunc的(
  userBias(偏见),
  otherBias(偏置),
  userFactors(特征),
  // org.apache.spark.sql.Column
  阵列(context.map(XS =>阵列(xs.map(亮_):_ *)):_ *)

非 - 对象只能间接地通过封闭传递,例如像这样的:

 高清myFunc的(背景:数组[序号[字符串]])= {UDF
  (userBias:浮动,otherBiases:地图[长,浮点型],userFactors:序号[浮点])=>
    ???
}myFunc的(上下文)(userBias(偏见),otherBias(偏见),userFactors(特征))

I have the following apache spark udf in scala:

val myFunc = udf {
  (userBias: Float, otherBiases: Map[Long, Float],
    userFactors: Seq[Float], context: Seq[String]) => 
    var result = Float.NaN

    if (userFactors != null) {
      var contexBias = 0f

      for (cc <- context) {
       contexBias += otherBiases(contextMapping(cc))
      }

      // definition of result
      // ...
    }
    result
}

Now I want to pass parameters to this function, however I always get the message Not Applicable due to the parameter context. I know that user defined functions take inputs by rows, and this function runs if I delete context... How to solve this issue? Why doesn't it read rows from Array[Seq[String]], i.e. from context? Alternatively, it would be acceptable to passcontext as DataFrame or something similar.

// context is Array[Seq[String]]
val a = sc.parallelize(Seq((1,2),(3,4))).toDF("a", "b")
val context = a.collect.map(_.toSeq.map(_.toString))

// userBias("bias"), otherBias("biases") and userFactors("features")
// have a type Column, while userBias... are DataFrames
myDataframe.select(dataset("*"),
                   myFunc(userBias("bias"),
                          otherBias("biases"),
                          userFactors("features"),
                          context)
                   .as($(newCol)))

UPDATE:

I tried the solution indicated in the answer of zero323, however still there is a small issue with context: Array[Seq[String]]. In particular the problem is with looping over this Array for (cc <- context) { contexBias += otherBiases(contextMapping(cc)) }. I should pass a String to contextMapping, not a Seq[String]:

  def myFunc(context: Array[Seq[String]]) = udf {
    (userBias: Float, otherBiases: Map[Long, Float],
     userFactors: Seq[Float]) =>
      var result = Float.NaN

      if (userFactors != null) {
        var contexBias = 0f
        for (cc <- context) {
          contexBias += otherBiases(contextMapping(cc))
        }

        // estimation of result

      }
      result
  }

Now I call it as follows:

myDataframe.select(dataset("*"),
                   myFunc(context)(userBias("bias"),
                                   otherBias("biases"),
                                   userFactors("features"))
           .as($(newCol)))

解决方案

Any argument that is passed directly to the UDF has to be a Column so if you want to pass constant array you'll have to convert it to column literal:

import org.apache.spark.sql.functions.{array, lit}

val myFunc: org.apache.spark.sql.UserDefinedFunction = ???

myFunc(
  userBias("bias"),
  otherBias("biases"),
  userFactors("features"),
  // org.apache.spark.sql.Column
  array(context.map(xs => array(xs.map(lit _): _*)): _*)  
)

Non-Column objects can be passed only indirectly using closure, for example like this:

def myFunc(context: Array[Seq[String]]) = udf {
  (userBias: Float, otherBiases: Map[Long, Float],  userFactors: Seq[Float]) => 
    ???
}

myFunc(context)(userBias("bias"), otherBias("biases"), userFactors("features"))

这篇关于如何通过数组[序列[字符串]到Apache的火花UDF? (错误:不适用)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆