Flink throwing serialization error when reading from HBase
Problem description
When I read from HBase using a RichFlatMapFunction inside a map, I get a serialization error. What I am trying to do is: if an element of a datastream equals a particular string, read from HBase; otherwise ignore it. Below are the sample program and the error I am getting.
package com.abb.Flinktest
import java.text.SimpleDateFormat
import java.util.Properties
import scala.collection.concurrent.TrieMap
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.BinaryComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Level
import org.apache.flink.api.common.functions.RichMapFunction
object Flinktesthbaseread {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val kafkaStream = env.fromElements("hello")
    val c = kafkaStream.map(x => if (x.equals("hello")) kafkaStream.flatMap(new ReadHbase()))
    env.execute()
  }

  class ReadHbase extends RichFlatMapFunction[String, Tuple11[String, String, String, String, String, String, String, String, String, String, String]] with Serializable {
    var conf: org.apache.hadoop.conf.Configuration = null
    var table: org.apache.hadoop.hbase.client.HTable = null
    var hbaseconnection: org.apache.hadoop.hbase.client.Connection = null
    var taskNumber: String = null
    var rowNumber = 0
    val serialVersionUID = 1L

    override def open(parameters: org.apache.flink.configuration.Configuration) {
      println("getting table")
      conf = HBaseConfiguration.create()
      val in = getClass().getResourceAsStream("/hbase-site.xml")
      conf.addResource(in)
      hbaseconnection = ConnectionFactory.createConnection(conf)
      table = new HTable(conf, "testtable")
      // this.taskNumber = String.valueOf(taskNumber);
    }

    override def flatMap(msg: String, out: Collector[Tuple11[String, String, String, String, String, String, String, String, String, String, String]]) {
      // flatMap operation here
    }

    override def close() {
      table.flushCommits()
      table.close()
    }
  }
}
Error:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:617)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:959)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:484)
at com.abb.Flinktest.Flinktesthbaseread$.main(Flinktesthbaseread.scala:45)
at com.abb.Flinktest.Flinktesthbaseread.main(Flinktesthbaseread.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.scala.DataStream
- field (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", name: "kafkaStream$1", type: "class org.apache.flink.streaming.api.scala.DataStream")
- root object (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", <function1>)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
... 6 more
I tried wrapping the field inside a method and a class, and making the class serializable as well, but no luck. Could someone throw some light on this or suggest a workaround?
The problem is that you're trying to access the kafkaStream variable inside the map function, and a DataStream is simply not serializable. It is just an abstract representation of the data; it doesn't contain anything, which invalidates your function in the first place.
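What Flink's closure cleaner reports here is ordinary Java serialization behavior: serializing an object fails if any of its fields references a non-serializable object. The sketch below (class names are illustrative, not from the original program) mimics the failure: a Serializable "closure" object holds a reference to a non-serializable handle, just as the map lambda captured the DataStream.

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCaptureDemo {
    // Stands in for DataStream: just a handle, not serializable data.
    static class StreamHandle { }

    // Stands in for the map closure: Serializable itself, but serialization
    // drags the captured handle along with it and fails on that field.
    static class MapClosure implements Serializable {
        final StreamHandle captured = new StreamHandle();
    }

    public static void main(String[] args) throws Exception {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new MapClosure());
            System.out.println("serialized");
        } catch (NotSerializableException e) {
            // Same root cause as Flink's "Task not serializable" error:
            // the message names the offending captured class.
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```

This is why marking ReadHbase `with Serializable` did not help: the function class was fine, but the anonymous map function capturing kafkaStream was not.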
Instead, do something like this:
kafkaStream.filter(x => x.equals("hello")).flatMap(new ReadHBase())
The filter function will only retain the elements for which the condition is true, and those will be passed on to your flatMap function.
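The same filter-then-flatMap shape can be seen with the JDK's own Stream API (a plain-Java analogy, not Flink code): only the elements passing the predicate ever reach the downstream flatMap, which may emit zero or more elements per input.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FilterFlatMapDemo {
    public static void main(String[] args) {
        List<String> result = Stream.of("hello", "world", "hello")
            .filter(x -> x.equals("hello"))               // keep only matches
            .flatMap(x -> Stream.of(x, x.length() + ""))  // expand each match
            .collect(Collectors.toList());
        System.out.println(result); // [hello, 5, hello, 5]
    }
}
```

"world" never reaches the flatMap stage, which is exactly the semantics wanted in the question: look up HBase only for elements equal to "hello".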
I would highly recommend reading the basic API concepts documentation, as there appears to be some misunderstanding about what actually happens when you specify a transformation.