Spark read multiple directories into multiple dataframes


Problem description

I have a directory structure on S3 looking like this:

foo
  |-base
     |-2017
        |-01
           |-04
              |-part1.orc, part2.orc ....
  |-A
     |-2017
        |-01
           |-04
              |-part1.orc, part2.orc ....
  |-B
     |-2017
        |-01
           |-04
              |-part1.orc, part2.orc ....

Meaning that for directory foo I have multiple output tables (base, A, B, etc.) in a given path, based on the timestamp of a job.

I'd like to left join them all, based on a timestamp and the master directory, in this case foo. This would mean reading each output table (base, A, B, etc.) into a new, separate input table on which a left join can be applied, all with the base table as the starting point.

Something like this (not working code!)

val dfs: Seq[DataFrame] = spark.read.orc("foo/*/2017/01/04/*")
val base: DataFrame = spark.read.orc("foo/base/2017/01/04/*")

val result = dfs.foldLeft(base)((l, r) => l.join(r, 'id, "left"))

Can someone point me in the right direction on how to get that sequence of DataFrames? It might even be worth considering the reads as lazy, or sequential, thus only reading the A or B table when the join is applied to reduce memory requirements.

Note: the directory structure is not final, meaning it can change if that fits the solution.

Answer

From what I understand, Spark uses the underlying Hadoop API to read in data files. So the inherited behavior is to read everything you specify into one single RDD/DataFrame.
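
For example, the wildcard read from the question gives back exactly one combined DataFrame, not one per directory (a minimal illustration, assuming a spark session is in scope; the variable name is only for illustration):

    // Every ORC file matching foo/*/2017/01/04/* lands in this single DataFrame;
    // the split into base, A, B is lost at this point.
    val everything: DataFrame = spark.read.orc("foo/*/2017/01/04/*")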

To achieve what you want, you can first get a list of directories with:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{ FileSystem, Path }

    val path = "foo/"

    // Inside a Spark job you would typically pass spark.sparkContext.hadoopConfiguration
    // here so that the S3 credentials and filesystem settings are picked up.
    val hadoopConf = new Configuration()
    val fs = FileSystem.get(hadoopConf)

    // List the immediate sub-directories of foo/ (base, A, B, ...).
    val paths: Array[String] = fs.listStatus(new Path(path)).
      filter(_.isDirectory).
      map(_.getPath.toString)

Then load them into separate DataFrames:

    val dfs: Array[DataFrame] = paths.
      map(path => spark.read.orc(path + "/2017/01/04/*"))
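
From there, one way to get back to the original goal is to single out the base table and fold the remaining frames onto it with a left join, along the lines of the sketch in the question. This is a minimal sketch under the assumption that every table has an id column and that the listing above really contains base, A, B, etc.; the names basePath, others and result are only illustrative:

    // Hypothetical continuation: split off the base table, then left-join the rest onto it.
    val basePath: String = paths.find(_.endsWith("/base")).get
    val base: DataFrame = spark.read.orc(basePath + "/2017/01/04/*")

    val others: Array[DataFrame] = paths.
      filterNot(_.endsWith("/base")).
      map(p => spark.read.orc(p + "/2017/01/04/*"))

    // Joining on Seq("id") keeps a single id column in the joined result.
    val result: DataFrame = others.foldLeft(base)((l, r) => l.join(r, Seq("id"), "left"))

Since Spark plans these reads lazily, the ORC data itself is only scanned once an action runs on result, which should also address the memory concern raised in the question.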
