How much overhead is usual while distributing processing?

Problem Description

For impatient readers: this is a work in progress, and I am asking for help during the process. Please do not judge the tools by my temporary data, as it can change while I try to get better results.

We are in the middle of the decision process on the architecture for a tool to analyse the output from co-simulations.

As part of that process I was asked to write a benchmark tool, and get data on the speeds of several distributed processing frameworks.

The frameworks I tested are Apache Spark, Apache Flink, and Hazelcast Jet, plus plain Java as a comparison baseline.

The test case I used was a simple one: "here is a list of Pojos, where one field in the Pojo is a double value; find the smallest (min) value."

Simple, straightforward and hopefully highly comparable.

Three of the four tests use a simple comparator; the fourth (Flink) uses a reducer that is basically identical to the comparator. The analysing functions look like this:

Java: double min = logs.stream().min(new LogPojo.Comp()).get().getValue();

Spark: JavaRDD<LogPojo> logData = sc.parallelize(logs, num_partitions);
double min = logData.min(new LogPojo.Comp()).getValue();

Hazel: IStreamList<LogPojo> iLogs = jet.getList("logs");
iLogs.addAll(logs);
double min = iLogs.stream().min(new LogPojo.Comp()).get().getValue();

Flink: DataSet<LogPojo> logSet = env.fromCollection(logs);
double min = logSet.reduce(new LogReducer()).collect().get(0).getValue();
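
(For reference, the Pojo and comparator referenced above are roughly of the following shape. This is a sketch reconstructed from the calls in the snippets; the actual class in the benchmark repo may carry more fields.)

    import java.io.Serializable;
    import java.util.Comparator;

    // Minimal sketch reconstructed from the calls above; the real class may have more fields.
    public class LogPojo implements Serializable {
        private final double value;

        public LogPojo(double value) { this.value = value; }

        public double getValue() { return value; }

        // The comparator passed to min(); Serializable so Spark/Jet can ship it to workers.
        public static class Comp implements Comparator<LogPojo>, Serializable {
            @Override
            public int compare(LogPojo a, LogPojo b) {
                return Double.compare(a.getValue(), b.getValue());
            }
        }
    }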

I tested this extensively, varying the size of the test list as well as the allocated resources. And the results blew my mind. The BEST results can be seen below (all numbers in ms, 1 mio pojos, 10 tests each):

  • instances: how long it took to declare and initiate the framework instance
  • list: how long it took to parse/transfer the List into the framework's "list"
  • process: how long it took to process the data to retrieve the min
  • overall: from start to end of each test (a sketch of how these phases can be timed follows this list)
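
(A minimal sketch of how these phases can be timed, using plain System.nanoTime around each step. The harness below is an assumption for illustration, not code from the linked repo; conf, logs and num_partitions follow the naming of the snippets above.)

    import java.util.List;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // Hypothetical harness for one Spark run; LogPojo is the sketch shown earlier.
    public class SparkTiming {
        static void timeOneRun(SparkConf conf, List<LogPojo> logs, int num_partitions) {
            long t0 = System.nanoTime();
            JavaSparkContext sc = new JavaSparkContext(conf);                 // "instances"
            long t1 = System.nanoTime();
            JavaRDD<LogPojo> logData = sc.parallelize(logs, num_partitions);  // "list"
            long t2 = System.nanoTime();
            double min = logData.min(new LogPojo.Comp()).getValue();          // "process"
            long t3 = System.nanoTime();
            sc.close();
            System.out.printf("min=%f instances=%d list=%d process=%d overall=%d (ms)%n",
                    min, (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000,
                    (t3 - t2) / 1_000_000, (t3 - t0) / 1_000_000);
        }
    }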

Results:

java:
Instances: 
List: 
Process: 37, 24, 16, 17, 16, 16, 16, 16, 16, 16, 
Overall: 111, 24, 16, 17, 16, 16, 16, 16, 16, 16, 

spark:
Instances: 2065, 89, 62, 69, 58, 49, 56, 47, 41, 52, 
List: 166, 5, 1, 1, 2, 1, 0, 0, 0, 0, 
Process: 2668, 2768, 1936, 2016, 1950, 1936, 2105, 2674, 1913, 1882, 
Overall: 4943, 2871, 2011, 2094, 2020, 1998, 2172, 2728, 1961, 1943, 

hazel:
Instances: 6347, 2891, 2817, 3106, 2636, 2936, 3018, 2969, 2622, 2799, 
List: 1984, 1656, 1470, 1505, 1524, 1429, 1512, 1445, 1394, 1427, 
Process: 4348, 3809, 3655, 3751, 3927, 3887, 3592, 3810, 3673, 3769, 
Overall: 12850, 8373, 7959, 8384, 8110, 8265, 8133, 8239, 7701, 8007

flink:
Instances: 45, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
List: 92, 35, 16, 13, 17, 15, 19, 11, 19, 24, 
Process: 34292, 20822, 20870, 19268, 17780, 17390, 17124, 19628, 17487, 18586, 
Overall: 34435, 20857, 20886, 19281, 17797, 17405, 17143, 19639, 17506, 18610, 

The most interesting parts are:

  • the best results all came from purely local tests (one instance)
  • any test that used distributed mechanics (additional nodes and so on) was still slower by an order of magnitude (e.g. Spark was 2.5x slower when distributed).

Now don't get me wrong, it's basic logic that distributed processing has to be slower per core than mono-threaded processing.

But 2 orders of magnitude EVEN if used on a mono-thread? And 3 orders of magnitude if distributed? Can someone see the mistake I apparently made in all 3 distributed processes? I expected some factor < 10, so killing it with more hardware would be an option.

So is there some way to reduce the overhead of those frameworks to, hmm maybe x9 instead of x999?

I know, I know, the test data I use is much too small, but even when scaling it up I haven't seen any reduction in overhead vs. performance. And it's roughly the size of the batches of data we need to analyse (0.1M - 1M objects/s per simulation). So your help in finding my error is welcome. :D

UPDATE Spark:

After some more thorough testing on Spark, I'm still not impressed. The setup was as follows:

java client on one machine with 64 cores and 480 GB RAM; job master and 7 slaves on a separate rack, 32 cores and 20 GB each

    1 mio objects, 256 tasks, 64 cpus local[*]
    java:
      Instances: 
      List: 
      Process: 622, 448, 68, 45, 22, 32, 15, 27, 22, 29, 
    spark:
      Instances: 4865, 186, 160, 133, 121, 112, 106, 78, 121, 106, 
      List: 310, 2, 2, 1, 2, 4, 2, 1, 2, 1, 
      Process: 8190, 4433, 4200, 4073, 4201, 4092, 3822, 3852, 3921, 4051, 

    10 mio objects, 256 tasks, 64 cpus local[*]
    java:
      Instances: 
      List: 
      Process: 2329, 144, 50, 65, 75, 70, 69, 66, 66, 66, 
    spark:
      Instances: 20345, 
      List: 258, 2, 1, 1, 1, 4, 1, 1, 1, 1, 
      Process: 55671, 49629, 48612, 48090, 47897, 47857, 48319, 48274, 48199, 47516

    1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 1+1 Spark machines (different rack)
    java:
      Instances: 
      List: 
      Process: 748, 376, 70, 31, 69, 64, 46, 17, 50, 53, 
    spark:
      Instances: 4631, 
      List: 249, 1, 2, 2, 3, 3, 1, 1, 2, 1, 
      Process: 12273, 7471, 6314, 6083, 6228, 6158, 5990, 5953, 5981, 5972

    1 mio objects, 5.2k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
    java:
      Instances: 
      List: 
      Process: 820, 494, 66, 29, 5, 30, 29, 43, 45, 21, 
    spark:
      Instances: 4513, 
      List: 254, 2, 2, 2, 2, 4, 2, 2, 1, 1, 
      Process: 17007, 6545, 7174, 7040, 6356, 6502, 6482, 6348, 7067, 6335

    10 mio objects, 52k tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack)
    java Process: 3037, 78, 48, 45, 53, 73, 72, 73, 74, 64, 
    spark:
      Instances: 20181, 
      List: 264, 3, 2, 2, 1, 4, 2, 2, 1, 1, 
      Process: 77830, 67563, 65389, 63321, 61416, 63007, 64760, 63341, 63440, 65320

    1 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i =0 to 100
    java Process: 722, 631, 62, 26, 25, 42, 26, 11, 12, 29, 40, 16, 14, 23, 29, 18, 14, 11, 71, 76, 37, 52, 32, 15, 51, 54, 19, 74, 62, 54, 7, 60, 37, 54, 42, 3, 7, 60, 33, 44, 50, 50, 39, 34, 34, 13, 47, 63, 46, 4, 52, 20, 19, 24, 6, 53, 4, 3, 68, 10, 59, 52, 48, 3, 48, 37, 5, 38, 10, 47, 4, 53, 36, 41, 31, 57, 7, 64, 45, 33, 14, 53, 5, 41, 40, 48, 4, 60, 49, 37, 20, 34, 53, 4, 58, 36, 12, 35, 35, 4, 
    spark:
      Instances: 4612, 
      List: 279, 3, 2, 1, 2, 5, 3, 1, 1, 1, 2, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 0, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 
      Process: 16300, 6577, 5802, 6136, 5389, 5912, 5885, 6157, 5440, 6199, 5902, 6299, 5919, 6066, 5803, 6612, 6120, 6775, 6585, 6146, 6860, 6955, 6661, 6819, 6868, 6700, 7140, 7532, 7077, 7180, 7360, 7526, 7770, 7877, 8048, 7678, 8260, 8131, 7837, 7526, 8261, 8404, 8431, 8340, 9000, 8825, 8624, 9340, 9418, 8677, 8480, 8678, 9003, 9036, 8912, 9235, 9401, 9577, 9808, 9485, 9955, 10029, 9506, 9387, 9794, 9998, 9580, 9963, 9273, 9411, 10113, 10004, 10369, 9880, 10532, 10815, 11039, 10717, 11251, 11475, 10854, 11468, 11530, 11488, 11077, 11245, 10936, 11274, 11233, 11409, 11527, 11897, 11743, 11786, 11086, 11782, 12001, 11795, 12075, 12422

    2 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 0 to 30
    java Process: 1759, 82, 31, 18, 30, 41, 47, 28, 27, 13, 28, 46, 5, 72, 50, 81, 66, 44, 36, 72, 44, 11, 65, 67, 58, 47, 54, 60, 46, 34, 
    spark:
      Instances: 6316, 
      List: 265, 3, 3, 2, 2, 6, 1, 2, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 1, 1, 5, 1, 1, 1, 1, 2, 1, 1, 1, 
      Process: 24084, 13041, 11451, 11274, 10919, 10972, 10677, 11048, 10659, 10984, 10820, 11057, 11355, 10874, 10896, 11725, 11580, 11149, 11823, 11799, 12414, 11265, 11617, 11762, 11561, 12443, 12448, 11809, 11928, 12095

    10 mio objects, 224*i tasks, 64 cpus local, 32 cpus each on 7+1 Spark machines (different rack), i = 5 to 30
    java Process: 1753, 91, 57, 71, 86, 86, 151, 80, 85, 72, 61, 78, 80, 87, 93, 89, 70, 83, 166, 84, 87, 94, 90, 88, 92, 89, 196, 96, 97, 89, 
    spark:
      Instances: 21192, 
      List: 282, 3, 2, 2, 3, 4, 2, 2, 1, 0, 1, 1, 1, 1, 2, 2, 1, 1, 1, 1, 1, 2, 2, 1, 1, 1, 
      Process: 60552, 53960, 53166, 54971, 52827, 54196, 51153, 52626, 54138, 51134, 52427, 53618, 50815, 50807, 52398, 54315, 54411, 51176, 53843, 54736, 55313, 56267, 50837, 54996, 52230, 52845

Results: no matter how much hardware was thrown at it, and no matter how the tasks were clustered, Spark took 5-6 seconds per million Pojos in the list.

Java, on the other hand, dealt with the same amount in 5-30 ms, so basically a factor of 200-1,000.

Does anyone have a suggestion how to "speed up" Spark for such a simple job?

UPDATE Hazelcast:

Now I'm starting to get impressed. While I'm still fighting with some weird problems, at least Hazelcast Jet seems to understand that local data can be processed locally if possible, with only 100% overhead (factor x2), which is completely acceptable.

10 mio objects

java:
   Instances: 
   List: 68987, 
   Process: 2288, 99, 54, 52, 54, 64, 89, 83, 79, 88, 
hazel:
  Instances: 6136, 
  List: 97225, 
  Process: 1112, 375, 131, 123, 148, 131, 137, 119, 176, 140

UPDATE Flink:

Dropped it from the benchmarking for now, as it's causing too much trouble without giving great results.

The whole benchmark can be found under: https://github.com/anderschbe/clusterbench

The cluster setup for Spark uses spark-2.1.0-bin-hadoop2.7 as it comes out of the box, with one minor change in spark_env.sh: SPARK_NO_DAEMONIZE=true

The only change necessary to make it work on the cluster is replacing "localhost" in SparcProc line 25 with "spark://I_cant_give_you_my_cluster_IP.doo".

Answer

When you are calculating something in a cluster framework like Spark or Flink, the framework:

  • serializes your code
  • sends a resource request
  • sends your code over the network
  • schedules the execution
  • waits for the results

As you can see, many steps are performed - not only your calculation! Distributed computing makes sense if you:

  • can split your computation into many small tasks that can be done in parallel
  • have too much data to process on one machine, or processing on one machine would be too slow - because of disk I/O, some other factor very specific to your project, or a computation that needs many CPUs, usually more than one machine - but then the computation on each chunk of data must take a long time

Try to calculate the max occurrence count of words in a 10 GB text file - then Spark and Flink will beat single-node Java.
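
(For illustration, such a job could look roughly like this in the Spark Java API. The file path and app name are placeholders, and the master URL is expected to come from spark-submit; the point is only that the per-partition work is heavy enough to dwarf the scheduling overhead.)

    import java.util.Arrays;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    // Sketch of a "find the most frequent word in a huge text file" job.
    public class WordCountSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("wordcount-sketch"); // master set by spark-submit
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                JavaRDD<String> lines = sc.textFile("hdfs:///data/ten-gb-of-text.txt"); // placeholder path

                JavaPairRDD<String, Integer> counts = lines
                        .flatMap(line -> Arrays.asList(line.split("\\s+")).iterator())
                        .mapToPair(word -> new Tuple2<>(word, 1))
                        .reduceByKey(Integer::sum);

                // Keep the (word, count) pair with the highest count.
                Tuple2<String, Integer> top = counts.reduce((a, b) -> a._2() >= b._2() ? a : b);
                System.out.println(top._1() + " occurs " + top._2() + " times");
            }
        }
    }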

Sometimes the user's code may slow distributed computing down. Typical mistakes:

  • the user writes lambdas in classes with many references - all the other classes get serialized too, and serialization takes a lot of time
  • the tasks are not really parallel - they have to wait for each other or have to process a large part of the data
  • data skewness - objects may have an improper hashCode implementation, so the HashPartitioner sends all the data to one partition = one node
  • an incorrect number of partitions - you can add 1000 more machines, but if you still have only 4 partitions you can achieve at most 4 parallel tasks at a time (see the sketch after this list)
  • too much network communication - in your case it's not a problem, but sometimes users do a lot of joins and reduces
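
(To illustrate the partition-count point with the question's own variable names - a sketch against the standard Spark Java API, not the author's benchmark code:)

    // Sketch reusing the question's variables (sc, logs, LogPojo.Comp).
    // With too few partitions, extra machines sit idle: parallelism is capped by the partition count.
    JavaRDD<LogPojo> logData = sc.parallelize(logs, 4);       // at most 4 tasks can run at once
    JavaRDD<LogPojo> spread  = logData.repartition(7 * 32);   // roughly one partition per core on 7 x 32-core workers
    double min = spread.min(new LogPojo.Comp()).getValue();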

EDIT after question edit: In your example, Spark runs on local - which means only 1 thread! Use at least local[*] or another cluster manager. You get the overheads listed in this answer on top of only one thread.
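
(The difference is literally one string in the configuration - a sketch, with the app name assumed:)

    import org.apache.spark.SparkConf;

    // "local"    = a single worker thread, i.e. no parallelism at all
    // "local[*]" = one worker thread per available core
    SparkConf conf = new SparkConf()
            .setAppName("clusterbench")   // app name assumed, not taken from the repo
            .setMaster("local[*]");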
