Read whole text files from a compression in Spark


Question


I have the following problem: suppose that I have a directory containing compressed directories which contain multiple files, stored on HDFS. I want to create an RDD consisting of some objects of type T, i.e.:

JavaSparkContext context = new JavaSparkContext(conf);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String content = fileNameContent._2();

    // Class T has a constructor taking the content and the filename of each
    // processed file (as two strings)
    T t = new T(content, fileName);

    return t;
});


Now when inputDataPath is a directory containing files this works perfectly fine, i.e. when it's something like:

String inputDataPath =  "hdfs://some_path/*/*/"; // because it contains subfolders


But when there's a tgz containing multiple files, the file content (fileNameContent._2()) is a useless binary string (quite expected). I found a similar question on SO, but it's not the same case: there the solution applies only when each archive contains a single file, whereas in my case there are many files which I want to read individually as whole files. I also found a question about wholeTextFiles, but that doesn't work in my case either.


Any ideas how to do this?


I tried the reader from here (testing it as in the function testTarballWithFolders()), but whenever I call

TarballReader tarballReader = new TarballReader(fileName);

I get a NullPointerException:

java.lang.NullPointerException
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
    at utils.TarballReader.<init>(TarballReader.java:61)
    at main.SparkMain.lambda$0(SparkMain.java:105)
    at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


Line 105 in SparkMain is the one I showed above in my edit of the post, and line 61 of TarballReader is

GZIPInputStream gzip = new GZIPInputStream(in);


where the input stream in, obtained on the line above it, is null:

InputStream in = this.getClass().getResourceAsStream(tarball);


Am I on the right path here? If so, how do I continue? Why do I get this null value and how can I fix it?

Answer


One possible solution is to read data with binaryFiles and extract content manually.

Scala:

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._

def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    // Read until the next entry is null
    .takeWhile(_.isDefined)
    // flatten
    .flatMap(x => x)
    // Drop directories
    .filter(!_.isDirectory)
    .map(e => {
      Stream.continually {
        // Read n bytes
        val buffer = Array.fill[Byte](n)(-1)
        val i = tar.read(buffer, 0, n)
        (i, buffer.take(i))}
      // Take as long as we've read something
      .takeWhile(_._1 > 0)
      .map(_._2)
      .flatten
      .toArray})
    .toArray
}

def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
  new String(bytes, charset)

sc.binaryFiles("somePath").flatMapValues(x => 
  extractFiles(x).toOption).mapValues(_.map(decode()))

The required dependency (sbt):

libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"


Full usage example with Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src

Python:

import tarfile
from io import BytesIO

def extractFiles(bytes):
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar if x.isfile()]

(sc.binaryFiles("somePath")
    .mapValues(extractFiles)
    .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))
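The extraction logic above can be sanity-checked outside Spark. A minimal sketch that builds a small tar.gz in memory and runs the same extractFiles function over its bytes, exactly as binaryFiles would hand them to the worker (the file names and contents here are made up for illustration):

```python
import tarfile
from io import BytesIO

def extractFiles(bytes):
    # Open the tar.gz from the raw bytes, as received from sc.binaryFiles
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    # Read every regular file in the archive, skipping directories
    return [tar.extractfile(x).read() for x in tar if x.isfile()]

# Build an in-memory tar.gz containing two small text files
buf = BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tar:
    for name, text in [("a.txt", b"hello"), ("b.txt", b"world")]:
        info = tarfile.TarInfo(name=name)
        info.size = len(text)
        tar.addfile(info, BytesIO(text))

contents = extractFiles(buf.getvalue())
print([c.decode("utf-8") for c in contents])  # ['hello', 'world']
```

Each archive is decompressed as a whole on a single worker, so this approach suits many small-to-medium archives rather than one huge tarball.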
