Replace groupByKey() with reduceByKey()

Problem description

This is a follow up question from here. I am trying to implement k-means based on this implementation. It works great, but I would like to replace groupByKey() with reduceByKey(), but I am not sure how (I am not worried about performance now). Here is the relevant minified code:

val data = sc.textFile("dense.txt").map(
        t => (t.split("#")(0), parseVector(t.split("#")(1)))).cache()

val read_mean_centroids = sc.textFile("centroids.txt").map(
        t => (t.split("#")(0), parseVector(t.split("#")(1))))
var centroids = read_mean_centroids.takeSample(false, K, 42).map(x => x._2)
do {
    var closest = read_mean_centroids.map(p => (closestPoint(p._2, centroids), p._2))
    var pointsGroup = closest.groupByKey() // <-- THE VICTIM :)
    var newCentroids = pointsGroup.mapValues(ps => average(ps.toSeq)).collectAsMap()
    ..

Note that println(newCentroids) gives:

Map(23 -> (-6.269305E-4, -0.0011746404, -4.08004E-5), 8 -> (-5.108732E-4, 7.336348E-4, -3.707591E-4), 17 -> (-0.0016383086, -0.0016974678, 1.45..

and println(closest) gives:

MapPartitionsRDD[6] at map at kmeans.scala:75

Relevant question: Using reduceByKey in Apache Spark (Scala).

Some documentation (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions):

def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

Merge the values for each key using an associative reduce function.

def reduceByKey(func: (V, V) ⇒ V, numPartitions: Int): RDD[(K, V)]

Merge the values for each key using an associative reduce function.

def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]

Merge the values for each key using an associative reduce function.

def groupByKey(): RDD[(K, Iterable[V])]

Group the values for each key in the RDD into a single sequence.
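
As a quick editorial illustration of the practical difference (a toy example, not part of the original question; the names pairs, viaGroup and viaReduce are illustrative): groupByKey() materializes every value for a key before you reduce them yourself, while reduceByKey() merges values with an associative function as it goes.

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// groupByKey(): collect all values per key, then reduce them locally
val viaGroup = pairs.groupByKey().mapValues(_.sum)   // RDD[(String, Int)]

// reduceByKey(): merge values per key with an associative function
val viaReduce = pairs.reduceByKey(_ + _)             // RDD[(String, Int)]

Both yield ("a", 4) and ("b", 2); the difference is that reduceByKey() can combine values map-side before shuffling.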

Answer

You could use an aggregateByKey() (a bit more natural than reduceByKey()) like this to compute newCentroids:

val newCentroids = closest.aggregateByKey((Vector.zeros(dim), 0L))( // zero value: (zero vector, count 0)
  (agg, v) => (agg._1 += v, agg._2 + 1L),                  // seqOp: add the point into the running sum, bump the count
  (agg1, agg2) => (agg1._1 += agg2._1, agg1._2 + agg2._2)  // combOp: merge partial sums and counts
).mapValues(agg => agg._1/agg._2).collectAsMap             // centroid = sum / count

For this to work you will need to compute the dimensionality of your data, i.e. dim, but you only need to do this once. You could probably use something like val dim = data.first._2.length.
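
Since the question asked for reduceByKey() specifically, here is a hedged sketch of the same per-key average written with it, carrying an explicit count alongside the running sum. It reuses closest and newCentroids from the question and assumes the parsed vector type supports element-wise + and division by a Double, as the code above implies.

val newCentroids = closest
  .mapValues(v => (v, 1L))                                         // pair each point with a count of 1
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) } // sum vectors and counts per key
  .mapValues { case (sum, n) => sum / n.toDouble }                 // mean = sum / count
  .collectAsMap()

Unlike the aggregateByKey() variant, this version never builds a zero vector, so it does not need dim at all.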
