Extend SequenceFileInputFormat to include file name+offset

Problem Description

I would like to be able to create a custom InputFormat that reads sequence files, but additionally exposes the file path and offset within that file where the record is located.

To take a step back, here's the use case: I have a sequence file containing variably-sized data. The keys are mostly irrelevant, and the values are up to a couple of megabytes each, containing a variety of different fields. I would like to index some of these fields in elasticsearch along with the file name and offset. That way, I can query those fields from elasticsearch, then use the file name and offset to go back to the sequence file and retrieve the original record, instead of storing the whole thing in ES.

I have this whole process working as a single java program. The SequenceFile.Reader class conveniently gives getPosition and seek methods to make this happen.

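Not the actual program, but a minimal sketch of what that standalone flow can look like with those two methods (the class name, argument handling, and the Elasticsearch step are illustrative placeholders):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class OffsetScanExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);              // sequence file to scan
        FileSystem fs = path.getFileSystem(conf);

        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        List<Long> offsets = new ArrayList<Long>();
        try {
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

            // getPosition() *before* next() gives the offset of the record about
            // to be read; that is the number to store alongside the indexed fields.
            long offset = reader.getPosition();
            while (reader.next(key, value)) {
                offsets.add(offset);
                // ... extract fields from value and index them together with
                // (path, offset) in Elasticsearch here ...
                offset = reader.getPosition();
            }

            // Later, a stored (path, offset) pair is enough to jump straight
            // back to the original record:
            if (!offsets.isEmpty()) {
                reader.seek(offsets.get(0));
                reader.next(key, value);
            }
        } finally {
            reader.close();
        }
    }
}
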
However, there will eventually be many terabytes of data involved, so I will need to convert this to a MapReduce job (probably Map-only). Since the actual keys in the sequence file are irrelevant, the approach I had hoped to take would be to create a custom InputFormat that extends or somehow utilizes SequenceFileInputFormat, but instead of returning the actual keys, returns a composite key consisting of the file and offset.

However, that's proving to be more difficult in practice. It seems like it should be possible, but given the actual APIs and what's exposed, it's tricky. Any ideas? Maybe an alternative approach I should take?

Solution

In case anyone encounters a similar problem, here's the solution I came up with. I ended up duplicating some of the code from SequenceFileInputFormat/RecordReader and modifying it. I had hoped to write a subclass or a decorator or something... this way is not pretty, but it works:

SequenceFileOffsetInputFormat.java:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

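/**
 * Reads sequence files like SequenceFileInputFormat, but keys each record with
 * a (file path, byte offset) pair instead of the key stored in the file.
 */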
public class SequenceFileOffsetInputFormat<V extends Writable> extends FileInputFormat<PathOffsetWritable, V> {

    private static class SequenceFileOffsetRecordReader<V extends Writable> extends RecordReader<PathOffsetWritable, V> {

        private SequenceFile.Reader in;
        private long start;
        private long end;
        private boolean more = true;
        private PathOffsetWritable key = null;
        private Writable k = null;
        private V value = null;
        private Configuration conf;

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) split;
            conf = context.getConfiguration();
            Path path = fileSplit.getPath();
            FileSystem fs = path.getFileSystem(conf);
            this.in = new SequenceFile.Reader(fs, path, conf);
            try {
                this.k = (Writable) in.getKeyClass().newInstance();
                this.value = (V) in.getValueClass().newInstance();
            } catch (InstantiationException e) {
                throw new IOException(e);
            } catch (IllegalAccessException e) {
                throw new IOException(e);
            }
            this.end = fileSplit.getStart() + fileSplit.getLength();

            // If the split does not start at the beginning of the file, skip
            // ahead to the first sync mark at or after the split start so that
            // reading begins on a record boundary.
            if (fileSplit.getStart() > in.getPosition()) {
                in.sync(fileSplit.getStart());
            }

            this.start = in.getPosition();
            more = start < end;

            key = new PathOffsetWritable(path, start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!more) {
                return false;
            }
            long pos = in.getPosition();

            more = in.next(k, value);
            // A record that starts at or past the split end belongs to the next
            // split's reader, so stop; records that merely extend past the end
            // are still read by this split.
            if (!more || (pos >= end && in.syncSeen())) {
                key = null;
                value = null;
                more = false;
            } else {
                key.setOffset(pos);
            }
            return more;
        }

        @Override
        public PathOffsetWritable getCurrentKey() {
            return key;
        }

        @Override
        public V getCurrentValue() {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (end == start) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (in.getPosition() - start) / (float)(end - start));
            }
        }

        @Override
        public void close() throws IOException {
            in.close();
        }

    }

    @Override
    public RecordReader<PathOffsetWritable, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new SequenceFileOffsetRecordReader<V>();
    }

    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
        return new SequenceFileInputFormat<PathOffsetWritable, V>().getSplits(context);
    }

    @Override
    public long getFormatMinSplitSize() {
        return SequenceFile.SYNC_INTERVAL;
    }


}

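A note on how the pieces fit together: getSplits() just delegates to a stock SequenceFileInputFormat, and getFormatMinSplitSize() mirrors its minimum of SequenceFile.SYNC_INTERVAL, so splits are never smaller than one sync interval. The record reader then aligns each split to the next sync mark and stops once it reads past the split end, so every record is handled by exactly one split, just as with the standard SequenceFileRecordReader.
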
PathOffsetWritable.java:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

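/**
 * Composite key holding the path of a sequence file and the byte offset of a
 * record within that file.
 */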
public class PathOffsetWritable implements WritableComparable<PathOffsetWritable> {

    private Text t = new Text();
    private Path path;
    private long offset;

    // Writable implementations need a no-argument constructor so the framework
    // can instantiate them via reflection when deserializing.
    public PathOffsetWritable() {
    }

    public PathOffsetWritable(Path path, long offset) {
        this.path = path;
        this.offset = offset;
    }

    public Path getPath() {
        return path;
    }

    public long getOffset() {
        return offset;
    }

    public void setPath(Path path) {
        this.path = path;
    }

    public void setOffset(long offset) {
        this.offset = offset;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        t.readFields(in);
        path = new Path(t.toString());
        offset = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        t.set(path.toString());
        t.write(out);
        out.writeLong(offset);
    }

    @Override
    public int compareTo(PathOffsetWritable o) {
        int x = path.compareTo(o.path);
        if (x != 0) {
            return x;
        } else {
            return Long.valueOf(offset).compareTo(Long.valueOf(o.offset));
        }
    }


}

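To round this out, here is a rough sketch of how the format could be wired into a map-only job. The driver and mapper names and the BytesWritable value type are assumptions, and the Elasticsearch output side is left out:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OffsetIndexDriver {

    // Illustrative mapper: assumes the sequence file values are BytesWritable;
    // substitute whatever value class the files actually contain.
    public static class OffsetIndexMapper extends Mapper<PathOffsetWritable, BytesWritable, Text, Text> {
        @Override
        protected void map(PathOffsetWritable key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            // key.getPath() and key.getOffset() identify where this record lives;
            // field extraction and Elasticsearch indexing would happen here.
            context.write(new Text(key.getPath() + ":" + key.getOffset()), new Text("indexed"));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "sequence-file-offset-index");
        job.setJarByClass(OffsetIndexDriver.class);

        job.setInputFormatClass(SequenceFileOffsetInputFormat.class);
        job.setMapperClass(OffsetIndexMapper.class);
        job.setNumReduceTasks(0);                   // map-only

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}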