处理多个文件作为独立RDD的并行 [英] Processing multiple files as independent RDD's in parallel

查看:318
本文介绍了处理多个文件作为独立RDD的并行的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一种情况,有一些小的(〜300MB每个)文件被应用一定数量的操作,包括一组通过。操作看起来是这样的。

I have a scenario where a certain number of operations including a group by has to be applied on a number of small (~300MB each) files. The operation looks like this..

df.groupBy(....)。AGG(....)

现在对其进行处理多个文件,我可以使用通配符/**/*.csv但是,创建单个RDD和分区它的操作。然而,看着操作,它是一组通过,并涉及大量的洗牌,如果这些文件是互相排斥的是不必要的。

Now to process it on multiple files, I can use a wildcard "/**/*.csv" however, that creates a single RDD and partitions it to for the operations. However, looking at the operations, it is a group by and involves lot of shuffle which is unnecessary if the files are mutually exclusive.

什么,我看到的是,这是一种在那里我可以在文件创建独立的RDD的和他们独立运营。

What, I am looking at is, a way where i can create independent RDD's on files and operate on them independently.

推荐答案

这是不是一个完整的解决方案的更多想法,我没有测试它。

It is more an idea than a full solution and I haven't tested it yet.

您可以与您的提取数据处理管线成一个函数开始。

You can start with extracting your data processing pipeline into a function.

def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}

如果你的文件很小,你可以调整 N 参数使用尽可能少的分区尽可能从单一的文件集成数据和避免洗牌。这意味着你限制并发,但我们会回到这个问题以后。

If your files are small you can adjust n parameter to use as small number of partitions as possible to fit data from a single file and avoid shuffling. It means you are limiting concurrency but we'll get back to this issue later.

val n: Int = ??? 

接下来,您必须获得输入文件的列表。此步骤取决于数据源上,但大部分的时间,它是或多或少直接:

Next you have to obtain a list of input files. This step depends on a data source but most of the time it is more or less straightforward:

val files: Array[String] = ???

接下来,您可以使用上面的映射列表管道功能:

val rdds = files.map(f => pipeline(f, n))

由于我们在我们想通过提交多个作业,以弥补单个文件的级别限制并发性。让我们来添加一个简单的辅助,迫使评估和未来

import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
    df.count // Force computation
    df
}

最后,我们可以使用上面的帮手 RDDS

val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

根据您的要求,您可以添加的onComplete 回调或使用无流来收集结果。

Depending on your requirements you can add onComplete callbacks or use reactive streams to collect the results.

这篇关于处理多个文件作为独立RDD的并行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆