处理多个文件作为独立RDD的并行 [英] Processing multiple files as independent RDD's in parallel
问题描述
我有一种情况,有一些小的(〜300MB每个)文件被应用一定数量的操作,包括一组通过。操作看起来是这样的。
I have a scenario where a certain number of operations including a group by has to be applied on a number of small (~300MB each) files. The operation looks like this..
df.groupBy(....)。AGG(....)
现在对其进行处理多个文件,我可以使用通配符/**/*.csv但是,创建单个RDD和分区它的操作。然而,看着操作,它是一组通过,并涉及大量的洗牌,如果这些文件是互相排斥的是不必要的。
Now to process it on multiple files, I can use a wildcard "/**/*.csv" however, that creates a single RDD and partitions it to for the operations. However, looking at the operations, it is a group by and involves lot of shuffle which is unnecessary if the files are mutually exclusive.
什么,我看到的是,这是一种在那里我可以在文件创建独立的RDD的和他们独立运营。
What, I am looking at is, a way where i can create independent RDD's on files and operate on them independently.
推荐答案
这是不是一个完整的解决方案的更多想法,我没有测试它。
It is more an idea than a full solution and I haven't tested it yet.
您可以与您的提取数据处理管线成一个函数开始。
You can start with extracting your data processing pipeline into a function.
def pipeline(f: String, n: Int) = {
sqlContext
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load(f)
.repartition(n)
.groupBy(...)
.agg(...)
.cache // Cache so we can force computation later
}
如果你的文件很小,你可以调整 N
参数使用尽可能少的分区尽可能从单一的文件集成数据和避免洗牌。这意味着你限制并发,但我们会回到这个问题以后。
If your files are small you can adjust n
parameter to use as small number of partitions as possible to fit data from a single file and avoid shuffling. It means you are limiting concurrency but we'll get back to this issue later.
val n: Int = ???
接下来,您必须获得输入文件的列表。此步骤取决于数据源上,但大部分的时间,它是或多或少直接:
Next you have to obtain a list of input files. This step depends on a data source but most of the time it is more or less straightforward:
val files: Array[String] = ???
接下来,您可以使用上面的映射列表管道
功能:
val rdds = files.map(f => pipeline(f, n))
由于我们在我们想通过提交多个作业,以弥补单个文件的级别限制并发性。让我们来添加一个简单的辅助,迫使评估和未来
import scala.concurrent._
import ExecutionContext.Implicits.global
def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
df.count // Force computation
df
}
最后,我们可以使用上面的帮手 RDDS
:
val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)
根据您的要求,您可以添加的onComplete
回调或使用无流来收集结果。
Depending on your requirements you can add onComplete
callbacks or use reactive streams to collect the results.
这篇关于处理多个文件作为独立RDD的并行的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!