Saving a spark dataframe in multiple parts without repartitioning

Problem Description

I want to query my database and store only the top 1 million records sorted by some column, say column1. I have tried two approaches:

  1. I load parquet files from HDFS into a dataframe, apply the SQL query to it, and then save the complete dataframe (10 million records) as text files on HDFS.

df = sqlContext.sql("SELECT * FROM table order by column1")  # full sorted result, ~10 million records
df.rdd.saveAsTextFile("<path>")                               # written in parallel, one output file per partition

I then read the text files back and fetch the top 1 million records from them, as sketched below.
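
A hypothetical sketch of this read-back step (the original question does not show it); it assumes the part files under "<path>" preserve the sorted order in which they were written:

# Hypothetical read-back step -- not shown in the original question.
lines = sc.textFile("<path>")        # read the saved text files back as an RDD of strings
top_million = lines.take(1000000)    # fetch the first 1 million records to the driver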

  2. I limit the SQL query to 1 million records.

df = sqlContext.sql("SELECT * FROM table order by column1 LIMIT 1000000")  # LIMIT collapses the result into a single partition
df.rdd.saveAsTextFile("<path>")

But the second approach is much slower. I found that in the second case the dataframe returned by the SQL query (df) contains only 1 partition, so it is written in a single task. Repartitioning the dataframe (see the sketch below) improved the performance in the second case, but it was still slower than the first case.
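
A hedged sketch of that repartitioning workaround (the partition count of 100 is an arbitrary example value; note that the shuffle does not preserve the global sort order across the output files):

df = sqlContext.sql("SELECT * FROM table order by column1 LIMIT 1000000")
df.repartition(100).rdd.saveAsTextFile("<path>")  # 100 partitions is an arbitrary example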

Can anybody please suggest an approach to save the dataframe faster in case 2, or any other approach that achieves the same task?

Recommended Answer

Assuming that column1 is numeric, one possible solution is to estimate the distribution and filter instead of sort and limit. Assuming your data looks like this:

from pyspark.mllib.random import RandomRDDs

# 1,000,000 standard-normal values in 10 partitions (seed 323), turned into a single-column dataframe
df = RandomRDDs.normalRDD(sc, 1000000, 10, 323).map(lambda x: (x, )).toDF(["x"])

and you want to take the top n records (1,000 in this example):

n = 1000 # Number of records to take, let's say 1000

you can compute the percentile rank corresponding to the fraction of records you want to keep:

q = 100 * (1 - n / float(df.count()))  # with n = 1000 and 1,000,000 rows this gives q = 99.9

Estimate the respective quantile:

import numpy as np
from pyspark.sql.functions import col

# Approximate the q-th percentile of column "x" from a 5% sample (without replacement),
# collected to the driver
threshold = np.percentile(
    df.sample(False, 0.05).select("x").rdd.flatMap(lambda x: x).collect(),
    [q]
)[0]

result = df.where(col("x") > threshold)  # keep only the rows above the estimated cutoff
result.write.format(...).save(...)

This doesn't shuffle at all and keeps the initial record distribution, but it doesn't guarantee an exact number of records and requires an additional action with the associated network IO. You can check how close the result comes to n as sketched below.
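
A small optional check, not part of the original answer: count the filtered rows to see how far the result is from the requested n. The count is itself an extra action over the data.

# Optional sanity check -- not in the original answer.
actual = result.count()  # extra action: scans the filtered data
print(actual, "records kept, requested n =", n)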
