用于CEP的简单Scala API示例不显示任何输出 [英] Simple Scala API for CEP example don't show any output
问题描述
我正在编写一个简单的示例,使用最新的Github版本1.1-SNAPSHOT在Flink中测试用于CEP的新Scala API.
I'm programming a simple example for testing the new Scala API for CEP in Flink, using the latest Github version for 1.1-SNAPSHOT.
Pattern仅检查值,并为匹配的每个模式输出一个String作为结果.代码如下:
The Pattern is only a check for a value, and outputs a single String as a result for each pattern matched. Code is as follows:
val pattern : Pattern[(String, Long, Int), _] = Pattern.begin("start").where(_._3 < 4)
val cepEventAlert = CEP.pattern(streamingAlert, pattern)
def selectFn(pattern : mutable.Map[String, (String, Long, Int)]): String = {
val startEvent = pattern.get("start").get
"Alerta:"+startEvent._1+": Pattern"
}
val patternStreamSelected = cepEventAlert.select(selectFn(_))
patternStreamSelected.print()
它可以编译并在1.1-SNAPSHOT下运行,没有问题,但是jobmanager输出没有显示该print()的迹象.即使放宽模式条件,仅设置一个开始"(接受所有事件),也绝对不会返回任何结果.
It compiles and runs under 1.1-SNAPSHOT without issue, but the jobmanager output shows no sign of that print(). Even relaxing the pattern conditions, and setting only a "start" (Accepting all events) returns absolutely nothing.
此外,当尝试添加阶段时,代码无法编译.如果我将模式更改为(查找三个字段小于4的两个连续事件):
Also, when trying to add stages, the code fails to compile. If I change the Pattern to (Finding two consecutive events with 3rd field less than 4):
Pattern.begin("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))
然后,编译器将抛出:
error: missing parameter type for expanded function ((x$4) => x$4._3.$less(4))
在开始"阶段之后的第一个where()中显示错误.我试图通过以下方式显式设置参数类型:
Showing the error is on the first where() after the "start" stage. I tryed to explicitly set the parameter type with:
(x: (String, Long, Int)) => x._3 < 4
那样,它将再次编译,但是当它在Flink上运行时,则不会显示任何输出. StreamingAlert是Scala DataStream [(String,Long,Int)],在代码的其他部分,我可以使用_._ < 4
进行过滤,而不会出现问题,并且输出看起来是正确的.
That way it compiles again, but when it runs on Flink, then no output is shown. StreamingAlert is a Scala DataStream[(String, Long, Int)], and in other parts of code, I can filter with _._ < 4
without problems and the output seems correct.
推荐答案
流式API中的print()
API调用不会触发热切的执行.您仍然必须在程序结尾处调用env.execute()
.
The print()
API call in the streaming API does not trigger eager execution. You still have to call env.execute()
at the end of your program.
定义模式时,应在某处提供事件类型.您可以按照自己的方式进行操作,也可以通过begin
的类型参数进行操作:
When you define your pattern you should provide the event type somewhere. Either you do it as you've done it or you do it via a type parameter for begin
:
Pattern.begin[(String, Long, Int)]("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))
这篇关于用于CEP的简单Scala API示例不显示任何输出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!