Flink throwing serialization error when reading from HBase


Problem description

When I read from HBase using a RichFlatMapFunction inside a map, I get a serialization error. What I am trying to do is: if an element of the datastream equals a particular string, read from HBase; otherwise, ignore it. Below is the sample program and the error I am getting.

package com.abb.Flinktest
import java.text.SimpleDateFormat
import java.util.Properties

import scala.collection.concurrent.TrieMap 
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.BinaryComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Level
import org.apache.flink.api.common.functions.RichMapFunction

object Flinktesthbaseread {

  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val kafkaStream = env.fromElements("hello")
    val c = kafkaStream.map(x => if (x.equals("hello")) kafkaStream.flatMap(new ReadHbase()))
    env.execute()
  }

  class ReadHbase extends RichFlatMapFunction[String, Tuple11[String, String, String, String, String, String, String, String, String, String, String]] with Serializable {
    var conf: org.apache.hadoop.conf.Configuration = null
    var table: org.apache.hadoop.hbase.client.HTable = null
    var hbaseconnection: org.apache.hadoop.hbase.client.Connection = null
    var taskNumber: String = null
    var rowNumber = 0
    val serialVersionUID = 1L

    override def open(parameters: org.apache.flink.configuration.Configuration) {
      println("getting table")
      conf = HBaseConfiguration.create()
      val in = getClass().getResourceAsStream("/hbase-site.xml")
      conf.addResource(in)
      hbaseconnection = ConnectionFactory.createConnection(conf)
      table = new HTable(conf, "testtable")
      // this.taskNumber = String.valueOf(taskNumber)
    }

    override def flatMap(msg: String, out: Collector[Tuple11[String, String, String, String, String, String, String, String, String, String, String]]) {
      // flatMap operation here
    }

    override def close() {
      table.flushCommits()
      table.close()
    }
  }
}

Error:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:617)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:959)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:484)
    at com.abb.Flinktest.Flinktesthbaseread$.main(Flinktesthbaseread.scala:45)
    at com.abb.Flinktest.Flinktesthbaseread.main(Flinktesthbaseread.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.scala.DataStream
    - field (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", name: "kafkaStream$1", type: "class org.apache.flink.streaming.api.scala.DataStream")
    - root object (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", <function1>)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
    ... 6 more

I tried wrapping the field inside a method and a class, and making the class serializable as well, but no luck. Could someone shed some light on this or suggest a workaround?

Solution

The problem is that you're trying to access the kafkaStream variable inside the map function, and a DataStream is simply not serializable. It is just an abstract representation of the data; it doesn't contain anything, which invalidates your function in the first place.

Instead, do something like this:

   kafkaStream.filter(x => x.equals("hello")).flatMap(new ReadHbase())

The filter function will only retain the elements for which the condition is true, and those will be passed to your flatMap function.
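
For reference, a minimal runnable sketch of the corrected pipeline could look like this (reusing the ReadHbase function and imports from the question; the extra "world" element is only there to illustrate a non-matching value being filtered out):

    def main(args: Array[String]) {
      val env = StreamExecutionEnvironment.createLocalEnvironment()
      val kafkaStream = env.fromElements("hello", "world")

      // Keep only matching elements, then enrich them from HBase.
      // Note that no DataStream is captured inside a function closure.
      val enriched = kafkaStream
        .filter(x => x.equals("hello"))
        .flatMap(new ReadHbase())

      env.execute()
    }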

I would highly recommend reading the basic API concepts documentation, as there appears to be some misunderstanding about what actually happens when you specify a transformation.
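
One more detail worth noting: the HBase handles (Connection, HTable) are not serializable either; your function only survives serialization because those fields are still null when the job is shipped. To make that explicit, it is common to mark such per-task resources @transient and create them in open(). A simplified sketch of your ReadHbase (with the output narrowed to a pair purely for brevity):

    class ReadHbase extends RichFlatMapFunction[String, (String, String)] {
      // Per-task resources: created in open() on the worker and never
      // shipped with the serialized function, hence @transient.
      @transient private var connection: org.apache.hadoop.hbase.client.Connection = _
      @transient private var table: org.apache.hadoop.hbase.client.HTable = _

      override def open(parameters: org.apache.flink.configuration.Configuration) {
        val conf = HBaseConfiguration.create()
        conf.addResource(getClass().getResourceAsStream("/hbase-site.xml"))
        connection = ConnectionFactory.createConnection(conf)
        table = new HTable(conf, "testtable")
      }

      override def flatMap(msg: String, out: Collector[(String, String)]) {
        // look up msg in HBase and emit enriched records here
      }

      override def close() {
        table.close()
        connection.close()
      }
    }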
