如何在 Apache Spark 中读取包含多个文件的 zip [英] How to read a zip containing multiple files in Apache Spark

查看:38
本文介绍了如何在 Apache Spark 中读取包含多个文件的 zip的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个包含多个文本文件的压缩文件.我想读取每个文件并构建一个包含每个文件内容的 RDD 列表.

I am having a Zipped file containing multiple text files. I want to read each of the file and build a List of RDD containining the content of each files.

val test = sc.textFile("/Volumes/work/data/kaggle/dato/test/5.zip")

将只是整个文件,但如何遍历 zip 的每个内容,然后使用 Spark 将其保存在 RDD 中.

will just entire files, but how to iterate through each content of zip and then save the same in RDD using Spark.

我对 Scala 或 Python 没问题.

I am fine with Scala or Python.

在 Python 中使用 Spark 的可能解决方案 -

Possible solution in Python with using Spark -

archive = zipfile.ZipFile(archive_path, 'r')
file_paths = zipfile.ZipFile.namelist(archive)
for file_path in file_paths:
    urls = file_path.split("/")
    urlId = urls[-1].split('_')[0]

推荐答案

Apache Spark 默认压缩支持

我在其他答案中写了所有必要的理论,您可能想参考:https://stackoverflow.com/a/45958182/1549135

我遵循了 @Herman 给出的建议并使用了 ZipInputStream.这给了我这个解决方案,它返回 zip 内容的 RDD[String].

I have followed the advice given by @Herman and used ZipInputStream. This gave me this solution, which returns RDD[String] of the zip content.

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

implicit class ZipSparkContext(val sc: SparkContext) extends AnyVal {

    def readFile(path: String,
                 minPartitions: Int = sc.defaultMinPartitions): RDD[String] = {

      if (path.endsWith(".zip")) {
        sc.binaryFiles(path, minPartitions)
          .flatMap { case (name: String, content: PortableDataStream) =>
            val zis = new ZipInputStream(content.open)
            Stream.continually(zis.getNextEntry)
                  .takeWhile {
                      case null => zis.close(); false
                      case _ => true
                  }
                  .flatMap { _ =>
                      val br = new BufferedReader(new InputStreamReader(zis))
                      Stream.continually(br.readLine()).takeWhile(_ != null)
                  }
        }
      } else {
        sc.textFile(path, minPartitions)
      }
    }
  }

只需通过导入隐式类并调用 SparkContext 上的 readFile 方法来使用它:

simply use it by importing the implicit class and call the readFile method on SparkContext:

import com.github.atais.spark.Implicits.ZipSparkContext
sc.readFile(path)

这篇关于如何在 Apache Spark 中读取包含多个文件的 zip的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆