Issue putting Spark Streaming data into HBase


Problem Description


I am a beginner in this field, so I cannot yet make sense of it...

  • HBase ver: 0.98.24-hadoop2
  • Spark ver: 2.1.0

The following code tries to put data received from a Kafka producer (via Spark Streaming) into HBase.

  • The Kafka input data format is like this:

    Line1,TAG1,123
    Line1,TAG2,134

The Spark Streaming process splits each received line on the ',' delimiter and then puts the data into HBase. However, my application hits an error when it calls the htable.put() method. Can anyone explain why the code below throws an error?

Thank you.

JavaDStream<String> records = lines.flatMap(new FlatMapFunction<String, String>() {   
    private static final long serialVersionUID = 7113426295831342436L;

    HTable htable; 
    public HTable set() throws IOException{ 
        Configuration hconfig = HBaseConfiguration.create();
        hconfig.set("hbase.zookeeper.property.clientPort", "2222");
        hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");  

        HConnection hconn = HConnectionManager.createConnection(hconfig);  

        htable = new HTable(hconfig, tableName); 

        return htable;  
    };  
    @Override
    public Iterator<String> call(String x) throws IOException {  

        ////////////// Put into HBase   ///////////////////// 
        String[] data = x.split(",");   

        if (null != data && data.length > 2 ){ 
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");   
            String ts = sdf.format(new Date());  

            Put put = new Put(Bytes.toBytes(ts)); 

            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2]));

/* I've checked the data passed, like this:
{"totalColumns":3,"row":"20170120200927",
"families":{"TAGVALUE":
[{"qualifier":"LINEID","vlen":3,"tag":[],"timestamp":9223372036854775807},
{"qualifier":"TAGID","vlen":3,"tag":[],"timestamp":9223372036854775807},
{"qualifier":"VAL","vlen":6,"tag":[],"timestamp":9223372036854775807}]}} */


//********************* ERROR *******************//   
            htable.put(put);  
            htable.close();  


        }

        return Arrays.asList(COLDELIM.split(x)).iterator(); 
    } 
}); 

Error output:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23, localhost, executor driver): java.lang.NullPointerException
at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:154)
at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:123)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Solution

You are never calling the method public HTable set() throws IOException, which initializes and returns the htable instance.

Since the htable instance is still null, when you try to perform an operation on it

htable.put()

you get an NPE like the one below:

 stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23, localhost, executor driver): java.lang.NullPointerException
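
A minimal sketch of one way to fix this (assuming the tableName, familyName, and COLDELIM fields from the rest of the class are in scope, as in the question): lazily initialize htable by calling set() before the first put. The htable.close() call is also removed here, because the function runs once per record and a table closed on the first record cannot accept the next put.

    @Override
    public Iterator<String> call(String x) throws IOException {
        // Lazily initialize the table on the executor; otherwise the
        // htable field stays null and htable.put() throws a NullPointerException.
        if (htable == null) {
            set();
        }

        String[] data = x.split(",");
        if (data != null && data.length > 2) {
            String ts = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());

            Put put = new Put(Bytes.toBytes(ts));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2]));

            htable.put(put);
            // Do not call htable.close() here: closing after every record
            // would make the second record fail against a closed table.
        }

        return Arrays.asList(COLDELIM.split(x)).iterator();
    }

Opening a connection per record is still expensive; a common refinement (not shown here) is to write from foreachPartition so the connection is created and closed once per partition. Note also that the HConnection created in set() is never used or closed; if you keep it, consider obtaining the table from it via hconn.getTable(tableName) and closing both when the stream stops.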
