How to do stateless aggregations in Spark using Structured Streaming 2.3.0 without using flatMapGroupsWithState?
Question
How can I do stateless aggregations in Spark using Structured Streaming 2.3.0, without using flatMapGroupsWithState or the DStream API? I am looking for a more declarative way.
Example:
select count(*) from some_view
I want the output to count only the records available in each batch, not an aggregate carried over from previous batches.
Answer
To do stateless aggregations in Spark using Structured Streaming 2.3.0, without using flatMapGroupsWithState or the DStream API, you can use the following code:
import org.apache.spark.sql.functions.{col, lit}
import spark.implicits._

// Count the rows of one group (i.e. one micro-batch here);
// Iterator.length consumes the iterator while counting.
def countValues = (_: String, it: Iterator[(String, String)]) => it.length

val query =
  dataStream
    .select(lit("a").as("newKey"), col("value"))
    .as[(String, String)]
    .groupByKey { case (newKey, _) => newKey }
    .mapGroups[Int](countValues)
    .writeStream
    .format("console")
    .start()
Here is what we are doing:
- We added one column, newKey, to our dataStream. We did this so that we can do a groupBy over it, using groupByKey. I have used the literal string "a", but you can use anything. Also, you need to select one of the available columns in dataStream; I have selected the value column for this purpose, but you can pick any one.
- We created a mapping function, countValues, to count the values aggregated by the groupByKey function, by calling it.length.
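The counting that countValues performs per micro-batch can be sketched with a plain Scala collection standing in for one batch (the sample tuples below are made up for illustration; no Spark session is needed):

```scala
// One hypothetical micro-batch after the select: every row carries
// the constant key "a" plus the original value column.
val batch = Seq(("a", "x1"), ("a", "x2"), ("a", "x3"))

// Group by the constant key, mirroring groupByKey { case (newKey, _) => newKey }.
val grouped = batch.groupBy { case (key, _) => key }

// Count each group's rows, mirroring what countValues does with it.length.
val counts = grouped.map { case (key, rows) => key -> rows.length }

println(counts) // Map(a -> 3)
```

Because every row shares the same key, the whole batch forms a single group, so the count is simply the batch size.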
So, in this way, we can count whatever records are available in each batch, without aggregating across previous batches.
Hope this helps!