用于CEP的简单Scala API示例不显示任何输出 [英] Simple Scala API for CEP example don't show any output

查看:121
本文介绍了用于CEP的简单Scala API示例不显示任何输出的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在编写一个简单的示例,使用最新的Github版本1.1-SNAPSHOT在Flink中测试用于CEP的新Scala API.

I'm programming a simple example for testing the new Scala API for CEP in Flink, using the latest Github version for 1.1-SNAPSHOT.

Pattern仅检查值,并为匹配的每个模式输出一个String作为结果.代码如下:

The Pattern is only a check for a value, and outputs a single String as a result for each pattern matched. Code is as follows:

val pattern : Pattern[(String, Long, Int), _] = Pattern.begin("start").where(_._3 < 4)

val cepEventAlert = CEP.pattern(streamingAlert, pattern)

def selectFn(pattern : mutable.Map[String, (String, Long, Int)]): String = {
    val startEvent = pattern.get("start").get
    "Alerta:"+startEvent._1+": Pattern"
}

val patternStreamSelected = cepEventAlert.select(selectFn(_))

patternStreamSelected.print()

它可以编译并在1.1-SNAPSHOT下运行,没有问题,但是jobmanager输出没有显示该print()的迹象.即使放宽模式条件,仅设置一个开始"(接受所有事件),也绝对不会返回任何结果.

It compiles and runs under 1.1-SNAPSHOT without issue, but the jobmanager output shows no sign of that print(). Even relaxing the pattern conditions, and setting only a "start" (Accepting all events) returns absolutely nothing.

此外,当尝试添加阶段时,代码无法编译.如果我将模式更改为(查找三个字段小于4的两个连续事件):

Also, when trying to add stages, the code fails to compile. If I change the Pattern to (Finding two consecutive events with 3rd field less than 4):

Pattern.begin("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))

然后,编译器将抛出:

error: missing parameter type for expanded function ((x$4) => x$4._3.$less(4))

在开始"阶段之后的第一个where()中显示错误.我试图通过以下方式显式设置参数类型:

Showing the error is on the first where() after the "start" stage. I tryed to explicitly set the parameter type with:

(x: (String, Long, Int)) => x._3 < 4

那样,它将再次编译,但是当它在Flink上运行时,则不会显示任何输出. StreamingAlert是Scala DataStream [(String,Long,Int)],在代码的其他部分,我可以使用_._ < 4进行过滤,而不会出现问题,并且输出看起来是正确的.

That way it compiles again, but when it runs on Flink, then no output is shown. StreamingAlert is a Scala DataStream[(String, Long, Int)], and in other parts of code, I can filter with _._ < 4 without problems and the output seems correct.

推荐答案

流式API中的print() API调用不会触发热切的执行.您仍然必须在程序结尾处调用env.execute().

The print() API call in the streaming API does not trigger eager execution. You still have to call env.execute() at the end of your program.

定义模式时,应在某处提供事件类型.您可以按照自己的方式进行操作,也可以通过begin的类型参数进行操作:

When you define your pattern you should provide the event type somewhere. Either you do it as you've done it or you do it via a type parameter for begin:

Pattern.begin[(String, Long, Int)]("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))

这篇关于用于CEP的简单Scala API示例不显示任何输出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆