Implementation of custom Writable in Hadoop?


Question

I have defined a custom Writable class in Hadoop, but Hadoop gives me the following error message when running my program.

java.lang.RuntimeException: java.lang.NullPointerException
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
at org.apache.hadoop.io.SortedMapWritable.readFields(SortedMapWritable.java:180)
at EquivalenceClsAggValue.readFields(EquivalenceClsAggValue.java:82)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1282)
at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1222)
at org.apache.hadoop.mapred.Task$CombineValuesIterator.next(Task.java:1301)
at Mondrian$Combine.reduce(Mondrian.java:119)
at Mondrian$Combine.reduce(Mondrian.java:1)
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1442)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1436)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1298)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:437)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1136)
at org.apache.hadoop.mapred.Child.main(Child.java:249)

Caused by: java.lang.NullPointerException
at java.util.concurrent.ConcurrentHashMap.hash(ConcurrentHashMap.java:332)
....

EquivalenceClsAggValue is the name of the Writable class I've defined, and this is my class:

public class EquivalenceClsAggValue implements WritableComparable<EquivalenceClsAggValue> {

    public ArrayList<SortedMapWritable> aggValues;

    public EquivalenceClsAggValue() {
        aggValues = new ArrayList<SortedMapWritable>();
    }

    @Override
    public void readFields(DataInput arg0) throws IOException {
        int size = arg0.readInt();

        for (int i = 0; i < size; i++) {
            SortedMapWritable tmp = new SortedMapWritable();
            tmp.readFields(arg0);
            aggValues.add(tmp);
        }
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        //write the size first
        arg0.write(aggValues.size());

        //write each element
        for (SortedMapWritable s : aggValues) {
            s.write(arg0);
        }
    }
}

I wonder what the source of the problem is.

Solution

Looks like an error in your write(DataOutput) method:

@Override
public void write(DataOutput arg0) throws IOException {
  //write the size first
  // arg0.write(aggValues.size()); // here you're writing an int as a byte

  // try this instead:
  arg0.writeInt(aggValues.size()); // actually write int as an int

  //..
}

Look at the API docs for DataOutput.write(int) vs. DataOutput.writeInt(int): write(int) writes only the low-order byte of its argument, while writeInt(int) writes all four bytes, so the readInt() call in readFields sees a corrupted size and the stream falls out of alignment.
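
To make the difference concrete, here is a minimal, self-contained sketch (not part of the original answer; it simply writes both forms to an in-memory buffer and counts the bytes produced):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WriteVsWriteInt {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);

        out.write(300);    // writes only the low-order byte: 300 & 0xFF == 44
        out.writeInt(300); // writes all four bytes: 0x00 0x00 0x01 0x2C

        // 1 byte from write(int) + 4 bytes from writeInt(int)
        System.out.println("bytes written: " + buf.size()); // prints 5
    }
}

A size of 300 written with write(int) would be read back by readInt() as whatever four bytes happen to come next, which matches the kind of failure seen deep inside SortedMapWritable.readFields.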

I'd also amend your creation of the SortedMapWritable tmp local variable in readFields to use ReflectionUtils.newInstance():

@Override
public void readFields(DataInput arg0) throws IOException {

  int size = arg0.readInt();

  for (int i=0;i<size;i++){
    SortedMapWritable tmp = ReflectionUtils.newInstance(
        SortedMapWritable.class, getConf());
    tmp.readFields(arg0);
    aggValues.add(tmp);
  }       
}

Note that for this to work, you'll also need to amend your class signature to extend Configured, which implements Configurable (such that Hadoop will inject a Configuration object when your object is initially created):

public class EquivalenceClsAggValue 
          extends Configured 
          implements WritableComparable<EquivalenceClsAggValue> {
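
For reference, putting both fixes together, a corrected version of the class might look like the following sketch. It is assembled from the pieces above rather than taken from the original answer: the compareTo body is a hypothetical placeholder (the question never shows one), and the aggValues.clear() call at the top of readFields is an extra defensive step, since Hadoop reuses Writable instances between records:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.SortedMapWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;

public class EquivalenceClsAggValue extends Configured
        implements WritableComparable<EquivalenceClsAggValue> {

    public ArrayList<SortedMapWritable> aggValues = new ArrayList<SortedMapWritable>();

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(aggValues.size()); // write the size as a full four-byte int
        for (SortedMapWritable s : aggValues) {
            s.write(out);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        aggValues.clear(); // defensive: Hadoop reuses Writable instances between records
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            SortedMapWritable tmp = ReflectionUtils.newInstance(
                    SortedMapWritable.class, getConf());
            tmp.readFields(in);
            aggValues.add(tmp);
        }
    }

    @Override
    public int compareTo(EquivalenceClsAggValue other) {
        // hypothetical placeholder: the real comparison logic isn't shown in the question
        return aggValues.size() - other.aggValues.size();
    }
}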
