带有死信队列的Flink Scala贴图 [英] flink scala map with dead letter queue
本文介绍了带有死信队列的Flink Scala贴图的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我正在尝试创建一些Scala函数,以帮助Flinkmap
和filter
操作将其错误重定向到死信队列。
然而,我正在努力解决Scala的类型擦除问题,这使我无法使它们成为泛型。下面mapWithDeadLetterQueue
的实现未编译。
sealed trait ProcessingResult[T]
case class ProcessingSuccess[T,U](result: U) extends ProcessingResult[T]
case class ProcessingError[T: TypeInformation](errorMessage: String, exceptionClass: String, stackTrace: String, sourceMessage: T) extends ProcessingResult[T]
object FlinkUtils {
// https://stackoverflow.com/questions/1803036/how-to-write-asinstanceofoption-in-scala
implicit class Castable(val obj: AnyRef) extends AnyVal {
def asInstanceOfOpt[T <: AnyRef : ClassTag] = {
obj match {
case t: T => Some(t)
case _ => None
}
}
}
def mapWithDeadLetterQueue[T: TypeInformation,U: TypeInformation](source: DataStream[T], func: (T => U)): (DataStream[U], DataStream[ProcessingError[T]]) = {
val mapped = source.map(x => {
val result = Try(func(x))
result match {
case Success(value) => ProcessingSuccess(value)
case Failure(exception) => ProcessingError(exception.getMessage, exception.getClass.getName, exception.getStackTrace.mkString("
"), x)
}
} )
val mappedSuccess = mapped.flatMap((x: ProcessingResult[T]) => x.asInstanceOfOpt[ProcessingSuccess[T,U]]).map(x => x.result)
val mappedFailure = mapped.flatMap((x: ProcessingResult[T]) => x.asInstanceOfOpt[ProcessingError[T]])
(mappedSuccess, mappedFailure)
}
}
我得到:
[error] FlinkUtils.scala:35:36: overloaded method value flatMap with alternatives:
[error] [R](x$1: org.apache.flink.api.common.functions.FlatMapFunction[Product with Serializable with ProcessingResult[_ <: T],R], x$2: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[R] <and>
[error] [R](x$1: org.apache.flink.api.common.functions.FlatMapFunction[Product with Serializable with ProcessingResult[_ <: T],R])org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[R]
[error] cannot be applied to (ProcessingResult[T] => Option[ProcessingSuccess[T,U]])
[error] val mappedSuccess = mapped.flatMap((x: ProcessingResult[T]) => x.asInstanceOfOpt[ProcessingSuccess[T,U]]).map(x => x.result)
有没有办法做到这一点?
推荐答案
好的,我将回答我自己的问题。我犯了几个错误:
- 首先,我严格地包含了Java数据流类,而不是Scala数据流类(这种情况经常发生)。Java变体显然不接受映射/过滤器/平面映射的Scala lambda
- 第二,Flinks序列化不支持密封特征。有a project应该可以解决,但我还没有试过。
解决方案:首先,我没有使用密封的特征,而是一个带有两个选项的简单Case类(表达能力稍弱一些,但仍然有效):
case class ProcessingError[T](errorMessage: String, exceptionClass: String, stackTrace: String, sourceMessage: T)
case class ProcessingResult[T: TypeInformation, U: TypeInformation](result: Option[U], error: Option[ProcessingError[T]])
然后,我可以让一切都像这样工作:
object FlinkUtils {
def mapWithDeadLetterQueue[T: TypeInformation: ClassTag,U: TypeInformation: ClassTag]
(source: DataStream[T], func: (T => U)):
(DataStream[U], DataStream[ProcessingError[T]]) = {
implicit val typeInfo = TypeInformation.of(classOf[ProcessingResult[T,U]])
val mapped = source.map((x: T) => {
val result = Try(func(x))
result match {
case Success(value) => ProcessingResult[T, U](Some(value), None)
case Failure(exception) => ProcessingResult[T, U](None, Some(
ProcessingError(exception.getMessage, exception.getClass.getName,
exception.getStackTrace.mkString("
"), x)))
}
} )
val mappedSuccess = mapped.flatMap((x: ProcessingResult[T,U]) => x.result)
val mappedFailure = mapped.flatMap((x: ProcessingResult[T,U]) => x.error)
(mappedSuccess, mappedFailure)
}
}
flatMap
和filter
函数看起来非常相似,但它们分别使用ProcessingResult[T,List[T]]
和ProcessingResult[T,T]
。
我使用如下函数:
val (result, errors) = FlinkUtils.filterWithDeadLetterQueue(input, (x: MyMessage) => {
x.`type` match {
case "something" => throw new Exception("how how how")
case "something else" => false
case _ => true
}
})
这篇关于带有死信队列的Flink Scala贴图的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文