闪烁中的类型信息 [英] TypeInformation in Flink

查看:19
本文介绍了闪烁中的类型信息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在以JSON格式将数据从Flink发送到Kafka主题的位置有一个管道。我也能够从Kafka主题中获得它,也能够获得JSON属性。现在,像scala reflect类(我还可以在运行时比较数据类型)一样,我试图使用TypeInformation在Fink中做同样的事情,在那里我可以设置一些预定义的格式,从主题读取的任何数据都应该在这个Validation下,并且应该相应地传递或失败。 我有如下数据:。
{"policyName":"String", "premium":2400, "eventTime":"2021-12-22 00:00:00" }

针对我的问题,我在Flink's book中看到了几个例子,里面提到了如何创建TypeInformation variable,但是没有提到如何使用,所以我尝试了一下:

    val objectMapper = new ObjectMapper()

    val tupleType: TypeInformation[(String, String, String)] =
    Types.TUPLE[(String, Int, String)]
    println(tupleType.getTypeClass)
    
    src.map(v => v)
      .map { x =>
        val policyName: String = objectMapper.readTree(x).get("policyName").toString()
        val premium: Int = objectMapper.readTree(x).get("premium").toString().toInt
        val eventTime: String = objectMapper.readTree(x).get("eventTime").toString()
        if ((policyName, premium, eventTime)== tupleType.getTypeClass) {
          println("Good Record: " + (policyName, premium, eventTime))
        }
        else {
          println("Bad Record: " + (id, category, eventTime))
        }
      } 

现在,如果我将如下输入传递给Flink Kafka生产者:

{"policyName":"whatever you feel like","premium":"4000","eventTime":"2021-12-20 00:00:00"}

它应该会将预期输出作为"Bad record" and the tuple,因为Premium的数据类型是字符串,而不是Long/Int。

如果a按如下方式传递输入:

{"policyName":"whatever you feel like","premium":4000,"eventTime":"2021-12-20 00:00:00"}

它应该会给出"Good Record" and the tuple

的输出

但根据我的代码,它总是给我Else部分。

如果我创建一个datastream变量并存储上述map的结果,然后进行如下比较,则会给出正确的结果:

if (tupleType == datas.getType()) { //where 'datas' is a datastream
      print("Good Records")
    } else {
      println("Bad Records")
    }  
但是我想将good/bad记录发送到different stream,或者可以直接插入到Cassandra表中。因此,这就是我使用循环逐个标识记录的原因。我的方式对吗?考虑到我试图实现的目标,最佳实践是什么?

根据Dominik的输入,我尝试创建owCustomDeserializer类:

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.api.common.serialization.DeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation

import java.nio.charset.StandardCharsets

class sample extends DeserializationSchema[String] {
  override def deserialize(message: Array[Byte]): Tuple3[Int, String, String] = {
    val data = new String(message,
      StandardCharsets.UTF_8)
    val objectMapper = new ObjectMapper()
    val id: Int = objectMapper.readTree(data).get("id").toString().toInt
    val category: String = objectMapper.readTree(data).get("Category").toString()
    val eventTime: String = objectMapper.readTree(data).get("eventTime").toString()
    return (id, category, eventTime)

  }

  override def isEndOfStream(t: String): Boolean = ???

  override def getProducedType: TypeInformation[String] = return TypeInformation.of(classOf[String])
}

我想尝试实现如下内容:

src.map(v => v)
      .map { x =>
        if (new sample().deserialize(x)==true) {
          println("Good Record: " + (id, category, eventTime))
        }
        else {
          println("Bad Record: " + (id, category, eventTime))
        }
      }  

但输入为Array[Bytes]形式。那么,我如何才能实现它呢?我到底在哪里做错了?需要修改的内容是什么?这是我第一次尝试使用Flink Scala自定义类。

输入已传递:Inputs

推荐答案

我真的不认为使用TypeInformation做您想做的事情是最好的主意。您可以简单地使用类似于接受JSONStringProcessFunction,然后使用ObjectMapper将JSON反序列化为具有预期结构的类。您可以从ProcessFunction输出正确的反序列化对象,而反序列化失败的字符串可以作为辅助输出进行评估,因为它们将是您的Bad Records

这可能如下所示,请注意,这使用Jackson Scala执行到Case类的反序列化。您可以找到更多信息here

case class Premium(policyName: String, premium: Long, eventTime: String)

class Splitter extends ProcessFunction[String, Premium] {
  val outputTag = new OutputTag[String]("failed")

  def fromJson[T](json: String)(implicit m: Manifest[T]): Either[String, T] = {
    Try {
      lazy val mapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      mapper.readValue[T](json)
    } match {
      case Success(x) => Right(x)
      case Failure(err) => {
        Left(json)
      }
    }
  }
  override def processElement(i: String, context: ProcessFunction[String, Premium]#Context, collector: Collector[Premium]): Unit = {
    fromJson(i) match {
      case Right(data) => collector.collect(data)
      case Left(json) => context.output(outputTag, json)
    }
  }
}

然后,您可以使用outputTag从流中获取端输出数据,以获取不正确的记录。

这篇关于闪烁中的类型信息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆