Saving a Spark dataframe in multiple parts without repartitioning


Problem description

I want to query my database and store only the top 1 million records, sorted by some column, say column1. I have tried two approaches:

  1. I load parquet files from HDFS into a dataframe, apply an SQL query to it, and then save the complete dataframe (10 million records) as text files to HDFS.

df = sqlContext.sql("SELECT * FROM table order by column1")
df.rdd.saveAsTextFile("<path>")

I then read the text files back and fetch the first 1 million records from them.
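
One way this read-back step might look (a sketch only; it assumes the part files under <path> are read back in the order they were written, so the leading lines correspond to the lowest column1 values):

# Hypothetical read-back step: take() brings the first 1,000,000 lines of the
# saved (already sorted) output back to the driver as a list of strings.
top_lines = sc.textFile("<path>").take(1000000)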

  2. I limit the SQL query to 1 million records.

df = sqlContext.sql("SELECT * FROM table order by column1 LIMIT 1000000")
df.rdd.saveAsTextFile("<path>")

But the second approach is much slower. I found that in the second case the dataframe returned by the SQL query (df) contains only 1 partition, so it is written in a single task. Repartitioning the dataframe improved the performance in the second case (sketched below), but it was still slower than the first.
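
A minimal sketch of that repartitioning workaround (the partition count of 200 is purely illustrative, not something from the original post):

df = sqlContext.sql("SELECT * FROM table order by column1 LIMIT 1000000")
# Spread the single-partition result over many partitions so the write runs in
# parallel. Note that repartition() shuffles rows, so the global sort order of
# the output files is no longer guaranteed.
df = df.repartition(200)
df.rdd.saveAsTextFile("<path>")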

Can anybody please suggest an approach to save the dataframe faster in case 2, or any other approach to achieve the same task?

Recommended answer

Assuming that column1 is numeric, one possible solution is to estimate the distribution and filter instead of sort and limit. Assuming your data looks like this:

from pyspark.mllib.random import RandomRDDs

# 1,000,000 standard-normal values in 10 partitions (seed 323), turned into a
# dataframe with a single column "x".
df = RandomRDDs.normalRDD(sc, 1000000, 10, 323).map(lambda x: (x, )).toDF(["x"])

and you want to take the top n records, say 1000:

n = 1000  # number of records to take, let's say 1000

you can estimate the percentile rank of the cut-off, i.e. the percentage of records that fall below the top n:

q = 100 * (1 - n / float(df.count()))
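
For example, with n = 1000 and 1,000,000 rows this gives q = 100 * (1 - 1000 / 1000000) = 99.9, i.e. the cut-off will be the 99.9th percentile of x.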

estimate the corresponding quantile:

import numpy as np
from pyspark.sql.functions import col

# Sample ~5% of the rows without replacement, pull the sampled column to the
# driver, and estimate the q-th percentile as the cut-off threshold.
threshold = np.percentile(
    df.sample(False, 0.05).select("x").rdd.flatMap(lambda x: x).collect(),
    [q]
)[0]

# Keep only the rows above the threshold and write them out with the original
# partitioning intact.
result = df.where(col("x") > threshold)
result.write.format(...).save(...)

This doesn't shuffle at all and keeps the initial record distribution, but it doesn't guarantee the exact number of records and requires an additional action, with the associated network IO.
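
If exactly n rows are needed, one possible follow-up (a sketch only; the output format and path are placeholders, and it follows the same filter direction as above) is to sort and limit just the filtered subset, which contains only roughly n rows:

# Hypothetical follow-up: the filtered subset is small, so sorting and limiting
# it is cheap compared to sorting the full table.
exact = result.orderBy(col("x").desc()).limit(n)
exact.write.format("parquet").save("<output-path>")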

