Gzipping Har Files on HDFS using Spark

This article looks at how to gzip Har files on HDFS using Spark; the question and answer below may be a useful reference for anyone facing the same problem.

Problem Description

I have huge data in Hadoop archive (.har) format. Since har doesn't include any compression, I am trying to further gzip it and store it in HDFS. The only thing I can get to work without error is:

harFile.coalesce(1, "true")
.saveAsTextFile("hdfs://namenode/archive/GzipOutput", classOf[org.apache.hadoop.io.compress.GzipCodec])
//`coalesce` because Gzip isn't splittable.

But this doesn't give me the correct results. A gzipped file is generated, but the output is invalid (a single line giving the RDD type, etc.).

Any help will be appreciated. I am also open to any other approaches.

Thanks.
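
For comparison, here is a minimal, untested sketch of what a pure-Spark route might look like, assuming the archive holds line-oriented text files and is reachable through Hadoop's har:// filesystem (the class name, host and paths below are illustrative). A single line of output like the one described above typically means the elements being written were not the files' text (saveAsTextFile writes each element's toString()); reading the archive contents as text first avoids that. Note, however, that this rewrites the data as gzipped text part-files rather than producing a compressed copy of the .har container itself, which is what the solution below does.

  import org.apache.hadoop.io.compress.GzipCodec;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;

  public class HarToGzipText {
    public static void main(String[] args) {
      JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("HarToGzipText"));

      // read the files inside the archive as text, via the "har://" filesystem
      // (the exact URI depends on the cluster; "har:///some/hdfs/path/XXX.har" works against the default FS)
      JavaRDD<String> lines = sc.textFile("har://hdfs-namenode/archive/XXX.har");

      // a single output partition, since a plain .gz file is not splittable anyway
      lines.coalesce(1, true)
           .saveAsTextFile("hdfs://namenode/archive/GzipOutput", GzipCodec.class);

      sc.stop();
    }
  }

With coalesce(1, true) the output directory contains a single part-00000.gz (plus a _SUCCESS marker), not a file named after the archive.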

Solution

A Java code snippet to create a compressed version of an existing HDFS file.

Built in a hurry, in a text editor, from bits and pieces of a Java app I wrote some time ago, hence not tested; some typos and gaps to be expected.

// HDFS API
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
// native Hadoop compression libraries
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
// standard Java I/O streams, used by the copy loop below
import java.io.InputStream;
import java.io.OutputStream;

..............

  // Hadoop "Configuration" (and its derivatives for  HDFS, HBase etc.) constructors try to auto-magically
  //  find their config files by searching CLASSPATH for directories, and searching each dir for hard-coded  
  //  name "core-site.xml", plus "hdfs-site.xml" and/or "hbase-site.xml" etc.
  // WARNING - if these config files are not found, the "Configuration" reverts to hard-coded defaults without
  //  any warning, resulting in bizarre error messages later > let's run some explicit controls here
  Configuration cnfHadoop = new Configuration() ;
  String propDefaultFs =cnfHadoop.get("fs.defaultFS") ;
  if (propDefaultFs ==null || ! propDefaultFs.startsWith("hdfs://"))
  { throw new IllegalArgumentException(
                "HDFS configuration is missing - no proper \"core-site.xml\" found, please add\n"
               +"directory /etc/hadoop/conf/ (or custom dir with custom XML conf files) in CLASSPATH"
               ) ;
  }
/*
  // for a Kerberised cluster, either you already have a valid TGT in the default
  //  ticket cache (via "kinit"), or you have to authenticate by code
  UserGroupInformation.setConfiguration(cnfHadoop) ;
  UserGroupInformation.loginUserFromKeytab("user@REALM", "/some/path/to/user.keytab") ;
*/
  FileSystem fsCluster =FileSystem.get(cnfHadoop) ;
  Path source = new Path("/some/hdfs/path/to/XXX.har") ;
  Path target = new Path("/some/hdfs/path/to/XXX.har.gz") ;

  // alternative: "BZip2Codec" for better compression (but higher CPU cost)
  // alternative: "SnappyCodec" or "Lz4Codec" for lower compression (but much lower CPU cost)
  CompressionCodecFactory codecBootstrap = new CompressionCodecFactory(cnfHadoop) ;
  CompressionCodec codecHadoop =codecBootstrap.getCodecByClassName(GzipCodec.class.getName()) ;
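  //  (to use another codec, only the lookup above changes, e.g. BZip2Codec.class.getName();
  //   for SnappyCodec or Lz4Codec the matching native library must be present on the cluster)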
  Compressor compressorHadoop =codecHadoop.createCompressor() ;

  // stream-copy the raw .har file through the compressor into the new .gz file, 16 MB at a time
  byte[] buffer = new byte[16*1024*1024] ;
  int bufUsedCapacity ;
  InputStream  sourceStream =fsCluster.open(source) ;
  OutputStream targetStream =codecHadoop.createOutputStream(fsCluster.create(target, true), compressorHadoop) ;
  while ((bufUsedCapacity =sourceStream.read(buffer)) >0)
  { targetStream.write(buffer, 0, bufUsedCapacity) ; }
  targetStream.close() ;
  sourceStream.close() ;

..............
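
To read the compressed file back, or simply to verify it, the same API can resolve the codec from the ".gz" suffix and wrap the input stream for on-the-fly decompression. Below is a short, equally untested sketch in the same style, reusing cnfHadoop, fsCluster and target from the snippet above; the restored path name is only illustrative. Keep in mind that the archive has to be decompressed again before it can be used as a Hadoop archive.

  // resolve the codec from the ".gz" suffix and decompress while copying back out
  CompressionCodecFactory codecLookup = new CompressionCodecFactory(cnfHadoop) ;
  CompressionCodec gzCodec = codecLookup.getCodec(target) ;   // returns GzipCodec for "XXX.har.gz"
  InputStream  packedStream   = gzCodec.createInputStream(fsCluster.open(target)) ;
  OutputStream restoredStream = fsCluster.create(new Path("/some/hdfs/path/to/XXX.restored.har"), true) ;
  byte[] buf = new byte[16*1024*1024] ;
  int used ;
  while ((used = packedStream.read(buf)) > 0)
  { restoredStream.write(buf, 0, used) ; }
  restoredStream.close() ;
  packedStream.close() ;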
