Processing multiple files as independent RDDs in parallel


Problem description

I have a scenario where a certain number of operations, including a group by, has to be applied on a number of small (~300 MB each) files. The operation looks like this:

df.groupBy(....).agg(....)

Now, to process it on multiple files, I can use a wildcard "/**/*.csv"; however, that creates a single RDD and partitions it for the operations. Looking at the operations, though, it is a group by and involves a lot of shuffle, which is unnecessary if the files are mutually exclusive.
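
For reference, here is a minimal sketch of the single-RDD approach described above (the path, column names and aggregation are hypothetical placeholders):

// A single DataFrame over every file; the subsequent groupBy shuffles data
// across file boundaries even when keys never span more than one file.
// "/data/**/*.csv", "key" and "value" are placeholders for illustration.
val all = sqlContext
    .read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .load("/data/**/*.csv")

val aggregated = all.groupBy("key").agg(org.apache.spark.sql.functions.sum("value"))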

What I am looking for is a way to create independent RDDs from the files and operate on them independently.

Recommended answer

This is more of an idea than a full solution, and I haven't tested it yet.

You can start by extracting your data processing pipeline into a function.

def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}

If your files are small, you can adjust the n parameter to use as small a number of partitions as possible to fit the data from a single file and avoid shuffling. This means you are limiting concurrency, but we'll get back to this issue later.

val n: Int = ??? 
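
If you need a starting point for n, one rough heuristic (purely illustrative; the 128 MB target, the partitionsFor helper and the use of the Hadoop FileSystem API are assumptions, with sc being the active SparkContext) is to divide the file size by a target partition size:

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper: roughly one partition per `targetBytes` of input.
def partitionsFor(path: String, targetBytes: Long = 128L * 1024 * 1024): Int = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val size = fs.getFileStatus(new Path(path)).getLen
    math.max(1, math.ceil(size.toDouble / targetBytes).toInt)
}

You could then call pipeline(f, partitionsFor(f)) instead of using a single fixed n.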

Next you have to obtain a list of input files. This step depends on the data source, but most of the time it is more or less straightforward:

val files: Array[String] = ???
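
For example, if the files sit on a Hadoop-compatible filesystem, the list could be built with the FileSystem API (the input directory is a hypothetical placeholder and sc is assumed to be the active SparkContext):

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val files: Array[String] = fs
    .listStatus(new Path("/data/input"))          // hypothetical input directory
    .filter(_.getPath.getName.endsWith(".csv"))   // keep only the CSV files
    .map(_.getPath.toString)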

Next you can map the above list using the pipeline function:

val rdds = files.map(f => pipeline(f, n))

Since we limit concurrency at the level of a single file, we want to compensate by submitting multiple jobs. Let's add a simple helper which forces evaluation and wraps it with a Future:

import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame): Future[org.apache.spark.sql.DataFrame] = Future {
    df.rdd.foreach(_ => ()) // Force computation
    df
}

Finally we can use the above helper on the rdds:

val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

Depending on your requirements, you can add onComplete callbacks or use reactive streams to collect the results.
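
For instance, a sketch of both options (the timeout and the log messages are arbitrary choices):

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Non-blocking: react once every job has completed.
result.onComplete {
    case Success(dfs) => println(s"Finished ${dfs.size} per-file aggregations")
    case Failure(e)   => println(s"At least one job failed: ${e.getMessage}")
}

// Blocking alternative: wait for all jobs (timeout picked arbitrarily).
val aggregations = Await.result(result, 1.hour)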

