Count occurrences of each item in a Scala parallel collection


Question

My question is very similar to Count occurrences of each element in a List[List[T]] in Scala, except that I would like to have an efficient solution involving parallel collections.

Specifically, I have a large (~10^7) vector vec of short (~10) lists of Ints, and I would like to get for each Int x the number of times x occurs, for example as a Map[Int,Int]. The number of distinct integers is of the order 10^6.

Since the machine this needs to be done on has a fair amount of memory (150GB) and number of cores (>100) it seems like parallel collections would be a good choice for this. Is the code below a good approach?

val flatpvec = vec.par.flatten
val flatvec = flatpvec.seq
val unique = flatpvec.distinct
val counts = unique map (x => (x -> flatvec.count(_ == x)))
counts.toMap

Or are there better solutions? In case you are wondering about the .seq conversion: for some reason the following code doesn't seem to terminate, even for small examples:

val flatpvec = vec.par.flatten
val unique = flatpvec.distinct
val counts = unique map (x => (x -> flatpvec.count(_ == x)))
counts.toMap

Answer

This does something. aggregate is like fold except you also combine the results of the sequential folds.
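
For example, here is a minimal sketch of the same pattern on a tiny input, using immutable maps (assuming Scala 2.12-style parallel collections; on 2.13+ the scala-parallel-collections module is needed for .par). It is only meant to illustrate the two functions aggregate takes:

// Sketch only: seqop folds each chunk's elements into an accumulator map,
// and combop merges the per-chunk maps produced by the parallel tasks.
val sample = Vector(List(1, 2), List(2, 3)).par
val counts = sample.aggregate(Map.empty[Int, Int])(
  (acc, xs) => xs.foldLeft(acc)((m, x) => m.updated(x, m.getOrElse(x, 0) + 1)),               // seqop
  (m, n) => n.foldLeft(m) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0) + v) }   // combop
)
// counts == Map(1 -> 1, 2 -> 2, 3 -> 1)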

Update: It's not surprising that there is overhead in .par.groupBy, but I was surprised by the constant factor. By these numbers, you would never count that way. Also, I had to bump the memory way up.

The interesting technique used to build the result map is described in this paper linked from the overview. (It cleverly saves the intermediate results and then coalesces them in parallel at the end.)

But copying around the intermediate results of the groupBy turns out to be expensive, if all you really want is a count.

The numbers compare sequential groupBy, parallel groupBy, and finally aggregate.

apm@mara:~/tmp$ scalacm countints.scala ; scalam -J-Xms8g -J-Xmx8g -J-Xss1m countints.Test
GroupBy: Starting...
Finished in 12695
GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Par GroupBy: Starting...
Finished in 51481
Par GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Aggregate: Starting...
Finished in 2672
Aggregate: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))

Nothing magical in the test code.

import collection.GenTraversableOnce
import collection.concurrent.TrieMap
import collection.mutable

import concurrent.duration._

trait Timed {
  def now = System.nanoTime
  def timed[A](op: =>A): A =  {
    val start = now
    val res = op
    val end = now
    val lapsed = (end - start).nanos.toMillis
    Console println s"Finished in $lapsed"
    res
  }
  def showtime(title: String, op: =>GenTraversableOnce[(Int,Int)]): Unit = {
    Console println s"$title: Starting..."
    val res = timed(op)
    //val showable = res.toIterator.min   //(res.toIterator take 10).toList
    val showable = res.toList.sorted take 10
    Console println s"$title: $showable"
  }
}

It generates some random data for interest.

object Test extends App with Timed {

  val upto = math.pow(10,6).toInt
  val ran = new java.util.Random
  val ten = (1 to 10).toList
  val maxSamples = 1000
  // samples of ten random numbers in the desired range
  val samples = (1 to maxSamples).toList map (_ => ten map (_ => ran nextInt upto))
  // pick a sample at random
  def anyten = samples(ran nextInt maxSamples)
  def mag = 7
  val data: Vector[List[Int]] = Vector.fill(math.pow(10,mag).toInt)(anyten)

The sequential operation and the combining operation of aggregate are invoked from a task, and the result is assigned to a volatile var.

  def z: mutable.Map[Int,Int] = mutable.Map.empty[Int,Int]
  def so(m: mutable.Map[Int,Int], is: List[Int]) = {
    for (i <- is) {
      val v = m.getOrElse(i, 0)
      m(i) = v + 1
    }
    m
  }
  def co(m: mutable.Map[Int,Int], n: mutable.Map[Int,Int]) = {
    for ((i, count) <- n) {
      val v = m.getOrElse(i, 0)
      m(i) = v + count
    }
    m
  }
  showtime("GroupBy", data.flatten groupBy identity map { case (k, vs) => (k, vs.size) })
  showtime("Par GroupBy", data.flatten.par groupBy identity map { case (k, vs) => (k, vs.size) })
  showtime("Aggregate", data.par.aggregate(z)(so, co))
}
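
The TrieMap import in the test code above is not actually used; one way such a concurrent map could be applied to this problem is a single shared counter updated from a parallel foreach. The following is a rough, hypothetical sketch (not from the answer, and not benchmarked), with a retry loop so concurrent increments are not lost:

import scala.annotation.tailrec
import scala.collection.concurrent.TrieMap

// Hypothetical alternative: count into one shared lock-free map instead of
// merging per-chunk maps. putIfAbsent/replace retry until the update sticks.
def countShared(data: Vector[List[Int]]): TrieMap[Int, Int] = {
  val m = TrieMap.empty[Int, Int]
  @tailrec def bump(i: Int): Unit = m.putIfAbsent(i, 1) match {
    case None      => ()                                         // first occurrence inserted
    case Some(old) => if (!m.replace(i, old, old + 1)) bump(i)   // lost the race, retry
  }
  data.par.foreach(_.foreach(bump))
  m
}

Whether this is competitive with aggregate would have to be measured on the same data; it is shown only as a differently shaped solution.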
