Flink Scala map with dead letter queue


Problem description

I am trying to create some Scala functions that help Flink map and filter operations redirect their errors to a dead letter queue.

However, I am struggling with Scala's type erasure, which prevents me from making them generic. The implementation of mapWithDeadLetterQueue below does not compile.


sealed trait ProcessingResult[T]
case class ProcessingSuccess[T,U](result: U) extends ProcessingResult[T]
case class ProcessingError[T: TypeInformation](errorMessage: String, exceptionClass: String, stackTrace: String, sourceMessage: T) extends ProcessingResult[T]

object FlinkUtils {
    // https://stackoverflow.com/questions/1803036/how-to-write-asinstanceofoption-in-scala
    implicit class Castable(val obj: AnyRef) extends AnyVal {
        def asInstanceOfOpt[T <: AnyRef : ClassTag] = {
            obj match {
            case t: T => Some(t)
            case _ => None
            }
        }
    }

    def mapWithDeadLetterQueue[T: TypeInformation,U: TypeInformation](source: DataStream[T], func: (T => U)): (DataStream[U], DataStream[ProcessingError[T]]) = {
        val mapped = source.map(x => { 
            val result = Try(func(x)) 
            result match {
                case Success(value) => ProcessingSuccess(value)
                case Failure(exception) => ProcessingError(exception.getMessage, exception.getClass.getName, exception.getStackTrace.mkString("\n"), x)
            }
        } )
        val mappedSuccess = mapped.flatMap((x: ProcessingResult[T]) => x.asInstanceOfOpt[ProcessingSuccess[T,U]]).map(x => x.result)
        val mappedFailure = mapped.flatMap((x: ProcessingResult[T]) => x.asInstanceOfOpt[ProcessingError[T]])
        (mappedSuccess, mappedFailure)
    }
  
}

I get:

[error] FlinkUtils.scala:35:36: overloaded method value flatMap with alternatives:
[error]   [R](x$1: org.apache.flink.api.common.functions.FlatMapFunction[Product with Serializable with ProcessingResult[_ <: T],R], x$2: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[R] <and>
[error]   [R](x$1: org.apache.flink.api.common.functions.FlatMapFunction[Product with Serializable with ProcessingResult[_ <: T],R])org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator[R]
[error]  cannot be applied to (ProcessingResult[T] => Option[ProcessingSuccess[T,U]])
[error]         val mappedSuccess = mapped.flatMap((x: ProcessingResult[T]) => x.asInstanceOfOpt[ProcessingSuccess[T,U]]).map(x => x.result)

Is there a way to make this work?

Answer

OK, I will answer my own question. I made a couple of mistakes:

  • First, I had imported the Java DataStream class instead of the Scala one (this happens all the time). The Java variant obviously does not accept Scala lambdas for map/filter/flatMap (see the import sketch after this list).
  • Second, Flink's serialization does not support sealed traits. There is a project that should solve this, but I have not tried it yet.
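
For the first point, a minimal sketch of the import difference, assuming the streaming API (the Java package name is taken from the compiler error above):

// Java API: map/filter/flatMap expect MapFunction/FilterFunction/FlatMapFunction
// instances, so plain Scala lambdas are not accepted.
// import org.apache.flink.streaming.api.datastream.DataStream

// Scala API: this DataStream accepts Scala lambdas and provides implicit
// TypeInformation derivation via createTypeInformation.
import org.apache.flink.streaming.api.scala._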

The solution: first, instead of a sealed trait I used a simple case class with two Options (slightly less expressive, but it still works):

case class ProcessingError[T](errorMessage: String, exceptionClass: String, stackTrace: String, sourceMessage: T)
case class ProcessingResult[T: TypeInformation, U: TypeInformation](result: Option[U], error: Option[ProcessingError[T]])

Then I could make everything work like this:

// Imports assumed for this snippet (not shown in the original answer): the Scala
// DataStream API mentioned above, plus TypeInformation, ClassTag and Try.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

object FlinkUtils {
    def mapWithDeadLetterQueue[T: TypeInformation: ClassTag,U: TypeInformation: ClassTag]
       (source: DataStream[T], func: (T => U)): 
       (DataStream[U], DataStream[ProcessingError[T]]) = {
        implicit val typeInfo = TypeInformation.of(classOf[ProcessingResult[T,U]])

        val mapped = source.map((x: T) => { 
            val result = Try(func(x)) 
            result match {
                case Success(value) => ProcessingResult[T, U](Some(value), None)
                case Failure(exception) => ProcessingResult[T, U](None, Some(
                  ProcessingError(exception.getMessage, exception.getClass.getName, 
                           exception.getStackTrace.mkString("\n"), x)))
            }
        } )
        val mappedSuccess = mapped.flatMap((x: ProcessingResult[T,U]) => x.result)
        val mappedFailure = mapped.flatMap((x: ProcessingResult[T,U]) => x.error)
        (mappedSuccess, mappedFailure)
    }

}

The flatMap and filter functions look very similar, but they use ProcessingResult[T, List[T]] and ProcessingResult[T, T] respectively.
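
For illustration, here is a minimal sketch of what the filter variant could look like under that scheme; the original implementation is not shown in the answer, so the body below (including the choice to silently drop elements when the predicate returns false) is an assumption:

    def filterWithDeadLetterQueue[T: TypeInformation: ClassTag]
       (source: DataStream[T], func: (T => Boolean)):
       (DataStream[T], DataStream[ProcessingError[T]]) = {
        implicit val typeInfo = TypeInformation.of(classOf[ProcessingResult[T, T]])

        val mapped = source.map((x: T) => {
            Try(func(x)) match {
                // Predicate held: keep the element.
                case Success(true) => ProcessingResult[T, T](Some(x), None)
                // Predicate returned false: drop the element, no error.
                case Success(false) => ProcessingResult[T, T](None, None)
                // Predicate threw: route the original element to the dead letter stream.
                case Failure(exception) => ProcessingResult[T, T](None, Some(
                  ProcessingError(exception.getMessage, exception.getClass.getName,
                    exception.getStackTrace.mkString("\n"), x)))
            }
        })
        val kept = mapped.flatMap((x: ProcessingResult[T, T]) => x.result)
        val failed = mapped.flatMap((x: ProcessingResult[T, T]) => x.error)
        (kept, failed)
    }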

I use the functions like this:

val (result, errors) = FlinkUtils.filterWithDeadLetterQueue(input, (x: MyMessage) => {
          x.`type` match {
            case "something" => throw new Exception("how how how")
            case "something else" => false
            case _ => true
          }
})
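
The errors stream returned by these helpers is what feeds the actual dead letter queue; the answer does not show that part, so print() below is just a placeholder for a real sink (for example a Kafka producer writing to a DLQ topic):

// Placeholder only: in a real job this would be a dedicated dead-letter sink.
errors.print()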
