FileInputFormat where filename is KEY and text contents are VALUE


Problem description

I'd like to use an entire file as a single record for MAP processing, with the filename as the key.
I've read the following post:
How to get Filename/File Contents as key/value input for MAP when running a Hadoop MapReduce Job?
and while the theory of the top answer is solid, no code or "how-to" is actually provided.

Here is my custom FileInputFormat and the corresponding RecordReader, which compile, yet do not produce ANY record data.
Thanks for any help.

public class CommentsInput
    extends FileInputFormat<Text,Text> {

    protected boolean isSplitable(FileSystem fs, Path filename)
    {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext ctx)
            throws IOException, InterruptedException {
        return new CommentFileRecordReader((FileSplit) split, ctx.getConfiguration());
    }
}

/////////////////////////

public class CommentFileRecordReader
    extends RecordReader<Text,Text> {
private InputStream in;
private long start;
private long length;
private long position;
private Text key;
private Text value;
private boolean processed;
private FileSplit fileSplit;
private Configuration conf;

public CommentFileRecordReader(FileSplit fileSplit, Configuration conf) throws IOException
{
    this.fileSplit = fileSplit;
    this.conf=conf;
}

/** Boilerplate initialization code for file input streams. */
@Override
public void initialize(InputSplit split,
                     TaskAttemptContext context)
                        throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();

    fileSplit = (FileSplit) split;
    this.start = fileSplit.getStart();
    this.length = fileSplit.getLength();
    this.position = 0;
    this.processed = false;

    Path path = fileSplit.getPath();
    FileSystem fs = path.getFileSystem(conf);
    FSDataInputStream in = fs.open(path);

    CompressionCodecFactory codecs = new CompressionCodecFactory(conf);
    CompressionCodec codec = codecs.getCodec(path);
    if (codec != null)
        this.in = codec.createInputStream(in);
    else
        this.in = in;

    // If using Writables:
    // key = new Text();
    // value = new Text();
}
public boolean next(Text key, Text value) throws IOException
{
    if(!processed)
    {
        key = new Text(fileSplit.getPath().toString());
        Path file = fileSplit.getPath();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = null;
        byte[] contents = new byte[(int) fileSplit.getLength()];
        try
        {
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents.toString());
        }
        finally
        {
            IOUtils.closeStream(in);
        }
        processed = true;
        return true;
    }
    return false;
}

@Override
public boolean nextKeyValue() throws IOException {
    // TODO parse the next key value, update position and return true.
    return false;
}

@Override
public Text getCurrentKey() {
    return key;
}

@Override
public Text getCurrentValue() {
    return value;
}

/** Returns our progress within the split, as a float between 0 and 1. */
@Override
public float getProgress() {
    if (length == 0)
        return 0.0f;
    return Math.min(1.0f, position / (float)length);
}

@Override
public void close() throws IOException {
    if (in != null)
        in.close();
}
}  

Solution

You need to define your own key class and make sure your classes use it. How to define a custom key class is easy to look up. To get the file name, call the getName() method on the file's path, then use the result to build your key.
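
As a supplement to the answer, here is a minimal sketch of the "one file, one record" contract, written in plain Java so it runs without a Hadoop installation. The class names WholeFileRecordReader and WholeFileDemo are hypothetical, and java.nio stands in for Hadoop's FileSystem and Path. It is meant to illustrate a likely cause of the empty output in the question: with the new API the framework calls nextKeyValue(), which in the posted reader always returns false, while the old-style next(Text, Text) method is never invoked. The sketch also decodes the bytes explicitly, since calling toString() on a byte[] does not return the file contents.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hadoop-free stand-in for a RecordReader<Text, Text> that emits exactly one
// record per file: key = file name, value = entire file contents.
// All names here are hypothetical and chosen for illustration only.
final class WholeFileRecordReader {
    private final Path file;
    private String key;
    private String value;
    private boolean processed = false;

    WholeFileRecordReader(Path file) {
        this.file = file;
    }

    // Mirrors RecordReader.nextKeyValue(): returns true once, after loading
    // the single record, and false on every later call.
    boolean nextKeyValue() throws IOException {
        if (processed) {
            return false;
        }
        byte[] contents = Files.readAllBytes(file);
        key = file.getFileName().toString();
        // Decode the bytes explicitly; byte[].toString() would only yield an
        // identity string, not the text. UTF-8 is an assumption of this sketch.
        value = new String(contents, StandardCharsets.UTF_8);
        processed = true;
        return true;
    }

    String getCurrentKey() {
        return key;
    }

    String getCurrentValue() {
        return value;
    }

    // With a single record, progress is either nothing or everything.
    float getProgress() {
        return processed ? 1.0f : 0.0f;
    }
}

class WholeFileDemo {
    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("comment", ".txt");
        Files.write(tmp, "first line\nsecond line".getBytes(StandardCharsets.UTF_8));

        WholeFileRecordReader reader = new WholeFileRecordReader(tmp);
        // Same loop shape the framework uses to drive a reader.
        while (reader.nextKeyValue()) {
            System.out.println(reader.getCurrentKey() + " -> "
                    + reader.getCurrentValue().length() + " chars");
        }
        Files.delete(tmp);
    }
}
```

Carried over to the reader in the question, this would probably mean moving the body of next(Text, Text) into nextKeyValue(), filling the key and value fields (for example key.set(fileSplit.getPath().getName()) and value.set(contents, 0, contents.length)) instead of reassigning the method parameters, and creating those Text objects in initialize(). It may also be worth checking isSplitable: in the new API the method to override takes (JobContext, Path), so the (FileSystem, Path) version shown in the question does not override anything and large files could still be split.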
