Read whole text files from a compression in Spark
Question
I have the following problem: suppose that I have a directory on HDFS containing compressed archives, each of which holds multiple files. I want to create an RDD consisting of objects of some type T, i.e.:
context = new JavaSparkContext(conf);

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);

JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String content = fileNameContent._2();
    // Class T has a constructor taking the filename and the content of each
    // processed file (as two strings)
    T t = new T(content, fileName);
    return t;
});
Now when inputDataPath is a directory containing files, this works perfectly fine, i.e. when it's something like:
String inputDataPath = "hdfs://some_path/*/*/"; // because it contains subfolders
But when there is a tgz containing multiple files, the file content (fileNameContent._2()) gives me a useless binary string (quite expected, since the bytes are still compressed). I found a similar question on SO, but it's not the same case: there the solution applies only when each archive contains a single file, whereas in my case there are many files which I want to read individually, each as a whole file. I also found a question about wholeTextFiles, but that doesn't work in my case either.
Any ideas how to do this?
I tried the reader from here (testing it as in the function testTarballWithFolders() from here), but whenever I call
TarballReader tarballReader = new TarballReader(fileName);
I get a NullPointerException:
java.lang.NullPointerException
at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)
at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)
at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
at utils.TarballReader.<init>(TarballReader.java:61)
at main.SparkMain.lambda$0(SparkMain.java:105)
at main.SparkMain$$Lambda$18/1667100242.call(Unknown Source)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Line 105 in SparkMain is the one I showed above in my edit of the post, and line 61 of TarballReader is:
GZIPInputStream gzip = new GZIPInputStream(in);
and the NPE occurs because the input stream in, assigned on the preceding line, is null:
InputStream in = this.getClass().getResourceAsStream(tarball);
Am I on the right path here? If so, how do I continue? Why do I get this null value and how can I fix it?
Answer
One possible solution is to read the data with binaryFiles and extract the content manually.
Scala:
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.spark.input.PortableDataStream
import scala.util.Try
import java.nio.charset._

def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
  val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
  Stream.continually(Option(tar.getNextTarEntry))
    // Read until the next entry is null
    .takeWhile(_.isDefined)
    // Flatten the Options
    .flatMap(x => x)
    // Drop directories
    .filter(!_.isDirectory)
    .map(e => {
      Stream.continually {
        // Read up to n bytes at a time
        val buffer = Array.fill[Byte](n)(-1)
        val i = tar.read(buffer, 0, n)
        (i, buffer.take(i))
      }
        // Take as long as we've read something
        .takeWhile(_._1 > 0)
        .map(_._2)
        .flatten
        .toArray
    })
    .toArray
}
def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) =
  new String(bytes, charset)
sc.binaryFiles("somePath")
  .flatMapValues(x => extractFiles(x).toOption)
  .mapValues(_.map(decode()))
This requires Apache Commons Compress on the classpath, e.g. with sbt:

libraryDependencies += "org.apache.commons" % "commons-compress" % "1.11"
Full usage example with Java: https://bitbucket.org/zero323/spark-multifile-targz-extract/src
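For a rough idea of what the Java version looks like without following the link, here is a minimal sketch of the same approach (this is not the linked example; the helper name extractTarGz is made up here, and it assumes the same commons-compress dependency as above):

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.spark.input.PortableDataStream;
import scala.Tuple2;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Extract every regular file in one .tar.gz as an (entryName, content) pair.
public static List<Tuple2<String, String>> extractTarGz(PortableDataStream ps)
        throws IOException {
    List<Tuple2<String, String>> files = new ArrayList<>();
    try (TarArchiveInputStream tar =
             new TarArchiveInputStream(new GzipCompressorInputStream(ps.open()))) {
        TarArchiveEntry entry;
        // Iterate until getNextTarEntry() returns null, skipping directories
        while ((entry = tar.getNextTarEntry()) != null) {
            if (entry.isDirectory()) {
                continue;
            }
            // Copy the current entry's bytes in 1024-byte chunks
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            int n;
            while ((n = tar.read(buffer, 0, buffer.length)) > 0) {
                out.write(buffer, 0, n);
            }
            files.add(new Tuple2<>(entry.getName(),
                    new String(out.toByteArray(), StandardCharsets.UTF_8)));
        }
    }
    return files;
}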
Python:
import tarfile
from io import BytesIO

def extractFiles(bytes):
    tar = tarfile.open(fileobj=BytesIO(bytes), mode="r:gz")
    return [tar.extractfile(x).read() for x in tar if x.isfile()]

(sc.binaryFiles("somePath")
    .mapValues(extractFiles)
    .mapValues(lambda xs: [x.decode("utf-8") for x in xs]))
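To tie this back to the question: assuming the hypothetical extractTarGz helper sketched above, the (name, content) pairs it produces can be fed straight into the T(content, fileName) constructor from the question, for example:

JavaPairRDD<String, PortableDataStream> archives = context.binaryFiles(inputDataPath);
JavaRDD<T> processingFiles = archives.flatMap(pair -> {
    List<T> result = new ArrayList<>();
    for (Tuple2<String, String> file : extractTarGz(pair._2())) {
        // Constructor order follows the question: content first, then file name
        result.add(new T(file._2(), file._1()));
    }
    // FlatMapFunction returns an Iterable on Spark 1.x; on Spark 2.x return result.iterator()
    return result;
});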