Writing to HDFS in Spark/Scala


Problem description



I am writing a Spark/Scala program to read in ZIP files, unzip them, and write the contents to a set of new files. I can get this to work for the local file system, but wondered whether there is a way to write the output files to a distributed file system such as HDFS. The code is shown below:

    import java.util.zip.ZipInputStream
    import org.apache.spark.input.PortableDataStream
    import java.io._

    var i = 1
    sc.binaryFiles("file:///d/tmp/zips/").flatMap( (file: (String, PortableDataStream)) =>
      {
        val zipStream = new ZipInputStream(file._2.open)
        val entry = zipStream.getNextEntry
        val iter = scala.io.Source.fromInputStream(zipStream).getLines

        val fname = f"/d/tmp/myfile$i.txt"
        i = i + 1

        val xx = iter.mkString
        val writer = new PrintWriter(new File(fname))
        writer.write(xx)
        writer.close()

        iter
      }).collect()


Solution

You can easily write data to HDFS using the hadoop-common library (if you are using sbt as your dependency management tool, add that library to your dependencies). With it you can create a FileSystem object:
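A minimal sbt entry might look like the following; the version number here is an assumption, so match it to the Hadoop version your cluster runs:

    // build.sbt -- the version shown is illustrative
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.3"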

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    // Reads core-site.xml / hdfs-site.xml from the classpath
    private val fs = {
      val conf = new Configuration()
      FileSystem.get(conf)
    }

Be sure to configure the FileSystem with your Hadoop cluster information (core-site.xml, etc.).
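If those configuration files are not on your classpath, one alternative is to point the Configuration at the namenode explicitly; the host and port below are placeholders:

    val conf = new Configuration()
    // Placeholder address -- replace with your actual namenode URI
    conf.set("fs.defaultFS", "hdfs://namenode-host:8020")
    val fs = FileSystem.get(conf)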

Then you can write, for example, a String to a path on HDFS as follows (in your case you should work with streams instead):

    import java.io.{BufferedWriter, IOException, OutputStreamWriter}
    import org.apache.hadoop.fs.{FSDataOutputStream, Path}

    @throws[IOException]
    def writeAsString(hdfsPath: String, content: String) {
      val path: Path = new Path(hdfsPath)
      // Overwrite any existing file at the target path
      if (fs.exists(path)) {
        fs.delete(path, true)
      }
      val dataOutputStream: FSDataOutputStream = fs.create(path)
      val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8"))
      bw.write(content)
      bw.close()
    }

