闪烁中的类型信息 [英] TypeInformation in Flink
本文介绍了闪烁中的类型信息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
JSON
格式将数据从Flink
发送到Kafka
主题的位置有一个管道。我也能够从Kafka主题中获得它,也能够获得JSON属性。现在,像scala reflect
类(我还可以在运行时比较数据类型)一样,我试图使用TypeInformation
在Fink中做同样的事情,在那里我可以设置一些预定义的格式,从主题读取的任何数据都应该在这个Validation
下,并且应该相应地传递或失败。
我有如下数据:。{"policyName":"String", "premium":2400, "eventTime":"2021-12-22 00:00:00" }
针对我的问题,我在Flink's book
中看到了几个例子,里面提到了如何创建TypeInformation variable
,但是没有提到如何使用,所以我尝试了一下:
val objectMapper = new ObjectMapper()
val tupleType: TypeInformation[(String, String, String)] =
Types.TUPLE[(String, Int, String)]
println(tupleType.getTypeClass)
src.map(v => v)
.map { x =>
val policyName: String = objectMapper.readTree(x).get("policyName").toString()
val premium: Int = objectMapper.readTree(x).get("premium").toString().toInt
val eventTime: String = objectMapper.readTree(x).get("eventTime").toString()
if ((policyName, premium, eventTime)== tupleType.getTypeClass) {
println("Good Record: " + (policyName, premium, eventTime))
}
else {
println("Bad Record: " + (id, category, eventTime))
}
}
现在,如果我将如下输入传递给Flink Kafka生产者:
{"policyName":"whatever you feel like","premium":"4000","eventTime":"2021-12-20 00:00:00"}
它应该会将预期输出作为"Bad record" and the tuple
,因为Premium的数据类型是字符串,而不是Long/Int。
如果a按如下方式传递输入:
{"policyName":"whatever you feel like","premium":4000,"eventTime":"2021-12-20 00:00:00"}
它应该会给出"Good Record" and the tuple
但根据我的代码,它总是给我Else部分。
如果我创建一个datastream
变量并存储上述map
的结果,然后进行如下比较,则会给出正确的结果:
if (tupleType == datas.getType()) { //where 'datas' is a datastream
print("Good Records")
} else {
println("Bad Records")
}
但是我想将good/bad
记录发送到different stream
,或者可以直接插入到Cassandra
表中。因此,这就是我使用循环逐个标识记录的原因。我的方式对吗?考虑到我试图实现的目标,最佳实践是什么?
根据Dominik的输入,我尝试创建owCustomDeserializer
类:
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import java.nio.charset.StandardCharsets
class sample extends DeserializationSchema[String] {
override def deserialize(message: Array[Byte]): Tuple3[Int, String, String] = {
val data = new String(message,
StandardCharsets.UTF_8)
val objectMapper = new ObjectMapper()
val id: Int = objectMapper.readTree(data).get("id").toString().toInt
val category: String = objectMapper.readTree(data).get("Category").toString()
val eventTime: String = objectMapper.readTree(data).get("eventTime").toString()
return (id, category, eventTime)
}
override def isEndOfStream(t: String): Boolean = ???
override def getProducedType: TypeInformation[String] = return TypeInformation.of(classOf[String])
}
我想尝试实现如下内容:
src.map(v => v)
.map { x =>
if (new sample().deserialize(x)==true) {
println("Good Record: " + (id, category, eventTime))
}
else {
println("Bad Record: " + (id, category, eventTime))
}
}
但输入为Array[Bytes]
形式。那么,我如何才能实现它呢?我到底在哪里做错了?需要修改的内容是什么?这是我第一次尝试使用Flink Scala自定义类。
输入已传递:Inputs
推荐答案
我真的不认为使用TypeInformation
做您想做的事情是最好的主意。您可以简单地使用类似于接受JSONString
的ProcessFunction
,然后使用ObjectMapper
将JSON反序列化为具有预期结构的类。您可以从ProcessFunction
输出正确的反序列化对象,而反序列化失败的字符串可以作为辅助输出进行评估,因为它们将是您的Bad Records
。
case class Premium(policyName: String, premium: Long, eventTime: String)
class Splitter extends ProcessFunction[String, Premium] {
val outputTag = new OutputTag[String]("failed")
def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
Try {
lazy val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper.readValue[T](json)
} match {
case Success(x) => Right(x)
case Failure(err) => {
Left(json)
}
}
}
override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = {
fromJson(i) match {
case Right(data) => collector.collect(data)
case Left(json) => context.output(outputTag, json)
}
}
}
然后,您可以使用outputTag从流中获取端输出数据,以获取不正确的记录。
这篇关于闪烁中的类型信息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文