Filtering Scala's Parallel Collections with early abort when desired number of results found


Problem description


Given a very large instance of collection.parallel.mutable.ParHashMap (or any other parallel collection), how can one abort a filtering parallel scan once a given number of matches, say 50, has been found?


Attempting to accumulate intermediate matches in a thread-safe "external" data structure or keeping an external AtomicInteger with result count seems to be 2 to 3 times slower on 4 cores than using a regular collection.mutable.HashMap and pegging a single core at 100%.


I am aware that find or exists on Par* collections do abort "on the inside". Is there a way to generalize this to find more than one result ?


Here's the code, which still seems to be 2 to 3 times slower on the ParHashMap with ~79,000 entries, and which also has the problem of stuffing more than maxResults results into the results CHM (probably because a thread can be preempted after incrementAndGet but before break, allowing other threads to add more elements). Update: it seems the slowdown is due to worker threads contending on counter.incrementAndGet(), which of course defeats the purpose of the whole parallel scan :-(

def find(filter: Node => Boolean, maxResults: Int): Iterable[Node] =
{
  import java.util.concurrent.ConcurrentHashMap
  import java.util.concurrent.atomic.AtomicInteger
  import util.control.Breaks._

  val counter = new AtomicInteger(0)                         // shared match count (contended across workers)
  val results = new ConcurrentHashMap[Key, Node](maxResults) // thread-safe accumulator

  breakable
  {
    for ((key, node) <- parHashMap if filter(node))
    {
      results.put(key, node)                // put happens before the count check, hence the overfill
      val total = counter.incrementAndGet()
      if (total > maxResults) break         // abort the scan once the cap is exceeded
    }
  }

  results.values.toArray(new Array[Node](results.size))
}
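As an aside (a sketch, not from the original post; `findCapped` and `entries` are illustrative names), the overfill can be avoided by claiming a slot on the counter *before* inserting, so at most maxResults puts ever happen. It is shown over a plain sequential Iterable to sidestep break's interaction with parallel worker threads, and it does not address the contention slowdown:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.Breaks._
import scala.collection.JavaConverters._ // jdk.CollectionConverters on Scala 2.13+

// Sketch: reserve a slot atomically before the put, so the map can
// never hold more than maxResults entries.
def findCapped[K, V](entries: Iterable[(K, V)],
                     filter: V => Boolean,
                     maxResults: Int): Iterable[V] = {
  val counter = new AtomicInteger(0)
  val results = new ConcurrentHashMap[K, V](maxResults)
  breakable {
    for ((key, node) <- entries if filter(node)) {
      if (counter.incrementAndGet() > maxResults) break // no slot left: stop scanning
      results.put(key, node) // safe: only slot winners ever reach the put
    }
  }
  results.values.asScala
}
```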

Recommended answer


I would first do a parallel scan in which the variable maxResults would be thread-local. This would find up to (maxResults * numberOfThreads) results.


Then I would do a single-threaded scan to reduce it to maxResults.
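The two-phase idea might be sketched like this (my sketch, not the answerer's code: it uses explicit per-chunk counters rather than a ThreadLocal, `findLimited` is an illustrative name, and on Scala 2.13+ it assumes the scala-parallel-collections module for `.par`):

```scala
// On Scala 2.13+, `.par` additionally needs:
// import scala.collection.parallel.CollectionConverters._

def findLimited[K, V](m: Map[K, V], p: V => Boolean,
                      maxResults: Int, numThreads: Int): Seq[(K, V)] = {
  // Phase 1: one chunk per worker; each chunk keeps its *own* counter,
  // so there is no shared atomic to contend on, and each worker
  // aborts early on its own once it has maxResults local matches.
  val chunkSize = math.max(1, m.size / numThreads)
  val partial = m.toSeq.grouped(chunkSize).toSeq.par.map { chunk =>
    val local = scala.collection.mutable.ListBuffer.empty[(K, V)]
    val it = chunk.iterator
    while (it.hasNext && local.size < maxResults) { // local early abort
      val kv = it.next()
      if (p(kv._2)) local += kv
    }
    local.toList
  }
  // Phase 2: single-threaded reduction of up to
  // (maxResults * numberOfChunks) candidates down to maxResults.
  partial.flatten.seq.take(maxResults)
}
```

Each worker can overshoot the global cap only within its own chunk, which is what makes the cheap, contention-free phase 1 possible; phase 2 pays a small sequential cost to trim the combined candidates.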

