dynamically parse json in flink map

Question
I'm using Flink to dynamically analyze JSON data: I keyBy and sum on given columns, and in my MapFunction I convert the JSON to a case class. But the resulting stream fails in the keyBy call with this error:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: This type (GenericType<java.lang.Object>) cannot be used as key.
My code looks like this:

//conf.properties
columns=a:String,b:Int,c:String,d:Long
declusteringColumns=a,c
statsColumns=b
//main function
stream.map(new MapFunc)
  .keyBy(declusteringColumns(0), declusteringColumns.drop(1).toSeq: _*)
  .sum(statsColumns)
// imports inferred from the calls below: JSON.parseObject is fastjson's API
import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class MapFunc extends RichMapFunction[String, Any] {
  var clazz: Class[_] = _

  override def open(parameters: Configuration): Unit = {
    import scala.reflect.runtime.universe
    import scala.tools.reflect.ToolBox
    val tb = universe.runtimeMirror(universe.getClass.getClassLoader).mkToolBox()
    clazz = tb.compile(tb.parse(
      """|case class Test(a:String,b:Int,c:String,d:Long){}
         |scala.reflect.classTag[Test].runtimeClass"""
        .stripMargin)).apply.asInstanceOf[Class[_]]
  }

  // must return Any; procedure syntax would return Unit and discard the instance
  override def map(value: String): Any = {
    val tmp = JSON.parseObject(value)
    // Utils.loadProperties is the question's own helper
    val values = Utils.loadProperties("columns").split(",").map(y => {
      val name = y.substring(0, y.indexOf(":"))
      val tpe = y.substring(y.indexOf(":") + 1)
      tpe.toLowerCase match {
        case "string" => tmp.getString(name)
        case "int" => tmp.getInteger(name)
        case "long" => tmp.getLong(name)
        case _ => null
      }
    }).toSeq
    // box to AnyRef so the Seq can be passed to the Java varargs constructor
    clazz.getConstructors()(0).newInstance(values.map(_.asInstanceOf[AnyRef]): _*)
  }
}

How can I convert the JSON to a case class or tuple?

Answer
Actually, it appeared that the exception

org.apache.flink.api.common.InvalidProgramException:
This type (GenericType<Test>) cannot be used as key

remains even for an ordinary case class (not generated via reflection):
case class Test(a: String, b: Int, c: String, d: Long)
The first issue is that this case class is not a POJO

https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html#pojos

POJOs

Java and Scala classes are treated by Flink as a special POJO data type if they fulfill the following requirements:

- The class must be public.
- It must have a public constructor without arguments (default constructor).
- All fields are either public or must be accessible through getter and setter functions. For a field called foo the getter and setter methods must be named getFoo() and setFoo().
- The type of a field must be supported by a registered serializer.

So you should replace

case class Test(a: String, b: Int, c: String, d: Long)

with
import scala.beans.BeanProperty

case class Test(
  @BeanProperty var a: String,
  @BeanProperty var b: Int,
  @BeanProperty var c: String,
  @BeanProperty var d: Long) {
  def this() = {
    this(null, 0, null, 0)
  }
}
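As a quick sanity check (my own sketch, not part of the original answer; the object name PojoCheck is made up), you can print the TypeInformation Flink derives for the class: a PojoType means string-based keyBy will work, while a GenericType is exactly what produces the "cannot be used as key" error.

import org.apache.flink.api.common.typeinfo.TypeInformation

object PojoCheck extends App {
  // Prints PojoType<Test, fields = [a: String, b: Integer, c: String, d: Long]>
  // once Test satisfies the POJO rules; GenericType<Test> otherwise.
  println(TypeInformation.of(classOf[Test]))
}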
The second issue can be that Flink doesn't allow POJOs that are non-static inner classes, while the reflective toolbox generates a local class nested inside a method

Rules for POJO types

Flink recognizes a data type as a POJO type (and allows "by-name" field referencing) if the following conditions are fulfilled:

- The class is public and standalone (no non-static inner class)
- The class has a public no-argument constructor
- All non-static, non-transient fields in the class (and all superclasses) are either public (and non-final) or have public getter- and setter- methods that follow the Java beans naming conventions for getters and setters.

Here is a decompiled version of the toolbox-generated code:

public final class __wrapper$1$a077cb72a4ee423291aac7dfb47454b9$ {
  public Object wrapper() {
    new LazyRef();
    class Test$1 implements Product, Serializable {
      private String a;
      private int b;
      private String c;
      private long d;
      ...
    }
    return scala.reflect.package..MODULE$.classTag(scala.reflect.ClassTag..MODULE$.apply(Test$1.class)).runtimeClass();
  }
  ...
}

The full decompiled code: https://gist.github.com/DmytroMitin/f1554ad833ea1bb9eb97947ae872d220
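The "standalone" requirement is easy to see with plain reflection (again a sketch of mine, not from the original answer): a class declared inside a method reports a non-null enclosing method, and that is what disqualifies the toolbox-generated class.

object LocalClassDemo extends App {
  def makeLocalClass(): Class[_] = {
    // mirrors what the toolbox produces: a class local to a method
    case class Test(a: String, b: Int, c: String, d: Long)
    classOf[Test]
  }
  // a non-null enclosing method means a local class, hence not a Flink POJO
  println(makeLocalClass().getEnclosingMethod)
}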
So it's possible that if it's really necessary to generate a class for Flink, it should be generated manually rather than via the toolbox:

https://www.reddit.com/r/scala/comments/gfcmul/compile_scala_source_from_string_and/
https://www.reddit.com/r/scala/comments/jckld2/is_there_a_way_to_load_scala_code_at_runtime/
How to evaluate code that uses InterfaceStability annotation (fails with "illegal cyclic reference involving class InterfaceStability")?
How do I programmatically compile and instantiate a Java class?
Dynamic compilation of multiple Scala classes at runtime
Tensorflow in Scala reflection

But the code with a class generated manually

https://gist.github.com/DmytroMitin/e33cd244b37f9b33b67f7ac3e6609d39

still throws This type (GenericType<java.lang.Object>) cannot be used as key.

I guess the reason is the following (and this is the third issue). The code with an ordinary case class (not generated) seems to work

https://gist.github.com/DmytroMitin/af426d4578dd5e76c9e0d344e6f079ce

But if we replace the type Test with Any

https://gist.github.com/DmytroMitin/a23e45a546790630e838e60c7206adcd

then it throws This type (GenericType<java.lang.Object>) cannot be used as key again. And with reflection we can't return anything but Any.
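The erasure point in miniature (my sketch, not from the original answer; it assumes the @BeanProperty Test from above is in scope): Flink derives TypeInformation from the static type of the stream, so a map declared to return Any yields GenericType<java.lang.Object> no matter what the values are at runtime.

import org.apache.flink.api.common.typeinfo.TypeInformation

object ErasureDemo extends App {
  // the static type is all Flink sees; Any erases to java.lang.Object
  println(TypeInformation.of(classOf[Object])) // GenericType<java.lang.Object>
  println(TypeInformation.of(classOf[Test]))   // PojoType<...> for the POJO-fied Test
}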
Now I'm creating TypeInformation[Test] inside the generated code

https://gist.github.com/DmytroMitin/16d312dbafeae54518f7ac2c490426b0

and this seems to fix This type (GenericType<java.lang.Object>) cannot be used as key, but now I have

org.apache.flink.api.common.InvalidProgramException: UTF-8 is not serializable.
The object probably contains or references non serializable fields.
I resolved InvalidProgramException: UTF-8 is not serializable by annotating the fields of MapFunc with @transient

https://gist.github.com/DmytroMitin/f2f859273075370c4687a30e0c3a2431
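The fix in isolation (a minimal sketch; the complete working program follows below):

import org.apache.flink.api.common.functions.RichMapFunction
import scala.reflect.runtime.universe.ClassSymbol

class MapFunc extends RichMapFunction[String, Any] {
  // @transient excludes the reflection artifacts from Java serialization
  // when Flink ships the function to the cluster; they are re-created
  // in open() on each task manager instead.
  @transient var classSymbol: ClassSymbol = _
  override def map(value: String): Any = ??? // see the full code below
}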
Actually, it appeared that if we create the TypeInformation inside the generated code, then the toolbox is enough:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.reflect.runtime
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox
object App {
  val toolbox = ToolBox(runtime.currentMirror).mkToolBox()

  class MapFunc extends RichMapFunction[String, Any] {
    var typeInfo: TypeInformation[_] = _
    @transient var classSymbol: ClassSymbol = _

    override def open(parameters: Configuration): Unit = {
      val code =
        """|case class Test(
           |  @scala.beans.BeanProperty var a: String,
           |  @scala.beans.BeanProperty var b: Int,
           |  @scala.beans.BeanProperty var c: String,
           |  @scala.beans.BeanProperty var d: Long) {
           |  def this() = {
           |    this(null, 0, null, 0)
           |  }
           |}""".stripMargin

      val tree = toolbox.parse(code)
      classSymbol = toolbox.define(tree.asInstanceOf[ImplDef]).asClass
      typeInfo = toolbox.eval(
        q"org.apache.flink.api.common.typeinfo.TypeInformation.of(classOf[${classSymbol.toType}])"
      ).asInstanceOf[TypeInformation[_]]
    }

    override def map(value: String): Any = {
      val values = Seq("aaa", 1, "ccc", 2L) //hardcoded for now
      createClassInstance(classSymbol, values: _*)
    }
  }

  def main(args: Array[String]): Unit = {
    val func = new MapFunc
    func.open(new Configuration)
    val classInstance = func.map("""{a: "aaa", b: 1, c: "ccc", d: 2}""")
    println(classInstance) //Test(aaa,1,ccc,2)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("localhost", 9999)
    val typeInfo = func.typeInfo.asInstanceOf[TypeInformation[Any]]
    println(typeInfo) //PojoType<__wrapper$1$75434c8e32f541f7a87513a2ad2aa0ce.Test, fields = [a: String, b: Integer, c: String, d: Long]>
    val res = stream.map(func)(typeInfo).keyBy("a", "c").sum("b")
    println(res) //org.apache.flink.streaming.api.scala.DataStream@5927f904
  }

  def createClassInstance(classSymbol: ClassSymbol, args: Any*): Any = {
    val runtimeMirror = toolbox.mirror
    val classType = classSymbol.typeSignature
    val constructorSymbol = classType.decl(termNames.CONSTRUCTOR).alternatives.head.asMethod
    val classMirror = runtimeMirror.reflectClass(classSymbol)
    val constructorMirror = classMirror.reflectConstructor(constructorSymbol)
    constructorMirror(args: _*)
  }
}
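To try this end to end without a socket (my sketch, not part of the original answer), one can swap the socket source inside main for a bounded in-memory source; Types.STRING supplies the TypeInformation that the Scala fromElements call expects:

import org.apache.flink.api.common.typeinfo.Types

// hypothetical replacement for env.socketTextStream("localhost", 9999) in main
val stream = env.fromElements(
  """{"a": "aaa", "b": 1, "c": "ccc", "d": 2}""",
  """{"a": "aaa", "b": 2, "c": "ccc", "d": 3}"""
)(Types.STRING)

val res = stream.map(func)(typeInfo).keyBy("a", "c").sum("b")
res.print()
env.execute("dynamic-json-keyby") // note: map() above currently hardcodes its output values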