Porting a multi-threaded compute-intensive job to Spark


Question


I am working on code which uses an executor service to parallelize tasks (think machine-learning computations done over a small dataset over and over again). My goal is to execute some code as fast as possible, multiple times, and store the results somewhere (the total number of executions will be on the order of 100M runs at least).


The logic looks something like this (it's a simplified example):

dbconn = new dbconn() // This is reused by all threads
for a in listOfSize1000:
   for b in listOfSize10:
      for c in listOfSize2:
         taskcompletionexecutorservice.submit(new runner(a, b, c, dbconn))


At the end, taskcompletionexecutorservice.take() is called and I store the Result from the Future in a DB. But this approach does not really scale after a point.
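
For context, a minimal sketch of the collection loop being described, written in Scala against java.util.concurrent; runner, Result, dbconn, and the list of (a, b, c) combinations are the question's own hypothetical pieces, and runner is assumed to implement Callable[Result]:

import java.util.concurrent.{Executors, ExecutorCompletionService}

val pool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors)
val ecs  = new ExecutorCompletionService[Result](pool)

// Submit every (a, b, c) combination, then drain results as they finish.
val n = combinations.map { case (a, b, c) => ecs.submit(new runner(a, b, c, conn)) }.size
for (_ <- 1 to n) {
  conn.store(ecs.take().get()) // take() blocks until the next Future completes
}
pool.shutdown()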


So this is what I am doing right now in Spark (which is a brutal hack, but I am looking for suggestions on how best to structure this):

sparkContext.parallelize(listOfSize1000).filter(a -> {
   dbconn = new dbconn() // Cannot init it outside parallelize since it's not serializable
   for b in listOfSize10:
      for c in listOfSize2:
         Result r = new runner(a, b, c, dbconn)
         dbconn.store(r)

    return true // It serves no purpose.
}).count();


This approach looks inefficient to me since it is not truly parallelizing on the smallest unit of work, although the job runs alright. Also, count is not really doing anything for me; I added it only to trigger the execution. It was inspired by the Pi-computing example here: http://spark.apache.org/examples.html


So, any suggestions on how I can better structure my Spark runner so that I can use the Spark executors efficiently?

Answer


So there are a few things we can do to make this code more Spark-like. The first is that you are using filter and count, but not really using the result of either; the function foreach is probably closer to what you want.
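
As a rough illustration of that point, here is a minimal Scala sketch of the foreach version, reusing the question's hypothetical runner and dbconn classes (only parallelize and foreach are actual Spark API):

// foreach exists purely for its side effects, so the dummy
// `return true` and the trailing count() both go away.
sparkContext.parallelize(listOfSize1000).foreach { a =>
  val conn = new dbconn() // still one connection per element; see below
  for (b <- listOfSize10; c <- listOfSize2) {
    conn.store(new runner(a, b, c, conn).call()) // assuming runner is a Callable
  }
}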


That being said, you are creating a DB connection to store the result, and we can look at doing this in a few ways. One is: if the DB is really what you want to use for storage, you could use foreachPartition or mapPartitionsWithIndex to create only one connection per partition and then do a count() (which I know is a bit ugly, but foreachWith is deprecated as of 1.0.0). You could also just do a simple map and then save your results to one of the many supported output formats (e.g. saveAsSequenceFile).
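
Again only a sketch under the same assumptions (hypothetical runner/dbconn classes, an illustrative output path); foreachPartition, flatMap, and saveAsSequenceFile are real Spark APIs:

// One connection per partition rather than per element.
sparkContext.parallelize(listOfSize1000).foreachPartition { iter =>
  val conn = new dbconn() // shared by every element in this partition
  for (a <- iter; b <- listOfSize10; c <- listOfSize2) {
    conn.store(new runner(a, b, c, conn).call())
  }
  conn.close() // assuming dbconn exposes a close()
}

// Or skip the DB entirely: compute results with flatMap and let Spark write
// them out. saveAsSequenceFile needs keys/values convertible to Writables,
// so plain strings are used here for simplicity.
val results = sparkContext.parallelize(listOfSize1000).flatMap { a =>
  for (b <- listOfSize10; c <- listOfSize2)
    yield ((a, b, c).toString, new runner(a, b, c, null).call().toString) // null: no DB handle needed in this variant
}
results.saveAsSequenceFile("hdfs:///path/to/output") // illustrative path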

