Find count in WindowedStream - Flink


Question

I am pretty new to the world of streams and I am facing some issues in my first attempt.

More specifically, I am trying to implement count and groupBy functionality in a sliding window using Flink.

I have done it in a normal DataStream, but I am not able to make it work in a WindowedStream.

Do you have any suggestions on how I can do it?

val parsedStream: DataStream[(String, Response)] = stream
      .mapWith(_.decodeOption[Response])
      .filter(_.isDefined)
      .map { record =>
        (
          s"${record.get.group.group_country}, ${record.get.group.group_state}, ${record.get.group.group_city}",
          record.get
        )
      }

val result: DataStream[((String, Response), Int)] = parsedStream
      .map((_, 1))
      .keyBy(_._1._1)
      .sum(1)

// The output of result is 
// ((us, GA, Atlanta,Response()), 14)
// ((us, SA, Atlanta,Response()), 4)

result
      .keyBy(_._1._1)
      .timeWindow(Time.seconds(5))

//the following part doesn't compile

      .apply(
        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
          def apply(
                   key: Tuple,
                   window: TimeWindow,
                   values: Iterable[(String, Response)],
                   out: Collector[(String, Int)]
                   ) {}
        }
      )

Compilation error:

overloaded method value apply with alternatives:
  [R](function: (String, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[((String, com.flink.Response), Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$28: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
  [R](function: org.apache.flink.streaming.api.scala.function.WindowFunction[((String, com.flink.Response), Int),R,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$27: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
 cannot be applied to (org.apache.flink.streaming.api.functions.windowing.WindowFunction[((String, com.flink.Response), Int),(String, com.flink.Response),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]{def apply(key: String,window: org.apache.flink.streaming.api.windowing.windows.TimeWindow,input: Iterable[((String, com.flink.Response), Int)],out: org.apache.flink.util.Collector[(String, com.flink.Response)]): Unit})
      .apply(

Recommended answer

I tried your code and found the error: the types declared for your WindowFunction are wrong.

The documentation says that the expected type parameters for WindowFunction are WindowFunction[IN, OUT, KEY, W <: Window]. If you look at your code, IN is the element type of the stream you are computing windows on. The type of that stream is ((String, Response), Int), not (String, Int) as declared in the code.

Changing the part that does not compile to the following should fix the declared types:

.apply(new WindowFunction[((String, Response), Int), (String, Response), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[((String, Response), Int)], out: Collector[(String, Response)]): Unit = ???
})
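
The ??? body above still has to produce the count. As a minimal sketch, assuming "count" means the number of records that reached the window for a given key, the logic can live in a plain function that apply delegates to. The names WindowCount and countInWindow are hypothetical and not part of Flink. The function returns (String, Int), the output type the question originally aimed for, so OUT and the Collector type would have to be (String, Int) as well.

```scala
// Hypothetical helper, not part of Flink: counting logic a WindowFunction body could delegate to.
object WindowCount {
  // `input` has the element type of the windowed stream: ((key, payload), runningCount).
  // R stands in for the question's Response type.
  def countInWindow[R](key: String, input: Iterable[((String, R), Int)]): (String, Int) =
    (key, input.size) // number of records that fell into this window for `key`
}

// Inside the WindowFunction (assuming OUT = (String, Int)) the call would look like:
//   override def apply(key: String, window: TimeWindow,
//                      input: Iterable[((String, Response), Int)],
//                      out: Collector[(String, Int)]): Unit =
//     out.collect(WindowCount.countInWindow(key, input))
```

One more thing that may be worth checking: the compiler output in the question names org.apache.flink.streaming.api.functions.windowing.WindowFunction (the Java interface) as the argument, while the expected overload takes org.apache.flink.streaming.api.scala.function.WindowFunction, so the import may also need to point at the Scala trait.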

As for the second example, the error generally occurs for the same reason. When you use keyBy on a stream of tuples, there are two overloads to choose from: keyBy(fields: Int*), which selects tuple fields by index (this is what you used), and keyBy(fun: T => K), where you provide a function that extracts the key.

But there is one important difference between these overloads: the index-based one returns the key as a JavaTuple, while the function-based one returns the key with its exact type. So if you change String to Tuple in your simplified example, it should compile cleanly.
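
The difference between the two overloads shows up in the key type of the resulting KeyedStream, which is the KEY parameter a WindowFunction must then declare. A small sketch, assuming the Flink 1.x Scala DataStream API is on the classpath (the index-based keyBy is deprecated in later 1.x releases); the object name KeyTypes and the sample data are made up for illustration:

```scala
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._

// Hypothetical example object; only the two keyBy calls matter.
object KeyTypes {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val pairs: DataStream[(String, Int)] =
    env.fromElements(("us, GA, Atlanta", 1), ("us, SA, Atlanta", 1))

  // Index-based overload: the key is the generic Java Tuple.
  val byIndex: KeyedStream[(String, Int), Tuple] = pairs.keyBy(0)

  // Function-based overload: the key keeps its exact type, here String.
  val byFunction: KeyedStream[(String, Int), String] = pairs.keyBy(_._1)
}
```

With byIndex, a WindowFunction would declare Tuple as its KEY; with byFunction, it would declare String.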
