Evaluating Spark DataFrame in loop slows down with every iteration, all work done by controller


Question

I am trying to use a Spark cluster (running on AWS EMR) to link groups of items that have common elements in them. Essentially, I have groups with some elements and if some of the elements are in multiple groups, I want to make one group that contains elements from all of those groups.

I know about the GraphX library and I tried to use the graphframes package (ConnectedComponents algorithm) to solve this task, but it seems that the graphframes package is not yet mature enough and is very wasteful with resources... Running it on my data set (circa 60 GB), it just runs out of memory no matter how much I tune the Spark parameters, how I partition and re-partition my data, or how big a cluster I create (the graph IS huge).
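For reference, the graphframes attempt would look roughly like the sketch below. This is a minimal, hedged reconstruction rather than the code from the question: the way items and groups are modelled as vertices and edges, the checkpoint directory path, and the SparkSession named spark are my assumptions.

from pyspark.sql.functions import col
from graphframes import GraphFrame

# connectedComponents() requires a checkpoint directory; the path here is illustrative
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

# model both items and group names as vertices, and each membership row as an edge
vertices = item_links.select(col('item').alias('id')) \
                     .union(item_links.select(col('group_name').alias('id'))) \
                     .distinct()
edges = item_links.select(col('item').alias('src'),
                          col('group_name').alias('dst'))

g = GraphFrame(vertices, edges)
components = g.connectedComponents()  # one component id per set of transitively linked groups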

So I wrote my own code to accomplish the task. The code works and it solves my problem, but it slows down with every iteration. Since it can sometimes take around 10 iterations to finish, it can run for a very long time, and I could not figure out what the problem is.

I start with a table (DataFrame) item_links that has two columns: item and group_name. Items are unique within each group, but not within this table. One item can be in multiple groups. If two items each have a row with the same group name, they both belong to the same group.
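To make the structure concrete, here is a tiny, made-up item_links (my illustrative example, not data from the question), assuming a SparkSession named spark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# item 'X' appears in both group_A and group_B, so those two groups should end up merged
item_links = spark.createDataFrame(
    [('X', 'group_A'), ('Y', 'group_A'),
     ('X', 'group_B'), ('Z', 'group_B'),
     ('W', 'group_C')],
    ['item', 'group_name'])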

I first group by item and, for every item, find the smallest of all the group names of the groups it belongs to. I append this information as an extra column to the original DataFrame. Then I create a new DataFrame by grouping by group name and finding the smallest value of this new column within every group. I join this DataFrame with my original table on the group name and replace the group name column with the minimum value from that new column. The idea is that if a group contains an item that also belongs to some smaller (alphabetically earlier) group, the whole group gets merged into that smaller group. For example, if item X is in groups A and B, all rows of group B end up relabelled as group A. With every iteration this links groups that are indirectly connected through longer and longer chains of items.

The code that I am running looks like this:

print(" Merging groups that have common items...")

n_partitions = 32

merge_level = 0

min_new_group = "min_new_group_{}".format(merge_level)

# For every item identify the (alphabetically) first group in which this item was found
# and add a new column min_new_group with that information for every item.
first_group = item_links \
                    .groupBy('item') \
                    .agg( min('group_name').alias(min_new_group) ) \
                    .withColumnRenamed('item', 'item_id') \
                    .coalesce(n_partitions) \
                    .cache()

item_links = item_links \
                .join( first_group,
                       item_links['item'] == first_group['item_id'] ) \
                .drop(first_group['item_id']) \
                .coalesce(n_partitions) \
                .cache()

first_group.unpersist()

# In every group find the (alphabetically) smallest min_new_group value.
# If the group contains a item that was in some other group,
# this value will be different than the current group_name.
merged_groups = item_links \
                    .groupBy('group_name') \
                    .agg(
                        min(col(min_new_group)).alias('merged_group')
                    ) \
                    .withColumnRenamed('group_name', 'group_to_merge') \
                    .coalesce(n_partitions) \
                    .cache()

# Replace the group_name column with the lowest group that any of the item in the group had.
item_links = item_links \
                .join( merged_groups,
                       item_links['group_name'] == merged_groups['group_to_merge'] ) \
                .drop(item_links['group_name']) \
                .drop(merged_groups['group_to_merge']) \
                .drop(item_links[min_new_group]) \
                .withColumnRenamed('merged_group', 'group_name') \
                .coalesce(n_partitions) \
                .cache()

# Count the number of common items found
common_items_count = merged_groups.filter(col('merged_group') != col('group_to_merge')).count()

merged_groups.unpersist()

# just some debug output
print("  level {}: found {} common items".format(merge_level, common_items_count))

# As long as the number of groups keep decreasing (groups are merged together), repeat the operation.
while (common_items_count > 0):
    merge_level += 1

    min_new_group = "min_new_group_{}".format(merge_level)

    # for every item find new minimal group...
    first_group = item_links \
                        .groupBy('item') \
                        .agg(
                            min(col('group_name')).alias(min_new_group)
                        ) \
                        .withColumnRenamed('item', 'item_id') \
                        .coalesce(n_partitions) \
                        .cache() 

    item_links = item_links \
                    .join( first_group,
                           item_links['item'] == first_group['item_id'] ) \
                    .drop(first_group['item_id']) \
                    .coalesce(n_partitions) \
                    .cache()

    first_group.unpersist()

    # find groups that have items from other groups...
    merged_groups = item_links \
                        .groupBy(col('group_name')) \
                        .agg(
                            min(col(min_new_group)).alias('merged_group')
                        ) \
                        .withColumnRenamed('group_name', 'group_to_merge') \
                        .coalesce(n_partitions) \
                        .cache()

    # merge the groups with items from other groups...
    item_links = item_links \
                    .join( merged_groups,
                           item_links['group_name'] == merged_groups['group_to_merge'] ) \
                    .drop(item_links['group_name']) \
                    .drop(merged_groups['group_to_merge']) \
                    .drop(item_links[min_new_group]) \
                    .withColumnRenamed('merged_group', 'group_name') \
                    .coalesce(n_partitions) \
                    .cache()

    common_items_count = merged_groups.filter(col('merged_group') != col('group_to_merge')).count()

    merged_groups.unpersist()

    print("  level {}: found {} common items".format(merge_level, common_items_count))

As I said, it works, but the problem is that it slows down with every iteration. Iterations 1-3 run in just a few seconds or minutes. Iteration 5 runs around 20-40 minutes. Iteration 6 sometimes doesn't even finish, because the controller runs out of memory (14 GB for the controller, around 140 GB of RAM for the entire cluster with 20 CPU cores... the test data is around 30 GB).

When I monitor the cluster in Ganglia, I see that after every iteration the workers perform less and less work while the controller performs more and more. The network traffic also drops to zero. Memory usage is rather stable after the initial phase.

I have read a lot about repartitioning, tuning Spark parameters and the background of shuffle operations, and I did my best to optimize everything, but I have no idea what is going on here. Below is the load of my cluster nodes (yellow for the controller node) over time while the code above is running.

Answer

A simple reproduction scenario:

import time
from pyspark import SparkContext

sc = SparkContext()

def push_and_pop(rdd):
    # two transformations: moves the head element to the tail
    first = rdd.first()
    return rdd.filter(
        lambda obj: obj != first
    ).union(
        sc.parallelize([first])
    )

def serialize_and_deserialize(rdd):
    # perform a collect() action to evaluate the rdd and create a new instance
    return sc.parallelize(rdd.collect())

def do_test(serialize=False):
    rdd = sc.parallelize(range(1000))
    for i in range(25):
        t0 = time.time()
        rdd = push_and_pop(rdd)
        if serialize:
            rdd = serialize_and_deserialize(rdd)
        print "%.3f" % (time.time() - t0)

do_test()

Shows major slowdown during the 25 iterations:

0.597 0.117 0.186 0.234 0.288 0.309 0.386 0.439 0.507 0.529 0.553 0.586 0.710 0.728 0.779 0.896 0.866 0.881 0.956 1.049 1.069 1.061 1.149 1.189 1.201

(The first iteration is relatively slow because of initialization effects, the second iteration is quick, and every subsequent iteration is slower.)

The cause seems to be the growing chain of lazy transformations. We can test the hypothesis by rolling up the RDD using an action.

do_test(True)

0.897 0.256 0.233 0.229 0.220 0.238 0.234 0.252 0.240 0.267 0.260 0.250 0.244 0.266 0.295 0.464 0.292 0.348 0.320 0.258 0.250 0.201 0.197 0.243 0.230

The collect() / parallelize() round trip adds about 0.1 seconds to each iteration, but it completely eliminates the incremental slowdown.
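The same idea carries over to the DataFrame loop in the question: truncating the lineage once per iteration keeps the query plan from growing. Below is a hedged sketch of two common ways to do this inside the loop (not part of the original answer; checkpoint() requires Spark 2.1+ and a checkpoint directory, and the directory path and the SparkSession name spark are assumptions):

# Option 1: reliable checkpoint - materializes the data and drops the accumulated plan
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  # illustrative path
item_links = item_links.checkpoint()

# Option 2: round-trip through an RDD, analogous to collect()/parallelize() above,
# but without pulling all the data to the driver
item_links = spark.createDataFrame(item_links.rdd, item_links.schema)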
