Spark MLLib Kmeans from dataframe, and back again
Question
I aim to apply a kmeans clustering algorithm to a very large data set using Spark (1.3.1) MLLib. I have called the data from HDFS using a hiveContext from Spark, and would eventually like to put it back there that way - in this format:
|I.D  |cluster |
================
|546  |2       |
|6534 |4       |
|236  |5       |
|875  |2       |
I have run the following code, where "data" is a dataframe of doubles, with an ID in the first column.
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1), s.getDouble(2))).cache()
val clusters = KMeans.train(parsedData, 3, 20)
This runs successfully; I'm now stuck mapping the clusters back to their respective IDs, in a dataframe as described above. I can convert the predictions to a dataframe with:
sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()
But that's as far as I've got. This post is on the right track, and this post is, I think, asking a similar question to mine.
I suspect the labeledPoint library is needed. Any comments or answers would be appreciated, cheers.
Just found this in the Spark user list, looks promising.
Answer
I understand that you want to get a DataFrame at the end. I see two possible solutions. I'd say that choosing between them is a matter of taste.
It's very easy to obtain pairs of ids and clusters in the form of an RDD:
val idPointRDD = data.rdd.map(s => (s.getInt(0), Vectors.dense(s.getDouble(1),s.getDouble(2)))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 3, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)
Then you create a DataFrame from that:
val idCluster = idClusterRDD.toDF("id", "cluster")
It works because map doesn't change the order of the data in the RDD, which is why you can just zip the ids with the results of the prediction.
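That order-preservation argument can be sanity-checked with plain Scala collections, no Spark runtime needed. The (id, point) pairs and the stand-in predict function below are made up for illustration; they play the role of idPointRDD and clusters.predict:

```scala
object ZipSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical (id, point) pairs, standing in for idPointRDD.
    val idPoint = Seq((546, (1.0, 2.0)), (6534, (3.0, 4.0)), (236, (5.0, 6.0)))

    // Stand-in for clusters.predict: any deterministic point -> cluster-id map.
    def predict(p: (Double, Double)): Int = (p._1 + p._2).toInt % 3

    // map preserves element order, so zipping ids with predictions
    // pairs each id with the prediction for its own point.
    val predictions = idPoint.map(_._2).map(predict)
    val idCluster = idPoint.map(_._1).zip(predictions)
    println(idCluster) // List((546,0), (6534,1), (236,2))
  }
}
```

The same guarantee is what the RDD version above relies on: two maps over the same collection visit elements in the same order, so zip lines them up correctly.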
The second method involves using the clusters.predict method as a UDF:
val bcClusters = sc.broadcast(clusters)
def predict(x: Double, y: Double): Int = {
  bcClusters.value.predict(Vectors.dense(x, y))
}
sqlContext.udf.register("predict", predict _)
Now we can use it to add predictions to the data:
val idCluster = data.selectExpr("id", "predict(x, y) as cluster")
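The closure-over-broadcast pattern behind that UDF can be sketched in plain Scala without a Spark runtime. The two centers and the nearest-center rule below are hypothetical stand-ins for bcClusters.value.predict; real KMeans prediction likewise returns the index of the nearest center by squared Euclidean distance:

```scala
object UdfSketch {
  // Hypothetical cluster centers, standing in for the broadcast KMeansModel.
  val centers = Seq((0.0, 0.0), (10.0, 10.0))

  // Same shape as the UDF above: close over the model state and
  // map a point's coordinates to the index of its nearest center.
  def predict(x: Double, y: Double): Int =
    centers.zipWithIndex.minBy { case ((cx, cy), _) =>
      math.pow(x - cx, 2) + math.pow(y - cy, 2)
    }._2

  def main(args: Array[String]): Unit = {
    println(predict(1.0, 1.0)) // 0: nearest to (0.0, 0.0)
    println(predict(9.0, 8.0)) // 1: nearest to (10.0, 10.0)
  }
}
```

In Spark the broadcast matters because the closure is shipped to every executor; broadcasting the model once avoids re-serializing it with each task.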
Keep in mind that the Spark API doesn't allow UDF deregistration. This means that the closure data will be kept in memory.
-
Using clusters.predict without broadcasting? It won't work in a distributed setup. Actually, it will work; I was confused by the implementation of predict for RDDs, which uses broadcast.
-
sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()
toArray collects all the data in the driver. This means that in distributed mode you will be copying the cluster ids into one node.