Spark MLLib Kmeans from dataframe, and back again


Problem description

I aim to apply a k-means clustering algorithm to a very large data set using Spark (1.3.1) MLLib. I have pulled the data from HDFS using a HiveContext in Spark, and would eventually like to put it back there the same way, in this format:

    |I.D     |cluster |
    ===================
    |546     |2       |
    |6534    |4       |
    |236     |5       |
    |875     |2       |
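
For reference, the loading step might look something like this — a sketch only; the table name "points" and column names id, x, y are assumptions, not from the original post:

    // Hypothetical Hive table "points" with an integer id and two double features
    val data = hiveContext.sql("SELECT id, x, y FROM points")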

I have run the following code, where "data" is a dataframe of doubles, with an ID in the first column.

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    val parsedData = data.rdd.map(s => Vectors.dense(s.getDouble(1), s.getDouble(2))).cache()
    val clusters = KMeans.train(parsedData, 3, 20)

This runs successfully, but I'm now stuck mapping the clusters back to their respective IDs, in a dataframe as described above. I can convert it to a dataframe with:

    sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()

But that's as far as I've got. This post is on the right track, and this post, I think, is asking a similar question to mine.

I suspect the labeledPoint library is needed. Any comments or answers would be appreciated, cheers.

Just found this in the Spark user list; it looks promising.

Recommended answer

I understand that you want to get a DataFrame at the end. I see two possible solutions. I'd say that choosing between them is a matter of taste.

It's very easy to obtain pairs of ids and clusters in the form of an RDD:

    // Keep (id, features) pairs together so the ids survive the pipeline
    val idPointRDD = data.rdd.map(s => (s.getInt(0), Vectors.dense(s.getDouble(1), s.getDouble(2)))).cache()
    val clusters = KMeans.train(idPointRDD.map(_._2), 3, 20)
    val clustersRDD = clusters.predict(idPointRDD.map(_._2))
    // Safe to zip: map preserves the order of elements within partitions
    val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)

Then you create a DataFrame from that:

    import sqlContext.implicits._ // needed for .toDF on an RDD
    val idCluster = idClusterRDD.toDF("id", "cluster")

It works because map doesn't change the order of the data in the RDD, which is why you can just zip the ids with the results of the prediction.
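
Since the original goal was to put the result back into Hive, a minimal write-back sketch could follow; the table name "id_cluster" is a hypothetical choice, and in Spark 1.3 DataFrame.saveAsTable writes a managed table through the HiveContext:

    // Persist the (id, cluster) DataFrame back as a Hive table
    idCluster.saveAsTable("id_cluster")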

The second method involves using the clusters.predict method as a UDF:

    // Broadcast the trained model so every executor can call predict locally
    val bcClusters = sc.broadcast(clusters)
    def predict(x: Double, y: Double): Int = {
        bcClusters.value.predict(Vectors.dense(x, y))
    }
    sqlContext.udf.register("predict", predict _)

Now we can use it to add predictions to the data:

    val idCluster = data.selectExpr("id", "predict(x, y) as cluster")

Keep in mind that the Spark API doesn't allow UDF deregistration. This means that the closure data will be kept in memory.
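
If that's a concern, here is a sketch of a variant that skips the registry entirely, using the udf column function (available since Spark 1.3). The broadcast model is still captured in the closure, but the function is an ordinary value that can go out of scope:

    import org.apache.spark.sql.functions.udf

    // Unregistered column function; nothing is pinned in the UDF registry
    val predictUdf = udf((x: Double, y: Double) => bcClusters.value.predict(Vectors.dense(x, y)))
    val idCluster = data.select(data("id"), predictUdf(data("x"), data("y")).as("cluster"))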

A couple of notes on the other approaches:

  • Using clusters.predict without broadcasting

It won't work in the distributed setup. (Actually, it will work; I was confused by the implementation of predict for RDDs, which uses broadcast.)

  • sc.makeRDD(clusters.predict(parsedData).toArray()).toDF()

toArray collects all the data on the driver. This means that in distributed mode you will be copying the cluster ids onto one node.
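
For contrast, the scalable form keeps everything as RDDs, as in the first solution above, so nothing funnels through the driver:

    // predict over an RDD returns an RDD[Int] that stays distributed
    val predictions = clusters.predict(parsedData)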

