Why is the SequenceFile truncated?


Problem description


I am learning Hadoop and this problem has baffled me for a while. Basically, I am writing a SequenceFile to disk and then reading it back. However, every time I read it I get an EOFException. A deeper look reveals that the sequence file is truncated prematurely during writing: the truncation always happens after writing index 962, and the file always has a fixed size of 45056 bytes.

I am using Java 8 and Hadoop 2.5.1 on a MacBook Pro. In fact, I tried the same code on another Linux machine under Java 7, but the same thing happens.

I can rule out the writer/reader not being closed properly as the cause. I tried the old-style try/catch with an explicit writer.close() as shown in the code, and also the newer try-with-resources approach (sketched below). Neither works.
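
For reference, the try-with-resources variant I also tried looks roughly like this (a sketch, reusing the same conf, fs, path, key, value, and DATA as in the full listing below); it fails the same way:

    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            SequenceFile.Writer.stream(fs.create(path)),
            SequenceFile.Writer.keyClass(IntWritable.class),
            SequenceFile.Writer.valueClass(Text.class))) {
        for (int i = 0; i < 1024; i++) {
            key.set(i);
            value.set(DATA[i % DATA.length]);
            writer.append(key, value);
        }
    } // writer.close() runs implicitly here, yet the file is still truncated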

Any help will be highly appreciated.

Following is the code I am using:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;

import static org.apache.hadoop.io.SequenceFile.Writer.keyClass;
import static org.apache.hadoop.io.SequenceFile.Writer.stream;
import static org.apache.hadoop.io.SequenceFile.Writer.valueClass;

public class SequenceFileDemo {

private static final String[] DATA = { "One, two, buckle my shoe",
    "Three, four, shut the door",
    "Five, six, pick up sticks",
    "Seven, eight, lay them straight",
    "Nine, ten, a big fat hen" };

public static void main(String[] args) throws Exception {
    String uri = "file:///Users/andy/Downloads/puzzling.seq";
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);

    Path path = new Path(uri);      
    IntWritable key = new IntWritable();
    Text value = new Text();

    // API change: the Option-based createWriter replaces the older overloads
    try {
        SequenceFile.Writer writer = SequenceFile.createWriter(conf, 
            stream(fs.create(path)),
            keyClass(IntWritable.class),
            valueClass(Text.class));

        for (int i = 0; i < 1024; i++) {
            key.set(i);
            value.clear();
            value.set(DATA[i % DATA.length]);

            writer.append(key, value);
            // periodic flush; note (i - 1) % 100 == 0, so the first flush happens at i = 1
            if ((i - 1) % 100 == 0) writer.hflush();
            System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
        }

        writer.close();

    } catch (Exception e) {
        e.printStackTrace();
    }


    try {
        SequenceFile.Reader reader = new SequenceFile.Reader(conf, 
                SequenceFile.Reader.file(path));
        Class<?> keyClass = reader.getKeyClass();
        Class<?> valueClass = reader.getValueClass();

        boolean isWritableSerialization = false;
        try {
            keyClass.asSubclass(WritableComparable.class);
            isWritableSerialization = true;
        } catch (ClassCastException e) {
            // key class is not a WritableComparable; fall through to the non-Writable branch
        }

        if (isWritableSerialization) {
            WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
            Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
            while(reader.next(rKey, rValue)) {
                System.out.printf("[%s] %d %s=%s\n",reader.syncSeen(), reader.getPosition(), rKey, rValue);
            }
        } else {
            // make sure io.serializations includes the serialization that was used
            // when writing the sequence file (see the note after this listing)
        }

        reader.close();
    } catch(IOException e) {
        e.printStackTrace();
    }
}

}
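
As an aside on the empty else branch above: for non-Writable keys/values, SequenceFile falls back to the pluggable serialization framework, which is configured through the io.serializations property. A minimal sketch of registering serializations (the key name and the two classes below are standard Hadoop; which entries you actually need depends on how the file was written):

    Configuration conf = new Configuration();
    // WritableSerialization is the default entry; JavaSerialization is shown only as an example of an extra one
    conf.setStrings("io.serializations",
            "org.apache.hadoop.io.serializer.WritableSerialization",
            "org.apache.hadoop.io.serializer.JavaSerialization");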

Solution

I actually found the error: you never close the stream created by Writer.stream(fs.create(path)).

For some reason, the close doesn't propagate down to the stream you just created there (presumably the writer doesn't take ownership of a stream it didn't open itself). This is a bug I suppose, but I'm too lazy to look it up in Jira for now.

One way to fix your problem is simply to use Writer.file(path) instead.
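
With Writer.file(path), the writer opens the underlying stream itself, so writer.close() actually tears it down. A minimal sketch of that variant (assuming the same conf and path as in the question):

    try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            SequenceFile.Writer.file(path),
            SequenceFile.Writer.keyClass(IntWritable.class),
            SequenceFile.Writer.valueClass(Text.class))) {
        writer.append(new IntWritable(0), new Text("One, two, buckle my shoe"));
    }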

Obviously, you can also just close the created stream explicitly. Find my corrected example below:

    Path path = new Path("file:///tmp/puzzling.seq");

    // closing this outer try-with-resources closes the raw stream that the writer leaves open
    try (FSDataOutputStream stream = fs.create(path)) {
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf, Writer.stream(stream),
                Writer.keyClass(IntWritable.class), Writer.valueClass(NullWritable.class))) {

            for (int i = 0; i < 1024; i++) {
                writer.append(new IntWritable(i), NullWritable.get());
            }
        }
    }

    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, Reader.file(path))) {
        Class<?> keyClass = reader.getKeyClass();
        Class<?> valueClass = reader.getValueClass();

        WritableComparable<?> rKey = (WritableComparable<?>) ReflectionUtils.newInstance(keyClass, conf);
        Writable rValue = (Writable) ReflectionUtils.newInstance(valueClass, conf);
        while (reader.next(rKey, rValue)) {
            System.out.printf("%s = %s\n", rKey, rValue);
        }

    }
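
Both fixes boil down to the same thing: the underlying output stream actually gets closed, which flushes its final buffered bytes to disk. That would also explain the symptom in the question: with the stream never closed, the unflushed tail is lost, and the file always stops at the same fixed size.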
