How to convert RDD[Row] to RDD[Vector]

Question

I'm trying to implement k-means in Scala. I created an RDD like this:

val df = sc.parallelize(data).groupByKey().collect().map((chunk)=> {
  sc.parallelize(chunk._2.toSeq).toDF()
})

val examples = df.map(dataframe =>{
  dataframe.selectExpr(
    "avg(time) as avg_time",
    "variance(size) as var_size",
    "variance(time) as var_time",
    "count(size) as examples"
  ).rdd
})

val rdd_final=examples.reduce(_ union _)

val kmeans= new KMeans()
val model = kmeans.run(rdd_final)

With this code I get an error:

type mismatch;
[error]  found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error]  required:org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]

So I tried a cast:

val rdd_final_Vector = rdd_final.map{x:Row => x.getAs[org.apache.spark.mllib.linalg.Vector](0)}

val model = kmeans.run(rdd_final_Vector)

But then I get another error:

java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.mllib.linalg.Vector

So I'm looking for a way to do that conversion, but I can't find any method.

Any ideas?

Best regards

Recommended answer

There are at least a couple of issues here:

  1. No, you really cannot cast a Row to a Vector: a Row is a collection of potentially disparate types understood by Spark SQL. A Vector is not a native Spark SQL type. Here each column of the Row holds a Double (or a Long for the count), which is why getAs[Vector](0) fails with a ClassCastException.
  2. There seems to be a mismatch between the content of your SQL statement and what you are attempting to achieve with KMeans: the SQL is performing aggregations, but KMeans expects a series of individual data points, each in the form of a Vector (which encapsulates an Array[Double]). So why are you supplying averages, variances and counts to a KMeans operation?

Addressing just #1 here: you will need to do something along the lines of:

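import org.apache.spark.mllib.linalg.Vectors
// Row.getDouble takes a column index; getAs[Double] looks a column up by name ("colname" is a placeholder)
val doubVals = rdd_final.map { row => row.getAs[Double]("colname") }
val vector = Vectors.dense(doubVals.collect())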

Then you have a properly encapsulated Array[Double] (within a Vector). Note that KMeans.run takes an RDD[Vector], so to feed KMeans each Row has to become its own Vector rather than all values being collected into a single one.
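
Mapping each Row to its own Vector, so that the result is an RDD[Vector], could look roughly like the sketch below. It assumes every column of the Row is numeric, as with the avg/variance/count columns in the question. The names RowsToVectors, rowToVector, toVectors and clusterRows are hypothetical and not part of Spark, and the parameter k has to be chosen by the caller.

```scala
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Hypothetical helper object, for illustration only.
object RowsToVectors {

  // Turn one Row into one dense Vector.
  // Assumption: every column is numeric (e.g. Double from avg/variance, Long from count).
  // Nulls and non-numeric values are rejected rather than silently replaced.
  def rowToVector(row: Row): Vector =
    Vectors.dense(
      row.toSeq.map {
        case n: java.lang.Number => n.doubleValue()
        case other =>
          throw new IllegalArgumentException(s"Non-numeric value in row: $other")
      }.toArray
    )

  // RDD[Row] => RDD[Vector], one Vector per Row.
  def toVectors(rows: RDD[Row]): RDD[Vector] =
    rows.map(rowToVector)

  // Run MLlib KMeans on the converted rows; k is supplied by the caller.
  def clusterRows(rows: RDD[Row], k: Int): KMeansModel =
    new KMeans().setK(k).run(toVectors(rows).cache())
}
```

With the names from the question, this would be called as RowsToVectors.clusterRows(rdd_final, k) for some chosen k. Whether clustering these aggregates is meaningful is the separate concern raised in point #2.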
