Spark serialization error: When I insert Spark Stream data into HBase
Problem description
I'm confused about how Spark interacts with HBase in terms of data format. For instance, when I omit the 'ERROR' line in the code snippet below, it runs fine, but as soon as I add that line I get a 'Task not serializable' error.
How do I change the code? Why does the error happen?
My code is as follows:
// HBase
Configuration hconfig = HBaseConfiguration.create();
hconfig.set("hbase.zookeeper.property.clientPort", "2222");
hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");
hconn = HConnectionManager.createConnection(hconfig);
HTable htable = new HTable(hconfig, Bytes.toBytes(tableName));

// Kafka configuration
Set<String> topics = Collections.singleton(topic);
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("zookeeper.connect", "localhost:2222");
kafkaParams.put("group.id", "tag_topic_id");

// Spark Stream
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        ssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});

JavaDStream<String> records = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String x) throws IOException {
        ////////////// Put into HBase : ERROR /////////////////////
        String[] data = x.split(",");
        if (null != data && data.length > 2) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
            String ts = sdf.format(new Date());
            Put put = new Put(Bytes.toBytes(ts));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2]));
            htable.put(put); // ***** ERROR ********
            htable.close();
        }
        return Arrays.asList(COLDELIM.split(x)).iterator();
    }
});

records.print();

ssc.start();
ssc.awaitTermination();
When I start my application, I get the following error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at org.apache.spark.streaming.dstream.DStream$$anonfun$flatMap$1.apply(DStream.scala:554)
at org.apache.spark.streaming.dstream.DStream$$anonfun$flatMap$1.apply(DStream.scala:554)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:682)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:264)
at org.apache.spark.streaming.dstream.DStream.flatMap(DStream.scala:553)
at org.apache.spark.streaming.api.java.JavaDStreamLike$class.flatMap(JavaDStreamLike.scala:172)
at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.flatMap(JavaDStreamLike.scala:42)
Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.client.HTable
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.client.HTable, value: MCSENSOR;hconnection-0x6839203b)
Recommended answer
The serialization debugger gives you a hint here:
Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.client.HTable
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.client.HTable, value: MCSENSOR;hconnection-0x6839203b)
Put the part below inside the FlatMapFunction, before the call method (the closure) where you use it; that should solve the issue:
Configuration hconfig = HBaseConfiguration.create();
hconfig.set("hbase.zookeeper.property.clientPort", "2222");
hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");
hconn = HConnectionManager.createConnection(hconfig);
HTable htable = new HTable(hconfig, Bytes.toBytes(tableName));
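For illustration only, here is a minimal sketch of what the flatMap might look like once that setup is moved inside the closure. It assumes tableName, familyName and COLDELIM are plain serializable Strings visible to the anonymous class, reuses the same (old) HBase client API and imports as the question, and is not the only way to write this.

JavaDStream<String> records = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterator<String> call(String x) throws IOException {
        // Built on the executor inside the closure, so nothing unserializable
        // (HConnection/HTable) is captured from the driver.
        Configuration hconfig = HBaseConfiguration.create();
        hconfig.set("hbase.zookeeper.property.clientPort", "2222");
        hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");
        HConnection hconn = HConnectionManager.createConnection(hconfig);
        HTable htable = new HTable(hconfig, Bytes.toBytes(tableName));
        try {
            String[] data = x.split(",");
            if (null != data && data.length > 2) {
                String ts = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
                Put put = new Put(Bytes.toBytes(ts));
                put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0]));
                put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1]));
                put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2]));
                htable.put(put);
            }
        } finally {
            htable.close();
            hconn.close();
        }
        return Arrays.asList(COLDELIM.split(x)).iterator();
    }
});

This removes the NotSerializableException because the closure no longer references the driver-side htable field, but it also opens and closes a connection for every record; in practice you would batch the writes, for example with foreachRDD/foreachPartition, and keep one connection per partition.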