Pass a Delete or a Put error in HBase MapReduce


Problem description

I am getting the error below while running MapReduce on HBase:

java.io.IOException: Pass a Delete or a Put
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
    at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:639)
    at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at HBaseImporter$InnerMap.map(HBaseImporter.java:61)
    at HBaseImporter$InnerMap.map(HBaseImporter.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:212)
12/11/27 16:16:50 INFO mapred.JobClient:  map 0% reduce 0%
12/11/27 16:16:50 INFO mapred.JobClient: Job complete: job_local_0001
12/11/27 16:16:50 INFO mapred.JobClient: Counters: 0

Code:

public class HBaseImporter extends Configured implements Tool {

    public static class InnerMap extends TableMapper<Text, IntWritable> {
        IntWritable one = new IntWritable();

        public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
            String val = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("line")));
            String[] words = val.toString().split(" ");
            try {
                for (String word : words) {
                    context.write(new Text(word), one);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int i = 0;
            for (IntWritable val : values) {
                i += val.get();
            }
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(i));

            context.write(null, put);
        }
    }

    public int run(String args[]) throws Exception {
        //Configuration conf = getConf();
        Configuration conf = HBaseConfiguration.create();
        conf.addResource(new Path("/home/trg/hadoop-1.0.4/conf/core-site.xml"));
        conf.addResource(new Path("/home/trg/hadoop-1.0.4/conf/hdfs-site.xml"));

        Job job = new Job(conf, "SM LogAnalyzer MR");

        job.setJarByClass(HBaseImporter.class);
        //FileInputFormat.setInputPaths(job, new Path(args[1]));
        //FileOutputFormat.setOutputPath(job, new Path("outyy"));
        //job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //job.setMapperClass(InnerMap.class);
        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);
        TableMapReduceUtil.initTableMapperJob(
                "wc_in",            // input table
                scan,               // Scan instance to control CF and attribute selection
                InnerMap.class,     // mapper class
                Text.class,         // mapper output key
                IntWritable.class,  // mapper output value
                job);

        TableMapReduceUtil.initTableReducerJob(
                "word_count",         // output table
                MyTableReducer.class, // reducer class
                job);
        job.setNumReduceTasks(1);

        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        //Configuration conf = new HBaseConfiguration();
        //Job job = configureJob(conf, args);
        //System.exit(job.waitForCompletion(true) ? 0 : 1);

        String[] inArgs = new String[4];
        inArgs[0] = "HBaseImporter";
        inArgs[1] = "/user/trg/wc_in";
        inArgs[2] = "AppLogMRImport";
        inArgs[3] = "MessageDB";
        int res = ToolRunner.run(new Configuration(), new HBaseImporter(), inArgs);
        //int res = ToolRunner.run(new Configuration(), new HBaseImporter(), args);
    }
}

I am setting the map output value class to IntWritable.class, but TableOutputFormat.write is still being called in the mapper, and it expects a Put object.

Solution

I found the answer to my own question: I was mistakenly setting the number of reducer tasks to 0.

 job.setNumReduceTasks(0);

With zero reducers, the mapper's output goes straight to TableOutputFormat, which expects a Put (or Delete) object to write into the HBase table. Commenting out the line above solved the issue.
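
For reference, here is a minimal sketch of the map-only pattern the error message hints at: if a job really does run with job.setNumReduceTasks(0), the mapper itself must emit Put (or Delete) values, because its output then goes directly to TableOutputFormat. The class name PutEmittingMap and the row-copying logic are illustrative only (a map-only job cannot aggregate word counts); the sketch assumes the same "cf:line" layout and the HBase 0.94-era API used in the question.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

// Map-only variant: with zero reducers the map output value type must be
// what TableOutputFormat expects, i.e. a Put or a Delete.
public class PutEmittingMap extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException, InterruptedException {
        // Copy the "cf:line" cell of the input row into the output table unchanged.
        byte[] line = value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("line"));
        Put put = new Put(row.get());
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("line"), line);
        context.write(row, put);
    }
}

The job wiring for that case would look roughly like this (again a sketch, not the code from the question):

TableMapReduceUtil.initTableMapperJob(
        "wc_in", scan, PutEmittingMap.class,
        ImmutableBytesWritable.class, Put.class, job);
TableMapReduceUtil.initTableReducerJob("word_count", null, job); // still installs TableOutputFormat
job.setNumReduceTasks(0); // safe here, because the mapper already emits Puts

For the word count in the question, though, the aggregation has to happen in MyTableReducer, so simply removing job.setNumReduceTasks(0) (and keeping job.setNumReduceTasks(1)) is the right fix.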
