Convert Array to DenseVector in Spark DataFrame using Java


Question

I am running Spark 2.3. I want to convert the column features in the following DataFrame from ArrayType to a DenseVector. I am using Spark in Java.

+---+--------------------+
| id|            features|
+---+--------------------+
|  0|[4.191401, -1.793...|
| 10|[-0.5674514, -1.3...|
| 20|[0.735613, -0.026...|
| 30|[-0.030161237, 0....|
| 40|[-0.038345724, -0...|
+---+--------------------+

root
 |-- id: integer (nullable = false)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = false)

I have written the following UDF but it doesn't seem to be working:

private static UDF1 toVector = new UDF1<Float[], Vector>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Vector call(Float[] t1) throws Exception {
        double[] DoubleArray = new double[t1.length];
        for (int i = 0; i < t1.length; i++) {
            DoubleArray[i] = (double) t1[i];
        }
        Vector vector = (org.apache.spark.mllib.linalg.Vector) Vectors.dense(DoubleArray);
        return vector;
    }
};

I wish to extract the following features as a vector so that I can perform clustering on it.

I am also registering the UDF and then proceeding on to call it as follows:

spark.udf().register("toVector", (UserDefinedAggregateFunction) toVector);
df3 = df3.withColumn("featuresnew", callUDF("toVector", df3.col("features")));
df3.show();  

On running this snippet I am facing the following error:


ReadProcessData$1 cannot be cast to org.apache.spark.sql.expressions.UserDefinedAggregateFunction


Answer

The problem lies in how you are registering the udf in Spark. You should not cast it to UserDefinedAggregateFunction, which is not a udf but a udaf, used for aggregations. Instead, what you should do is:

spark.udf().register("toVector", toVector, new VectorUDT());

Then, to use the registered function:

df3.withColumn("featuresnew", callUDF("toVector", df3.col("features")));

The udf itself should be slightly adjusted as follows (here Vectors and VectorUDT should come from the org.apache.spark.ml.linalg package, which is the one used by the DataFrame-based API):

UDF1 toVector = new UDF1<Seq<Float>, Vector>() {

  @Override
  public Vector call(Seq<Float> t1) throws Exception {
    // Spark hands an array column to a Java UDF as a Scala Seq, not a Java array
    List<Float> floats = scala.collection.JavaConversions.seqAsJavaList(t1);
    double[] values = new double[floats.size()];
    for (int i = 0; i < floats.size(); i++) {
      values[i] = floats.get(i);
    }
    return Vectors.dense(values);
  }
};
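The boxed-Float to primitive-double conversion inside the udf can be exercised on its own, without a Spark session. A minimal sketch (the class and helper names are made up for illustration):

```java
import java.util.Arrays;
import java.util.List;

public class FloatToDouble {

    // Mirrors the loop in the udf: unbox each Float and widen it to double,
    // producing the primitive array that Vectors.dense(...) expects.
    static double[] toDoubleArray(List<Float> floats) {
        double[] values = new double[floats.size()];
        for (int i = 0; i < values.length; i++) {
            values[i] = floats.get(i); // auto-unboxing + float-to-double widening
        }
        return values;
    }

    public static void main(String[] args) {
        List<Float> row = Arrays.asList(1.5f, -2.25f, 0.0f);
        // 1.5, -2.25 and 0.0 are exact binary fractions, so widening is lossless here
        System.out.println(Arrays.toString(toDoubleArray(row))); // prints [1.5, -2.25, 0.0]
    }
}
```

Note that arbitrary float values (such as those in the DataFrame above) are not exact binary fractions, so the widened doubles will carry extra trailing digits; this is expected and harmless for clustering.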

Note that in Spark 2.3+ you can create a Scala-style udf that can be invoked directly. From this answer:

UserDefinedFunction toVector = udf(
  (Seq<Float> array) -> /* udf code or method to call */, new VectorUDT()
);

df3.withColumn("featuresnew", toVector.apply(col("features")));
