Read whole text files from a compression in Spark


Question

I have the following problem: suppose that I have a directory containing compressed directories which contain multiple files, stored on HDFS. I want to create an RDD consisting of some objects of type T, i.e.:

JavaSparkContext context = new JavaSparkContext(conf);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String content = fileNameContent._2();

    // Class T has a constructor of taking the filename and the content of each
    // processed file (as two strings)
    T t = new T(content, fileName);

    return t;
});

Now when inputDataPath is a directory containing files, this works perfectly fine, i.e. when it's something like:

String inputDataPath =  "hdfs://some_path/*/*/"; // because it contains subfolders

But when there's a tgz containing multiple files, the file content (fileNameContent._2()) gets me some useless binary string (quite expected). I found a similar question on SO (http://stackoverflow.com/questions/16302385/gzip-support-in-spark), but it's not the same case, because there the solution applies only when each compressed file contains a single file, whereas in my case there are many other files which I want to read individually as whole files. I also found a question about wholeTextFiles (http://stackoverflow.com/questions/24402737/how-to-read-gz-files-in-spark-using-wholetextfiles?rq=1), but this doesn't work in my case.

Any ideas how to do this?

EDIT:

I tried with the reader from here: https://github.com/pgrandjean/TarballInputFormat/blob/master/src/test/java/org/utils/TestTarballReader.java (like in the function testTarballWithFolders()), but whenever I call

TarballReader tarballReader = new TarballReader(fileName);

I get a NullPointerException:

java.lang.NullPointerException
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
    at utils.TarballReader.<init>(TarballReader.java:61)
    at main.SparkMain.lambda$0(SparkMain.java:105)
    at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Line 105 in SparkMain is the one I showed above in my edit of the post, and line 61 in TarballReader is

GZIPInputStream gzip = new GZIPInputStream(in);

and this gets a null value for the input stream from the line:

InputStream in = this.getClass().getResourceAsStream(tarball);

Am I on the right path here? If so, how do I continue? Why do I get this null value and how can I fix it?
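
(For context: getResourceAsStream resolves names on the JVM classpath, not on HDFS or the local file system, so it returns null for a path like hdfs://…. A hedged sketch of opening the stream through Hadoop's FileSystem API instead; the helper class below is illustrative, not part of TarballReader:)

import java.io.InputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsStreams {
    // Open a stream for a file wherever its URI scheme says it lives (hdfs://, file://, ...)
    public static InputStream open(String uri) throws Exception {
        FileSystem fs = FileSystem.get(URI.create(uri), new Configuration());
        return fs.open(new Path(uri));
    }
}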

Answer

One possible solution is to read data with binaryFiles and extract content manually.

Scala

import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._

def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    // Read until next entry is null
    .takeWhile(_.isDefined)
    // flatten
    .flatMap(x => x)
    // Drop directories
    .filter(!_.isDirectory)
    .map(e => {
      Stream.continually {
        // Read n bytes
        val buffer = Array.fill[Byte](n)(-1)
        val i = tar.read(buffer, 0, n)
        (i, buffer.take(i))}
      // Take as long as we've read something
      .takeWhile(_._1 > 0)
      .map(_._2)
      .flatten
      .toArray})
    .toArray
}

def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
  new String(bytes, charset)

sc.binaryFiles("somePath").flatMapValues(x => 
  extractFiles(x).toOption).mapValues(_.map(decode()))
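
Note that wrapping the extraction in Try and flattening with toOption silently drops archives that fail to decompress; if you need to see those failures, log them before converting to Option.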

This requires the commons-compress dependency:

libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"

Full usage example with Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src
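
Along the same lines, here is a minimal Java sketch of the binaryFiles approach (my own illustrative code rather than the linked repository's; the class name TarGzExtract is made up, and commons-compress is assumed to be on the classpath):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.input.PortableDataStream;

public class TarGzExtract {

    // Extract every regular file inside one .tar.gz into a UTF-8 string
    public static List<String> extractFiles(PortableDataStream ps) throws IOException {
        List<String> contents = new ArrayList<>();
        try (TarArchiveInputStream tar =
                 new TarArchiveInputStream(new GzipCompressorInputStream(ps.open()))) {
            TarArchiveEntry entry;
            while ((entry = tar.getNextTarEntry()) != null) {
                if (entry.isDirectory()) continue;          // skip directory entries
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024];
                int read;
                while ((read = tar.read(buffer, 0, buffer.length)) > 0) {
                    bos.write(buffer, 0, read);             // accumulate this entry's bytes
                }
                contents.add(new String(bos.toByteArray(), StandardCharsets.UTF_8));
            }
        }
        return contents;
    }

    // One record per archive: (archive path, contents of every file inside it)
    public static JavaPairRDD<String, List<String>> readTarballs(
            JavaSparkContext context, String path) {
        return context.binaryFiles(path).mapValues(TarGzExtract::extractFiles);
    }
}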

Python

import tarfile
from io import BytesIO

def extractFiles(bytes):
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar if x.isfile()]

(sc.binaryFiles("somePath")
    .mapValues(extractFiles)
    .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))
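
In both the Scala and Python versions, binaryFiles produces one record per archive, so each tarball is decompressed by a single task and its full contents must fit in that executor's memory.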
