How can I perform an aggregation on top of an aggregation over a time window in Kafka with ksql


Problem Description


I have a bunch of firewall data. I would like to:

A) sum the bytes per IP per hour, and then

B) calculate the min and max sums across all IPs in that hour

I have been able to do A in Kafka, however, I cannot figure out how to do B. I've been poring over the docs and feel like I'm closing in but I always seem to find only part of the solution.
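To make the two-step goal concrete, here is the same computation sketched in plain Python over made-up sample records (field names mirror the stream schema; the data are purely illustrative):

```python
from collections import defaultdict

# Sample firewall records: (src_ip, epoch_seconds, bytes)
records = [
    ("10.0.0.1", 3600, 100),
    ("10.0.0.2", 3700, 250),
    ("10.0.0.1", 3800, 50),
    ("10.0.0.2", 7300, 400),
]

# A) sum bytes per IP per hour (tumbling 1-hour windows)
sums = defaultdict(int)
for ip, ts, b in records:
    window_start = ts - ts % 3600   # bucket the timestamp into its hour
    sums[(window_start, ip)] += b

# B) min and max of those per-IP sums across all IPs in each hour
per_hour = defaultdict(list)
for (window_start, ip), total in sums.items():
    per_hour[window_start].append(total)

stats = {w: (min(v), max(v)) for w, v in per_hour.items()}
print(stats)  # {3600: (150, 250), 7200: (400, 400)}
```

Step A is what the windowed ksql aggregation below already does; step B is the part that proves hard to express on top of it.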

I have my firewall_stream going great.

client.create_stream(
    table_name='firewall_stream',
    columns_type=['src_ip VARCHAR',
                  'dst_ip VARCHAR',
                  'src_port INTEGER',
                  'dst_port INTEGER',
                  'protocol VARCHAR',
                  'action VARCHAR',
                  'timestamp VARCHAR',
                  'bytes BIGINT',
    ],
    topic='firewall',
    value_format='JSON'
)

I created a materialized view, bytes_sent, with a tumbling window of 1 hour, sum(bytes), and group by IP address. This works great!

client.ksql('''
CREATE TABLE bytes_sent AS
  SELECT src_ip, sum(bytes) AS bytes_sum
  FROM firewall_stream
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY src_ip
  EMIT CHANGES
''')

This is where I get stuck. First I tried to just create another materialized view off of bytes_sent that did a max(bytes_sum) group by windowstart but I got an error that you can't do an aggregation on a windowed materialized view.

So then I removed the time window (figured I'd put it back on in the 2nd materialized view), but then I don't have any field for my "group by" clause. In Postgres, I could do max without a group by and it will calculate it across the table but Kafka always requires that group by. And now I'm not sure what to use.
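What the missing GROUP BY needs is a key that puts every row into a single group; a constant dummy key does exactly that. The equivalence is easiest to see in plain Python (whether a given ksql version accepts a literal or computed constant in GROUP BY is an assumption to verify against its docs):

```python
from collections import defaultdict

# Per-IP sums from the first aggregation (illustrative values)
bytes_sums = {"10.0.0.1": 150, "10.0.0.2": 250, "10.0.0.3": 90}

# Postgres-style: MAX over the whole table, no GROUP BY needed
global_max = max(bytes_sums.values())

# Kafka-style: every aggregation needs a grouping key, so collapse
# all rows onto one constant dummy key, then aggregate per key
groups = defaultdict(list)
for ip, total in bytes_sums.items():
    groups["ALL"].append(total)   # constant key -> single group

grouped_max = {k: max(v) for k, v in groups.items()}

print(global_max)    # 250
print(grouped_max)   # {'ALL': 250}
```

Both paths yield the same number; the constant key simply makes the "whole table" group explicit, which is what keyed stream aggregations require.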

From the docs, it appears one cannot do joins with windowed tables (although I haven't tried it and may be misunderstanding).

My only other guess is to create another stream from that materialized view bytes_sent and look at changelog events then somehow turn them into a max bytes across all IPs within a given time window.

Any feedback on how to approach this would be greatly appreciated!!

Solution

The short answer is you can't currently do this.

My quick solution was to essentially:

  • create a topic as the output of the first aggregation
  • create a new stream on that new topic, but outside of ksql
  • run the new ksql aggregation on that 2nd one

That said, there's a lot that can go wrong in this sort of setup. At this point we're just ruling out ksql for this particular use case and will likely use streams directly.
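The three-step workaround can be mimicked locally to see why it works: the first aggregation's output topic is a changelog where the latest record per (window, IP) key wins, and the second aggregation runs over that materialized view. A sketch under those assumptions, with made-up changelog events:

```python
from collections import defaultdict

# Changelog events from the first aggregation's output topic:
# (window_start, src_ip, bytes_sum) -- a later event supersedes
# earlier ones for the same (window_start, src_ip) key.
changelog = [
    (3600, "10.0.0.1", 100),
    (3600, "10.0.0.2", 250),
    (3600, "10.0.0.1", 150),   # update: new running sum for this key
]

# Steps 1-2: materialize the topic as a table (last write wins per key)
table = {}
for window_start, ip, bytes_sum in changelog:
    table[(window_start, ip)] = bytes_sum

# Step 3: the second aggregation -- min/max per window across all IPs
per_window = defaultdict(list)
for (window_start, _ip), total in table.items():
    per_window[window_start].append(total)

result = {w: (min(v), max(v)) for w, v in per_window.items()}
print(result)  # {3600: (150, 250)}
```

Note the caveat the answer raises: intermediate updates (like the superseded 100 above) flow through the topic too, so the second aggregation sees non-final sums until the window settles.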

