Apache Flink:ProcessWindowFunction 实现 [英] Apache Flink: ProcessWindowFunction implementation

查看:55
本文介绍了Apache Flink:ProcessWindowFunction 实现的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 Scala 在我的 Apache Flink 项目中使用 ProcessWindowFunction.不幸的是,我已经无法实现基本的 ProcessWindowFunction,就像 Apache Flink 文档中使用的那样.

I am trying to use a ProcessWindowFunction in my Apache Flink project using Scala. Unfortunately, I already fail at implementing a basic ProcessWindowFunction like it is used in the Apache Flink Documentation.

这是我的代码:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.windowing.time.Time
import org.fiware.cosmos.orion.flink.connector.{NgsiEvent, OrionSource}
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.util.Collector
import scala.collection.TraversableOnce

object StreamingJob {
 def main(args: Array[String]) {

 val env = StreamExecutionEnvironment.getExecutionEnvironment
 val eventStream = env.addSource(new OrionSource(9001))

 val processedDataStream = eventStream.flatMap(event => event.entities)
   .map(entity => (entity.id, entity.attrs("temperature").value.asInstanceOf[String]))
     .keyBy(_._1)
     .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
     .process(new MyProcessWindowFunction())

 env.execute("Socket Window NgsiEvent")
 }
}


private class MyProcessWindowFunction extends ProcessWindowFunction[(String, String), String, String, TimeWindow] {

def process(key: String, context: Context, input: Iterable[(String, String)], out: Collector[String]): Unit = {
  var count: Int = 0
  for (in <- input) {
    count = count + 1
  }
  out.collect(s"Window ${context.window} count: $count")
 }
}

从 IntelliJ 我得到以下提示:

From IntelliJ I get the following hints:

1) 这显示了创建新类对象的位置:

1) This is shown where the new class object is created:

Type mismatch, expected: ProcessWindowFunction[(String, String), NotInferedR, String, TimeWindow], actual: MyProcessWindowFunction

2) 这直接显示在课堂上:

2) This is shown directly at the class:

Class 'MyProcessWindowFunction' must either be declared abstract or implement abstract member 'process(key:KEY, context:ProcessWindowFunction.Context, iterable:Iterable<IN>, collector:Collector<OUT>):void' in 'org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction'

构建代码显示以下错误:

Building the code shows me the following error:

Error:(51, 16) type mismatch;
found   : org.apache.flink.MyProcessWindowFunction
required: 
org.apache.flink.streaming.api.scala.function.ProcessWindowFunction[(String, String),?,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]
  .process(new MyProcessWindowFunction())

我很感激每一个帮助.

推荐答案

经过与另外 2 个人的调试,我们终于找到了问题所在.

After spending some time debugging with 2 more people we finally managed to find the problem.

在我的代码中,我使用了以下导入:

In my code I used the following import:

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction

但是使用 Scala 时正确的导入似乎是:

But the correct import when using Scala seems to be:

import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction

这篇关于Apache Flink:ProcessWindowFunction 实现的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆