Find count in WindowedStream - Flink
Problem description
I am pretty new to the world of streams, and I am facing some issues in my first attempt.
More specifically, I am trying to implement count and groupBy functionality over a sliding window using Flink.
I've done it on a normal DataStream, but I am not able to make it work on a WindowedStream.
Do you have any suggestions on how I can do it?
val parsedStream: DataStream[(String, Response)] = stream
  .mapWith(_.decodeOption[Response])
  .filter(_.isDefined)
  .map { record =>
    (
      s"${record.get.group.group_country}, ${record.get.group.group_state}, ${record.get.group.group_city}",
      record.get
    )
  }

val result: DataStream[((String, Response), Int)] = parsedStream
  .map((_, 1))
  .keyBy(_._1._1)
  .sum(1)
// The output of result is
// ((us, GA, Atlanta,Response()), 14)
// ((us, SA, Atlanta,Response()), 4)

result
  .keyBy(_._1._1)
  .timeWindow(Time.seconds(5))
  // the following part doesn't compile
  .apply(
    new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
      def apply(
        key: Tuple,
        window: TimeWindow,
        values: Iterable[(String, Response)],
        out: Collector[(String, Int)]
      ) {}
    }
  )
Compile error:
overloaded method value apply with alternatives:
[R](function: (String, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[((String, com.flink.Response), Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$28: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
[R](function: org.apache.flink.streaming.api.scala.function.WindowFunction[((String, com.flink.Response), Int),R,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$27: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
cannot be applied to (org.apache.flink.streaming.api.functions.windowing.WindowFunction[((String, com.flink.Response), Int),(String, com.flink.Response),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]{def apply(key: String,window: org.apache.flink.streaming.api.windowing.windows.TimeWindow,input: Iterable[((String, com.flink.Response), Int)],out: org.apache.flink.util.Collector[(String, com.flink.Response)]): Unit})
.apply(
Recommended answer
I tried your code and found the problem: the type parameters you declared for your WindowFunction are wrong.
The documentation says that the expected type parameters of WindowFunction are WindowFunction[IN, OUT, KEY, W <: Window]. IN is the element type of the data stream you are windowing. In your code that stream is result, whose type is ((String, Response), Int), not (String, Int) as you declared. The parameters of apply have to match as well. The key must have type KEY, which is String here (not Tuple) because you key with the function _._1._1. The values must be Iterable[IN].
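The first alternative in the compile error shows that apply also accepts a plain Scala function of type (KEY, W, Iterable[IN], Collector[R]) => Unit, not only a WindowFunction instance. A minimal sketch of that form follows. It assumes the result stream from the question and uses a hypothetical placeholder Response case class, since the real type is not shown. The Int in result is the running total produced by .sum(1), so the sketch counts the elements in each window with values.size rather than summing that field.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object WindowCountClosure {
  // Hypothetical placeholder for the asker's Response type.
  case class Response(id: String)

  // KEY = String, W = TimeWindow, IN = ((String, Response), Int), R = (String, Int).
  // Emits one (key, number of elements) record per key and window.
  val countFn: (String, TimeWindow, Iterable[((String, Response), Int)], Collector[(String, Int)]) => Unit =
    (key, _, values, out) => out.collect((key, values.size))

  // Wiring on the question's result stream; keying with a function keeps KEY as String.
  def windowedCounts(result: DataStream[((String, Response), Int)]): DataStream[(String, Int)] =
    result
      .keyBy(_._1._1)
      .timeWindow(Time.seconds(5))
      .apply(countFn)
}
```

Because countFn is an ordinary function value, you can call it directly with a hand-made Collector when you want to check its output outside a running job.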
If you change the part that does not compile to the following, it should compile:
.apply(new WindowFunction[((String, Response), Int), (String, Response), String, TimeWindow] {
  override def apply(
    key: String,
    window: TimeWindow,
    input: Iterable[((String, Response), Int)],
    out: Collector[(String, Response)]
  ): Unit = ??? // placeholder: emit results with out.collect(...)
})
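The ??? body above is only a placeholder. Below is a minimal sketch of a complete class-based version that emits one count per key and window. The "cannot be applied to" part of the error shows org.apache.flink.streaming.api.functions.windowing.WindowFunction (the Java interface) being passed. Both alternatives of the Scala apply expect org.apache.flink.streaming.api.scala.function.WindowFunction or a plain function, so the sketch imports the Scala trait. Response is again a hypothetical placeholder. OUT is changed to (String, Int) to carry the count, which differs from the (String, Response) output in the snippet above.

```scala
import org.apache.flink.streaming.api.scala._
// The Scala API trait. The Java interface of the same name in
// org.apache.flink.streaming.api.functions.windowing is not accepted by the Scala apply.
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object CountPerWindowExample {
  // Hypothetical placeholder for the asker's Response type.
  case class Response(id: String)

  // IN = ((String, Response), Int), OUT = (String, Int), KEY = String, W = TimeWindow.
  class CountPerWindow
      extends WindowFunction[((String, Response), Int), (String, Int), String, TimeWindow] {
    override def apply(
        key: String,
        window: TimeWindow,
        input: Iterable[((String, Response), Int)],
        out: Collector[(String, Int)]
    ): Unit =
      // One output record per key and window: the number of elements in the window.
      out.collect((key, input.size))
  }

  def windowedCounts(result: DataStream[((String, Response), Int)]): DataStream[(String, Int)] =
    result
      .keyBy(_._1._1)
      .timeWindow(Time.seconds(5))
      .apply(new CountPerWindow)
}
```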
As for your second example, the error occurs for essentially the same reason. When you use keyBy on a stream of tuples, there are two overloads. keyBy(fields: Int*) selects the key by the tuple field index you pass (this is what you used there). keyBy(fun: T => K) takes a function that extracts the key.
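A minimal sketch of the two overloads and the key type each produces, on a hypothetical DataStream[(String, Int)]. The explicit return types show the difference. The positional overload yields KeyedStream[T, Tuple], where Tuple is the Java org.apache.flink.api.java.tuple.Tuple (aliased as JavaTuple here). The function overload yields KeyedStream[T, String]. The positional overload has been deprecated in later Flink 1.x releases in favor of key selector functions.

```scala
import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
import org.apache.flink.streaming.api.scala._

object KeyByOverloads {
  // keyBy(fields: Int*): the key is a Java Tuple holding the selected field(s).
  def byPosition(s: DataStream[(String, Int)]): KeyedStream[(String, Int), JavaTuple] =
    s.keyBy(0)

  // keyBy(fun: T => K): the key keeps its exact type, here String.
  def byFunction(s: DataStream[(String, Int)]): KeyedStream[(String, Int), String] =
    s.keyBy(_._1)
}
```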
But there is one important difference between them. keyBy(fields: Int*) returns the key as a Java Tuple (org.apache.flink.api.java.tuple.Tuple), while keyBy(fun: T => K) returns the key with its exact type K. So if you change String to Tuple in your simplified example, it should compile.
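The simplified example itself is not shown here, so the following is a minimal sketch under assumptions: a hypothetical DataStream[(String, Int)] keyed by position with keyBy(0). Both the KEY type parameter and the key argument become the Java Tuple, and the original String is read back with getField.

```scala
import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// KEY is JavaTuple because the stream is keyed by field position.
class CountByTupleKey
    extends WindowFunction[(String, Int), (String, Int), JavaTuple, TimeWindow] {
  override def apply(
      key: JavaTuple,
      window: TimeWindow,
      input: Iterable[(String, Int)],
      out: Collector[(String, Int)]
  ): Unit = {
    // Field 0 of the key tuple is the String selected by keyBy(0).
    val k: String = key.getField[String](0)
    out.collect((k, input.size))
  }
}

object CountByTupleKeyJob {
  def windowedCounts(s: DataStream[(String, Int)]): DataStream[(String, Int)] =
    s.keyBy(0)
      .timeWindow(Time.seconds(5))
      .apply(new CountByTupleKey)
}
```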