Read multiple files from a directory using Spark


Question

I am trying to solve this problem on Kaggle using Spark.

The hierarchy of the input is like this:

drivers/{driver_id}/trip#.csv
e.g., drivers/1/1.csv
      drivers/1/2.csv
      drivers/2/1.csv

I want to read the parent directory "drivers", and for each sub-directory I would like to create a pairRDD with the key (sub_directory, file_name) and the value as the content of the file.
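
In other words, the target shape would be something like this (a minimal sketch; TripRDD is only an illustrative alias, not something from the question):

import org.apache.spark.rdd.RDD

// Desired shape: key = (sub_directory, file_name), value = the file's text content
type TripRDD = RDD[((String, String), String)]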

I checked the link and tried to use:

val text = sc.wholeTextFiles("drivers")
text.collect()

This fails with the error:

java.lang.ArrayIndexOutOfBoundsException: 0
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:591)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:283)
    at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:243)
    at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:267)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1779)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:885)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:109)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:884)

But when I run the code below, it works:

val text =  sc.wholeTextFiles("drivers/1")
text.collect()

But I don't want to do this, since here I would have to read the drivers directory, loop over the files, and call wholeTextFiles for each entry.

Answer

Instead of using

sc.textFile("path/*/**") or sc.wholeTextFiles("path/*")

you can use the piece of code below. Spark internally lists every file under a folder and its sub-folders, so globbing can cost you time on large datasets; instead, you can use unions for the same purpose.
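
That said, if the data is small enough that the glob listing overhead is acceptable, a minimal sketch of the glob approach, which also derives the (sub_directory, file_name) key the question asks for, might look like this (assuming the drivers/{driver_id}/trip#.csv layout above and sc being the SparkContext):

import org.apache.spark.rdd.RDD

// Glob over every driver sub-directory at once; wholeTextFiles yields (full path, content) pairs
val text = sc.wholeTextFiles("drivers/*")
val pairs: RDD[((String, String), String)] = text.map { case (path, content) =>
  val parts    = path.split("/")
  val fileName = parts.last                  // e.g. "1.csv"
  val subDir   = parts(parts.length - 2)     // e.g. "1" (the driver_id directory)
  ((subDir, fileName), content)
}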

Pass a List object containing the file locations to the following piece of code (note: sc here is the SparkContext):

import org.apache.spark.rdd.RDD

var combined: RDD[String] = null
for (file <- files) {
  val fileRdd = sc.textFile(file)        // read one file as an RDD of lines
  if (combined != null) {
    combined = combined.union(fileRdd)   // append this file's lines to the running union
  } else {
    combined = fileRdd
  }
}

Now you have a single, unified RDD, combined, holding the lines of every file.
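
If you need to build that files list in the first place, one possible sketch (an assumption, not part of the original answer: it lists the drivers directory through Hadoop's FileSystem API) is:

import org.apache.hadoop.fs.{FileSystem, Path}

// List every trip CSV under each driver sub-directory and collect the paths into `files`
val fs = FileSystem.get(sc.hadoopConfiguration)
val files: List[String] = fs.listStatus(new Path("drivers"))
  .filter(_.isDirectory)
  .flatMap(dir => fs.listStatus(dir.getPath))
  .map(_.getPath.toString)
  .toList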
