Read HDF5 files from a *.tar.gz compressed file in Scala in Spark

Question

After following this post, I can read multiple *.txt files residing in a *.tar.gz file. Now I need to read HDF5 files from a *.tar.gz file. The sample file can be downloaded here; it is generated from the Million Song Dataset. Could anyone tell me how to change the following code so that it reads the HDF5 files into an RDD? Thanks!

package a.b.c

import org.apache.spark._
import org.apache.spark.sql.{SQLContext, DataFrame}
import org.apache.spark.ml.tuning.CrossValidatorModel
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.input.PortableDataStream
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import scala.util.Try
import java.nio.charset._

object Main {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lab1").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    import sqlContext.implicits._
    import sqlContext._

    val inputpath = "path/to/millionsong.tar.gz"
    val rawDF = sc.binaryFiles(inputpath, 2)
                .flatMapValues(x => extractFiles(x).toOption)
                .mapValues(_.map(decode()))
                .map(_._2)
                .flatMap(x => x)
                .flatMap { x => x.split("\n") }
                .toDF()
  }

  def extractFiles(ps: PortableDataStream, n: Int = 1024) = Try {
    val tar = new TarArchiveInputStream(new GzipCompressorInputStream(ps.open))
    Stream.continually(Option(tar.getNextTarEntry))
      // Read until the next entry is null
      .takeWhile(_.isDefined)
      // flatten
      .flatMap(x => x)
      // Drop directories
      .filter(!_.isDirectory)
      .map(e => {
        Stream.continually {
          // Read n bytes
          val buffer = Array.fill[Byte](n)(-1)
          val i = tar.read(buffer, 0, n)
          (i, buffer.take(i))}
        // Take as long as we've read something
        .takeWhile(_._1 > 0)
        .map(_._2)
        .flatten
        .toArray})
      .toArray
  }

  // Decode with the requested charset (the parameter was previously ignored)
  def decode(charset: Charset = StandardCharsets.UTF_8)(bytes: Array[Byte]) = new String(bytes, charset)
}

Solution

I managed to read the HDF5 files inside the tarball by writing each file's bytes to a local file, opening that file as HDF5, and extracting the features using this. Here is my code:

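import java.io.{File, FileInputStream}
import org.apache.commons.compress.archivers.tar.{TarArchiveEntry, TarArchiveInputStream}
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.commons.io.FileUtils
// Assumption: HDF5Factory comes from the JHDF5 library (package ch.systemsx.cisd.hdf5)
import ch.systemsx.cisd.hdf5.HDF5Factory

var tarFiles: Array[String] = Array()
val tar_path = path + "millionsongsubset.tar.gz"

// TODO: add all the tar.gz files in the main folder path to the tarFiles array.
// Add as many tar.gz files as you want here, each containing the
// HDF5 files for the songs.
tarFiles = tarFiles :+ tar_path
//tarFiles = tarFiles :+ (path+"A.tar.gz")
//tarFiles = tarFiles :+ (path+"B.tar.gz")
//tarFiles = tarFiles :+ (path+"C.tar.gz")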

// Read every tar.gz file in tarFiles and, for each .h5 file inside,
// extract that song's list of features.
// The result is one feature list per song across all archives.
val allHDF5 = sc.parallelize(tarFiles).flatMap(path => {
    val tar = new TarArchiveInputStream(new GzipCompressorInputStream(new FileInputStream(path)))
    var entry: TarArchiveEntry = tar.getNextEntry().asInstanceOf[TarArchiveEntry]
    var res: List[Array[Byte]] = List()
    var i = 0
    while (entry != null) {
        // Only keep regular files whose name ends in .h5
        if (!entry.isDirectory() && entry.getName.endsWith(".h5")) {
            val byteFile = Array.ofDim[Byte](entry.getSize.toInt)
            // A single read() may return fewer bytes than requested,
            // so read until the array is full
            org.apache.commons.io.IOUtils.readFully(tar, byteFile)
            res = byteFile :: res
            if (i % 100 == 0) {
              println("Read " + i + " files")
            }
            i = i + 1
        }
        entry = tar.getNextEntry().asInstanceOf[TarArchiveEntry]
    }
    tar.close()
    // All .h5 entries have been turned into byte arrays
    res
}).map(bytes => {
    // Use a random UUID as the file name; Array.toString is only an
    // identity hash and is not guaranteed to be unique
    val name = java.util.UUID.randomUUID().toString + ".h5"
    FileUtils.writeByteArrayToFile(new File(name), bytes)
    val reader = HDF5Factory.openForReading(name)
    val features = getFeatures(reader)
    reader.close()
    features
})

println("Extracted songs from tar.gz, showing 5 examples")
allHDF5.take(5).foreach(x => {
    x.foreach(y => print(y + " "))
    println()
})
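
A side note on filling a byte array from an archive entry: a single `InputStream.read` call is allowed to return fewer bytes than requested, so the array can end up only partly filled. Below is a minimal sketch of a read-until-full loop using only the Java standard library. The names `ReadFullySketch` and `readFully` are hypothetical and not part of the original answer.

```scala
import java.io.{EOFException, InputStream}

object ReadFullySketch {
  // Hypothetical helper: reads exactly `size` bytes from `in`,
  // looping because each read() may deliver only part of the request.
  def readFully(in: InputStream, size: Int): Array[Byte] = {
    val buffer = new Array[Byte](size)
    var offset = 0
    while (offset < size) {
      val n = in.read(buffer, offset, size - offset)
      // read() returns -1 once the stream is exhausted
      if (n < 0) throw new EOFException(s"Expected $size bytes but stream ended after $offset")
      offset += n
    }
    buffer
  }
}
```

Under this assumption, a call such as `ReadFullySketch.readFully(tar, entry.getSize.toInt)` would take the place of allocating the array and calling `read` once.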

Several remarks:

  1. The getFeatures method is a very simple adaptation of the code in here: it extracts a few features and returns them as an array. To run this feature extraction code you will need this library, which has a good javadoc.
  2. If this code runs on a cluster with multiple executors, each executor writes the .h5 files to its own local disk. If work is moved around the cluster, at some point you might try to read a file that does not exist on the local executor.
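
One way to reduce the risk described in the second remark is to do the write, the read and the cleanup inside the same task, so the file is always on the machine that reads it. Here is a minimal sketch using only the Java standard library. `TempFileSketch` and `withTempFile` are hypothetical names, not part of the original answer.

```scala
import java.nio.file.{Files, Path}

object TempFileSketch {
  // Hypothetical helper: writes `bytes` to a fresh temporary file,
  // hands its path to `use`, and deletes the file afterwards,
  // even if `use` throws.
  def withTempFile[A](bytes: Array[Byte], suffix: String = ".h5")(use: Path => A): A = {
    // createTempFile picks a unique name in the local temp directory
    val tmp = Files.createTempFile("song-", suffix)
    try {
      Files.write(tmp, bytes)
      use(tmp)
    } finally {
      Files.deleteIfExists(tmp)
    }
  }
}
```

Inside the `map` step, the body passed to `withTempFile` would open the path with the HDF5 reader, call getFeatures, and close the reader before returning. The temporary file then never outlives the task that created it.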
