如何在不使用 flatMapsGroupWithState 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合? [英] How to do stateless aggregations in spark using Structured Streaming 2.3.0 without using flatMapsGroupWithState?

查看:20
本文介绍了如何在不使用 flatMapsGroupWithState 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何在不使用 flatMapsGroupWithState 或 Dstream API 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?寻找更具声明性的方式

How to do stateless aggregations in spark using Structured Streaming 2.3.0 without using flatMapsGroupWithState or Dstream API? looking for a more declarative way

示例:

select count(*) from some_view

我希望输出只计算每批中可用的任何记录,而不是从上一批中汇总

I want the output to just count whatever records are available in each batch but not aggregate from the previous batch

推荐答案

要在 Spark 中使用 Structured Streaming 2.3.0 进行无状态聚合而不使用 flatMapsGroupWithState 或 Dstream API,您可以使用以下代码-

To do stateless aggregations in spark using Structured Streaming 2.3.0 without using flatMapsGroupWithState or Dstream API, you can use following code-

import spark.implicits._

def countValues = (_: String, it: Iterator[(String, String)]) => it.length

val query =
  dataStream
    .select(lit("a").as("newKey"), col("value"))
    .as[(String, String)]
    .groupByKey { case(newKey, _) => newKey }
    .mapGroups[Int](countValues)
    .writeStream
    .format("console")
    .start()

这里我们正在做的是-

  1. 我们向 datastream - newKey 添加了一列.我们这样做是为了我们可以使用 groupByKey 对其进行 groupBy.我使用了文字字符串 "a",但您可以使用任何东西.此外,您需要从 datastream 中的可用列中选择任何列.为此,我选择了 value 列,您可以选择任何人.
  2. 我们创建了一个映射函数——countValues,通过写it.length来统计groupByKey函数聚合的值.
  1. We added one column to our datastream - newKey. We did this so that we can do a groupBy over it, using groupByKey. I have used a literal string "a", but you can use anything. Also, you need to select anyone column from the available columns in datastream. I have selected value column for this purpose, you can select anyone.
  2. We created a mapping function - countValues, to count the values aggregated by groupByKey function by writing it.length.

因此,通过这种方式,我们可以计算每批中可用的记录,但不能从前一批中汇总.

So, in this way, we can count whatever records are available in each batch but not aggregating from the previous batch.

希望能帮到你!

这篇关于如何在不使用 flatMapsGroupWithState 的情况下使用 Structured Streaming 2.3.0 在 spark 中进行无状态聚合?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆