Dynamically parse JSON in a Flink map


Problem Description


I'm using Flink to dynamically analyze JSON data, keying by given columns and summing another. In my mapFunction I convert the JSON to a case class, but the resulting stream fails in the keyBy call with: Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType&lt;java.lang.Object&gt;) cannot be used as key. My code looks like this:

//conf.properties
columns=a:String,b:Int,c:String,d:Long
declusteringColumns=a,c
statsColumns=b

//main function
stream.map(new MapFunc)
      .keyBy(declusteringColumns(0), declusteringColumns.drop(1).toSeq: _*) // drop(1): the remaining key columns
      .sum(statsColumns)

class MapFunc extends RichMapFunction[String, Any] {
  var clazz: Class[_] = _

  override def open(parameters: Configuration): Unit = {
    import scala.reflect.runtime.universe
    import scala.tools.reflect.ToolBox
    val tb = universe.runtimeMirror(universe.getClass.getClassLoader).mkToolBox()
    clazz = tb.compile(tb.parse(
      """|case class Test(a: String, b: Int, c: String, d: Long){}
         |scala.reflect.classTag[Test].runtimeClass"""
        .stripMargin)).apply.asInstanceOf[Class[_]]
  }

  // JSON here is presumably com.alibaba.fastjson.JSON; Utils is my own helper.
  // Note the explicit ": Any =" — with procedure syntax the constructed
  // instance would be discarded and map would return Unit.
  override def map(value: String): Any = {
    val tmp = JSON.parseObject(value)
    val values = Utils.loadProperties("columns").split(",").map(y => {
      val name = y.substring(0, y.indexOf(":"))
      val tpe = y.substring(y.indexOf(":") + 1)
      tpe.toLowerCase match {
        case "string" => tmp.getString(name)
        case "int" => tmp.getInteger(name)
        case "long" => tmp.getLong(name)
        case _ => null
      }
    }).toSeq
    clazz.getConstructors()(0).newInstance(values: _*)
  }
}

How can I convert JSON to a case class or tuple?

Solution

Actually, it appeared that the exception

org.apache.flink.api.common.InvalidProgramException: 
This type (GenericType<Test>) cannot be used as key 

remains even for an ordinary case class (not one generated via reflection):

case class Test(a: String, b: Int, c: String, d: Long)

The first issue is that this case class is not a POJO

https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#pojos

POJOs

Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements:

  • The class must be public.

  • It must have a public constructor without arguments (default constructor).

  • All fields are either public or must be accessible through getter and setter functions. For a field called foo the getter and setter methods must be named getFoo() and setFoo().

  • The type of a field must be supported by a registered serializer.

So you should replace

case class Test(a: String, b: Int, c: String, d: Long)

with

import scala.beans.BeanProperty

case class Test(
                 @BeanProperty var a: String,
                 @BeanProperty var b: Int,
                 @BeanProperty var c: String,
                 @BeanProperty var d: Long) {
  def this() = {
    this(null, 0, null, 0)
  }
}
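
A quick way to sanity-check that this class now meets Flink's POJO requirements, without a Flink dependency, is to exercise the public no-arg constructor and the bean accessors that @BeanProperty generates (a minimal sketch; Flink itself performs the equivalent checks via reflection):

```scala
import scala.beans.BeanProperty

case class Test(
                 @BeanProperty var a: String,
                 @BeanProperty var b: Int,
                 @BeanProperty var c: String,
                 @BeanProperty var d: Long) {
  def this() = {
    this(null, 0, null, 0)
  }
}

object BeanCheck {
  def main(args: Array[String]): Unit = {
    val t = new Test()  // public no-arg constructor: required by Flink
    t.setA("aaa")       // setters generated by @BeanProperty
    t.setB(1)
    println(t.getA)     // aaa
    println(t.getB)     // 1
  }
}
```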

The second issue may be that Flink doesn't allow POJOs that are non-static inner classes, while the reflective toolbox generates a local class nested inside a method.

https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#rules-for-pojo-types

Rules for POJO types

Flink recognizes a data type as a POJO type (and allows "by-name" field referencing) if the following conditions are fulfilled:

  • The class is public and standalone (no non-static inner class)
  • The class has a public no-argument constructor
  • All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have a public getter- and a setter- method that follows the Java beans naming conventions for getters and setters.
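
The "standalone" requirement is exactly what the toolbox output violates: a class compiled inside a method is a local class, which plain JVM reflection can detect. A minimal illustration, no Flink needed:

```scala
// A top-level class: the shape Flink accepts as a POJO candidate
case class TopLevel(a: String, b: Int)

object StandaloneCheck {
  // Mimics the toolbox output: a class nested inside a method body
  def toolboxStyle(): Class[_] = {
    case class Local(a: String, b: Int)
    classOf[Local]
  }

  def main(args: Array[String]): Unit = {
    println(classOf[TopLevel].isLocalClass) // false: standalone
    println(toolboxStyle().isLocalClass)    // true: not standalone, so not a POJO for Flink
  }
}
```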

Here is a decompiled version of the toolbox-generated code:

public final class __wrapper$1$a077cb72a4ee423291aac7dfb47454b9$ {

   public Object wrapper() {
      new LazyRef();

      class Test$1 implements Product, Serializable {
         private String a;
         private int b;
         private String c;
         private long d;

         ...
      }

      return scala.reflect.package..MODULE$.classTag(scala.reflect.ClassTag..MODULE$.apply(Test$1.class)).runtimeClass();
   }

   ...
}

The full decompiled code:

https://gist.github.com/DmytroMitin/f1554ad833ea1bb9eb97947ae872d220

So if it's really necessary to generate a class for Flink, it should probably be generated manually rather than via the toolbox:

https://www.reddit.com/r/scala/comments/gfcmul/compile_scala_source_from_string_and/

https://www.reddit.com/r/scala/comments/jckld2/is_there_a_way_to_load_scala_code_at_runtime/

How to eval code that uses InterfaceStability annotation (that fails with "illegal cyclic reference involving class InterfaceStability")?

How do I programmatically compile and instantiate a Java class?

Dynamic compilation of multiple Scala classes at runtime

Tensorflow in Scala reflection

But the code with a class generated manually

https://gist.github.com/DmytroMitin/e33cd244b37f9b33b67f7ac3e6609d39

still throws This type (GenericType<java.lang.Object>) cannot be used as key.

I guess the reason for that is the following (and this is the third issue).

The code with an ordinary case class (not generated) seems to work:

https://gist.github.com/DmytroMitin/af426d4578dd5e76c9e0d344e6f079ce

But if we replace the type Test with Any, then it throws This type (GenericType&lt;java.lang.Object&gt;) cannot be used as key:

https://gist.github.com/DmytroMitin/a23e45a546790630e838e60c7206adcd

And with reflection we can't return anything but Any.


Now I'm creating TypeInformation[Test] inside the generated code, which seems to fix This type (GenericType&lt;java.lang.Object&gt;) cannot be used as key, but now I get

org.apache.flink.api.common.InvalidProgramException: UTF-8 is not serializable. 
The object probably contains or references non serializable fields.

https://gist.github.com/DmytroMitin/16d312dbafeae54518f7ac2c490426b0


I resolved the InvalidProgramException: UTF-8 is not serializable issue by annotating the fields of MapFunc with @transient:

https://gist.github.com/DmytroMitin/f2f859273075370c4687a30e0c3a2431


Actually, it turned out that if we create the TypeInformation inside the generated code, then the toolbox is enough:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

import scala.reflect.runtime
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox

object App {
  val toolbox = ToolBox(runtime.currentMirror).mkToolBox()

  class MapFunc extends RichMapFunction[String, Any] {
    var typeInfo: TypeInformation[_] = _
    @transient var classSymbol: ClassSymbol = _

    override def open(parameters: Configuration): Unit = {
      val code =
        """|case class Test(
           |                 @scala.beans.BeanProperty var a: String,
           |                 @scala.beans.BeanProperty var b: Int,
           |                 @scala.beans.BeanProperty var c: String,
           |                 @scala.beans.BeanProperty var d: Long) {
           |  def this() = {
           |    this(null, 0, null, 0)
           |  }
           |}""".stripMargin

      val tree = toolbox.parse(code)
      classSymbol = toolbox.define(tree.asInstanceOf[ImplDef]).asClass
      typeInfo = toolbox.eval(
        q"org.apache.flink.api.common.typeinfo.TypeInformation.of(classOf[${classSymbol.toType}])"
      ).asInstanceOf[TypeInformation[_]]
    }

    override def map(value: String): Any = {
      val values = Seq("aaa", 1, "ccc", 2L) //hardcoded for now
      createClassInstance(classSymbol, values: _*)
    }
  }


  def main(args: Array[String]): Unit = {
    val func = new MapFunc
    func.open(new Configuration)
    val classInstance = func.map("""{a: "aaa", b: 1, c: "ccc", d: 2}""")
    println(classInstance) //Test(aaa,1,ccc,2)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("localhost", 9999)
    val typeInfo = func.typeInfo.asInstanceOf[TypeInformation[Any]]
    println(typeInfo)//PojoType<__wrapper$1$75434c8e32f541f7a87513a2ad2aa0ce.Test, fields = [a: String, b: Integer, c: String, d: Long]>
    val res = stream.map(func)(typeInfo).keyBy("a", "c").sum("b")
    println(res)//org.apache.flink.streaming.api.scala.DataStream@5927f904
  }

  def createClassInstance(classSymbol: ClassSymbol, args: Any*): Any = {
    val runtimeMirror = toolbox.mirror
    val classType = classSymbol.typeSignature
    val constructorSymbol = classType.decl(termNames.CONSTRUCTOR).alternatives.head.asMethod
    val classMirror = runtimeMirror.reflectClass(classSymbol)
    val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
    constructorMirror(args: _*)
  }
}
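
The values in map above are hardcoded for now; to take them from the incoming JSON, the column-spec-driven conversion from the question can be kept as a small pure function. This sketch uses a plain Map[String, Any] as a hypothetical stand-in for fastjson's JSONObject (real code would call tmp.getString / getInteger / getLong):

```scala
object ArgsFromJson {
  // Builds constructor arguments in declaration order from a "columns" spec
  // such as "a:String,b:Int,c:String,d:Long", ready for createClassInstance.
  def args(columns: String, parsed: Map[String, Any]): Seq[Any] =
    columns.split(",").toSeq.map { spec =>
      val Array(name, tpe) = spec.split(":")
      tpe.toLowerCase match {
        case "string" => parsed(name).toString
        case "int"    => parsed(name).toString.toInt
        case "long"   => parsed(name).toString.toLong
        case _        => null
      }
    }

  def main(args0: Array[String]): Unit = {
    val values = args("a:String,b:Int,c:String,d:Long",
      Map("a" -> "aaa", "b" -> 1, "c" -> "ccc", "d" -> 2L))
    println(values) // List(aaa, 1, ccc, 2)
  }
}
```

The resulting Seq can then be passed straight to createClassInstance(classSymbol, values: _*).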
