How to parallelize Spark ETL more without losing info (in file names)


Problem description

I'm going over a list of files on HDFS one by one, opening each as text and then saving it back to HDFS, to another location. The data is parsed, then the part files are merged and saved under the same name as the original, with a BZIP2 suffix. However, it's rather slow - it takes ~3s per file, and I have over 10,000 of them per folder. I need to go file by file because I'm unsure how to keep the file name information, and I need the name to be able to do an MD5 and "confirm" that no information loss has happened.

Here is my code:

import org.apache.hadoop.fs.{FileSystem, Path, FileUtil}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.functions.broadcast 
import org.apache.spark.sql.types._ 
import org.apache.spark.{SparkConf, SparkContext} 

sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", 
               "org.apache.hadoop.io.compress.BZip2Codec")
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

val hdfsConf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfsConf)
val sourcePath = new Path("/source/*20180801*") 

hdfs.globStatus(sourcePath).foreach( fileStatus => {
  val fileName = fileStatus.getPath().getName()
  val filePathName = fileStatus.getPath().toString
  if (fileName.contains(".done")) {
    /* open, then save compressed */
    val myFile = sc.textFile(filePathName)
    val compressedBasePath = "/destination/compressed/"
    /* use tmp_ to store folder w/ parts in it */
    val compressedPath = compressedBasePath + "tmp_/" + fileName
    myFile.saveAsTextFile(compressedPath, 
                          classOf[org.apache.hadoop.io.compress.BZip2Codec])
    /* merge part* -> old_name.bzip2 */
    FileUtil.copyMerge(hdfs, new Path(compressedPath), hdfs, 
                       new Path(compressedBasePath + "/" + fileName + ".bzip2"), 
                       true, hdfsConf, null)
    myFile.unpersist()
  }
})
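
For completeness, the MD5 "no information loss" check mentioned above could look roughly like the sketch below. This is only an illustration, not code from the question: the file names are hypothetical, it reuses the hdfs, hdfsConf and Path names from the snippet above, and it compares the original file against the recompressed copy decompressed on the fly with BZip2Codec (modulo details such as a trailing newline that saveAsTextFile may add).

import java.security.MessageDigest
import org.apache.hadoop.io.compress.BZip2Codec

def md5Of(in: java.io.InputStream): String = {
  val md = MessageDigest.getInstance("MD5")
  val buf = new Array[Byte](8192)
  Iterator.continually(in.read(buf)).takeWhile(_ != -1)
          .foreach(n => md.update(buf, 0, n))
  in.close()
  md.digest().map("%02x".format(_)).mkString
}

/* hypothetical file names, for illustration only */
val originalMd5 = md5Of(hdfs.open(new Path("/source/example_20180801.done")))

/* decompress the BZip2 output on the fly before hashing */
val codec = new BZip2Codec()
codec.setConf(hdfsConf)
val roundTripMd5 = md5Of(codec.createInputStream(
  hdfs.open(new Path("/destination/compressed/example_20180801.done.bzip2"))))

/* the two digests should match if the round trip preserved the content */
assert(originalMd5 == roundTripMd5)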

Before I figured out that I needed the tests, I used something like this:

val myFile = sc.textFile("/source/*20180801*")
myFile.saveAsTextFile(compressedPath, 
                      classOf[org.apache.hadoop.io.compress.BZip2Codec])

But then I cannot do the renaming part, and I do need the names. Any ideas what I could do?

UPDATE: Thanks to the suggestions in the comments, and this particular question, I was able to fix the issue using parallel collections. The only real change was importing scala.collection.parallel.immutable.ParVector and adding a par method call before doing the foreach.

Full article about parallel collections: https://docs.scala-lang.org/overviews/parallel-collections/overview.html

Thx

Answer

There were two potential solutions suggested in the comments on the original question.

TBH, I only tested the second approach, as it was the quicker one (with fewer warnings). The final solution required only minimal changes: importing the appropriate lib and parallelizing the Array that the hdfs.globStatus(sourcePath) call was returning. Here's the final code, with the original comments removed and two new comments added to make the changes easier to spot.

import org.apache.hadoop.fs.{FileSystem, Path, FileUtil}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.functions.broadcast 
import org.apache.spark.sql.types._ 
import org.apache.spark.{SparkConf, SparkContext} 
import scala.collection.parallel.immutable.ParVector /* added */

sc.getConf.set("spark.hadoop.mapred.output.compress", "true")
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", 
               "org.apache.hadoop.io.compress.BZip2Codec")
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

val hdfsConf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfsConf)
val sourcePath = new Path("/source/*20180801*") 

/* note the par method call below */
hdfs.globStatus(sourcePath).par.foreach( fileStatus => {
  val fileName = fileStatus.getPath().getName()
  val filePathName = fileStatus.getPath().toString
  if (fileName.contains(".done")) {
    val myFile = sc.textFile(filePathName)
    val compressedBasePath = "/destination/compressed/"
    val compressedPath = compressedBasePath + "tmp_/" + fileName
    myFile.saveAsTextFile(compressedPath, 
                          classOf[org.apache.hadoop.io.compress.BZip2Codec])
    FileUtil.copyMerge(hdfs, new Path(compressedPath), hdfs, 
                       new Path(compressedBasePath + "/" + 
                                fileName.replace(".done", ".done.bz2")), 
                       true, hdfsConf, null)
    myFile.unpersist()
  }
})
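
A note on what the par call buys: each iteration still launches its own small Spark job, so the parallel collection simply lets several of those jobs be submitted concurrently from the driver. If the default level of parallelism (roughly the number of driver cores) over- or under-subscribes the cluster, the concurrency can be capped with a custom task support. Below is a rough sketch of that option, not something from the original answer; it assumes Scala 2.12, where ForkJoinTaskSupport takes a java.util.concurrent.ForkJoinPool (on Scala 2.11 the pool type comes from scala.concurrent.forkjoin instead).

import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

/* parallel view over the file statuses, as in the final code above */
val statuses = hdfs.globStatus(sourcePath).par
/* allow at most 8 files to be processed concurrently (tune to the cluster) */
statuses.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
statuses.foreach( fileStatus => {
  /* same per-file body as in the final code above */
})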
