How to do stateless aggregations in Spark using Structured Streaming 2.3.0 without using flatMapGroupsWithState?
Problem description
How can I do stateless aggregations in Spark using Structured Streaming 2.3.0 without using flatMapGroupsWithState or the DStream API? I am looking for a more declarative way.
Example:
select count(*) from some_view
I want the output to count only the records available in each batch, without aggregating over previous batches.
Recommended answer
To do stateless aggregations in Spark using Structured Streaming 2.3.0 without using flatMapGroupsWithState or the DStream API, you can use the following code:
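import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._

// Counts the rows in one group; the key itself is ignored.
def countValues = (_: String, it: Iterator[(String, String)]) => it.length

val query =
  dataStream
    // Add a constant key so that every row of a batch lands in the same group.
    .select(lit("a").as("newKey"), col("value"))
    .as[(String, String)]
    .groupByKey { case (newKey, _) => newKey }
    .mapGroups[Int](countValues)
    .writeStream
    .format("console")
    .start()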
Here is what the code does:
- We add one column, newKey, to dataStream so that we can group on it with groupByKey. I used the literal string "a", but any constant works. You also need to select at least one of the columns available in dataStream; I picked the value column, but any column will do.
- We define a mapping function, countValues, that counts the rows groupByKey collects for each key by calling it.length.
This way, we count whatever records are available in each batch without aggregating over previous batches.
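To make the per-batch behavior concrete, here is a minimal plain-Scala sketch that models the two steps above on ordinary collections. It does not use Spark at all: the object name PerBatchCountSketch, the helper countBatch, and the sample batches are hypothetical and exist only for illustration. It assumes each micro-batch is handled on its own, as the answer describes.

```scala
object PerBatchCountSketch {
  // Same shape as countValues in the answer: ignore the key, count the group's rows.
  val countValues: (String, Iterator[(String, String)]) => Int =
    (_, it) => it.length

  // Model of one micro-batch (hypothetical helper, not a Spark API):
  // tag every value with the constant key "a", group by that key,
  // and apply countValues to each resulting group.
  def countBatch(values: Seq[String]): Seq[Int] =
    values
      .map(v => ("a", v))
      .groupBy { case (newKey, _) => newKey }
      .toSeq
      .map { case (key, rows) => countValues(key, rows.iterator) }

  def main(args: Array[String]): Unit = {
    // Three made-up micro-batches; no state is shared between them.
    val batches = Seq(Seq("x", "y", "z"), Seq("x"), Seq("y", "z"))
    batches.map(countBatch).foreach(println)
  }
}
```

Because every row carries the same key, each non-empty batch collapses into exactly one group and therefore one count. In this model an empty batch produces no group and so no output row.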
Hope this helps!