python spark alternative to explode for very large data


Problem Description

I have a dataframe like this:

df = spark.createDataFrame([(0, ["B","C","D","E"]),(1,["E","A","C"]),(2, ["F","A","E","B"]),(3,["E","G","A"]),(4,["A","C","E","B","D"])], ["id","items"])

which creates a data frame df like this:

+---+-----------------+
| id|            items|
+---+-----------------+
|  0|     [B, C, D, E]|
|  1|        [E, A, C]|
|  2|     [F, A, E, B]|
|  3|        [E, G, A]|
|  4|  [A, C, E, B, D]|
+---+-----------------+

I want to get a result like this:

+---+-----+
|all|count|
+---+-----+
|  F|    1|
|  E|    5|
|  B|    3|
|  D|    2|
|  C|    3|
|  A|    4|
|  G|    1|
+---+-----+

Which essentially just finds all distinct elements in df["items"] and counts their frequency. If my data was of a more manageable size, I would just do this:

from pyspark.sql.functions import explode

# one row per list element, then count occurrences of each element
all_items = df.select(explode("items").alias("all"))
result = all_items.groupBy("all").count()
result.show()

But because my data has millions of rows and thousands of elements in each list, this is not an option. I was thinking of doing this row by row, so that I only work with 2 lists at a time. Because most elements are frequently repeated in many rows (but the list in each row is a set), this approach should solve my problem. But the problem is, I don't really know how to do this in Spark, as I've only just started learning it. Could anyone help, please?

Recommended Answer

What you need to do is reduce the size of your partitions going into the explode. There are 2 options to do this. First, if your input data is splittable you can decrease the size of spark.sql.files.maxPartitionBytes so Spark reads smaller splits. The other option would be to repartition before the explode.
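A minimal sketch of the first option follows. Note this config only affects DataFrames read from splittable files, not an in-memory DataFrame built with createDataFrame as in the toy example above; the 16MB value and the input path are purely illustrative assumptions.

# Set before reading the input so Spark produces smaller file splits.
# Value is in bytes; 16MB is only an illustrative starting point.
spark.conf.set("spark.sql.files.maxPartitionBytes", 16 * 1024 * 1024)
df = spark.read.parquet("/path/to/input")  # hypothetical input path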

The default value of maxPartitionBytes is 128MB, so Spark will attempt to read your data in 128MB chunks. If the data is not splittable then it'll read the full file into a single partition in which case you'll need to do a repartition instead.
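A sketch of the repartition route, assuming the DataFrame and session from the question; the target of 1000 partitions is an illustrative guess to tune against your data volume.

from pyspark.sql.functions import explode

# Spread the rows across more, smaller partitions before exploding.
df = df.repartition(1000)  # 1000 is illustrative, not a recommendation
all_items = df.select(explode("items").alias("all"))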

In your case, since you're doing an explode, say it's a 100x increase with 128MB per partition going in, you're ending up with 12GB+ per partition coming out!

The other thing you may need to consider is your shuffle partitions since you're doing an aggregation. So again, you may need to increase the partitioning for the aggregation after the explode by setting spark.sql.shuffle.partitions to a higher value than the default 200. You can use the Spark UI to look at your shuffle stage and see how much data each task is reading in and adjust accordingly.
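Putting it together, a sketch of what the full job might look like; both the 2000 shuffle partitions and the 1000-way repartition are assumed illustrative numbers to adjust based on what the Spark UI shows.

from pyspark.sql.functions import explode

# More shuffle partitions for the groupBy after the explode
# (default is 200; 2000 is only an illustrative starting point).
spark.conf.set("spark.sql.shuffle.partitions", 2000)

result = (
    df.repartition(1000)                      # smaller partitions going into the explode
      .select(explode("items").alias("all"))  # one row per element
      .groupBy("all")
      .count()
)
result.show()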

I discuss this and other tuning suggestions in the talk I just gave at Spark Summit Europe.

