Convert Array to DenseVector in Spark DataFrame using Java
Question
I am running Spark 2.3. I want to convert the column features in the following DataFrame from ArrayType to a DenseVector. I am using Spark in Java.
+---+--------------------+
| id| features|
+---+--------------------+
| 0|[4.191401, -1.793...|
| 10|[-0.5674514, -1.3...|
| 20|[0.735613, -0.026...|
| 30|[-0.030161237, 0....|
| 40|[-0.038345724, -0...|
+---+--------------------+
root
|-- id: integer (nullable = false)
|-- features: array (nullable = true)
| |-- element: float (containsNull = false)
I have written the following UDF, but it doesn't seem to be working:
private static UDF1 toVector = new UDF1<Float[], Vector>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Vector call(Float[] t1) throws Exception {
        double[] DoubleArray = new double[t1.length];
        for (int i = 0; i < t1.length; i++) {
            DoubleArray[i] = (double) t1[i];
        }
        Vector vector = (org.apache.spark.mllib.linalg.Vector) Vectors.dense(DoubleArray);
        return vector;
    }
};
I wish to extract the following features as a vector so that I can perform clustering on it.
I am also registering the UDF and then calling it as follows:
spark.udf().register("toVector", (UserDefinedAggregateFunction) toVector);
df3 = df3.withColumn("featuresnew", callUDF("toVector", df3.col("features")));
df3.show();
On running this snippet I am facing the following error:
ReadProcessData$1 cannot be cast to org.apache.spark.sql.expressions.UserDefinedAggregateFunction
Answer
The problem lies in how you are registering the udf in Spark. You should not use UserDefinedAggregateFunction, which is not a udf but a udaf used for aggregations. Instead, what you should do is:
spark.udf().register("toVector", toVector, new VectorUDT());
Then, to use the registered function:
df3.withColumn("featuresnew", callUDF("toVector", df3.col("features")));
The udf itself should be slightly adjusted as follows:
UDF1<Seq<Float>, Vector> toVector = new UDF1<Seq<Float>, Vector>() {
    @Override
    public Vector call(Seq<Float> t1) throws Exception {
        List<Float> L = scala.collection.JavaConversions.seqAsJavaList(t1);
        double[] DoubleArray = new double[t1.length()];
        for (int i = 0; i < L.size(); i++) {
            DoubleArray[i] = L.get(i);
        }
        return Vectors.dense(DoubleArray);
    }
};
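Putting the pieces together, here is a minimal end-to-end sketch of the same approach; the class name ArrayToVectorExample and the method withVectorColumn are purely illustrative, and it assumes the DataFrame-based org.apache.spark.ml.linalg classes and an existing SparkSession:

import static org.apache.spark.sql.functions.callUDF;

import java.util.List;

import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF1;

import scala.collection.JavaConversions;
import scala.collection.Seq;

public class ArrayToVectorExample {

    // The array<float> column reaches the UDF as a Scala Seq<Float>, not a Float[].
    private static final UDF1<Seq<Float>, Vector> toVector = t1 -> {
        List<Float> floats = JavaConversions.seqAsJavaList(t1);
        double[] values = new double[floats.size()];
        for (int i = 0; i < values.length; i++) {
            values[i] = floats.get(i); // Float unboxes and widens to double
        }
        return Vectors.dense(values);
    };

    public static Dataset<Row> withVectorColumn(SparkSession spark, Dataset<Row> df) {
        // Register as a plain UDF (not a UDAF) and declare the vector return type explicitly.
        spark.udf().register("toVector", toVector, new VectorUDT());
        return df.withColumn("featuresnew", callUDF("toVector", df.col("features")));
    }
}

Registering the function by name keeps it usable from both the DataFrame API (via callUDF) and Spark SQL, and the resulting featuresnew column has the vector type expected by the ml clustering estimators.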
Note that in Spark 2.3+ you can create a Scala-style udf that can be invoked directly. From this answer:
UserDefinedFunction toVector = udf(
    (Seq<Float> array) -> /* udf code or method to call */, new VectorUDT()
);

df3.withColumn("featuresnew", toVector.apply(col("features")));
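For completeness, a sketch of how that placeholder might be filled in, reusing the same Seq<Float> to double[] conversion as the UDF1 above; toDense is a hypothetical helper method inside your own class, and the snippet assumes static imports of org.apache.spark.sql.functions.udf and org.apache.spark.sql.functions.col:

// Hypothetical helper inside your class, holding the same conversion logic as the UDF1 above.
private static Vector toDense(Seq<Float> array) {
    List<Float> floats = scala.collection.JavaConversions.seqAsJavaList(array);
    double[] values = new double[floats.size()];
    for (int i = 0; i < values.length; i++) {
        values[i] = floats.get(i);
    }
    return Vectors.dense(values);
}

// Inline, unregistered udf built from the helper.
UserDefinedFunction toVector = udf((Seq<Float> array) -> toDense(array), new VectorUDT());

df3 = df3.withColumn("featuresnew", toVector.apply(col("features")));

This form skips registration entirely, which is convenient when the udf is only needed from the DataFrame API.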