KSQL Windowed Aggregation Stream

Question

I have a STREAM made from a Kafka topic, with the TIMESTAMP property correctly specified.

When I try to create a STREAM with session windowing, using a query like:

CREATE STREAM SESSION_STREAM AS
SELECT ...
  FROM EVENT_STREAM
WINDOW SESSION (5 MINUTES)
   GROUP BY ...;

I always get the error:

Your SELECT query produces a TABLE. Please use CREATE TABLE AS SELECT statement instead.

Is it possible to create a STREAM with a Windowed Aggregation?

When I try, as suggested, to create a TABLE and then a STREAM containing all the session-start events, with a query like:

CREATE STREAM SESSION_START_STREAM AS
SELECT *
  FROM SESSION_TABLE
 WHERE WINDOWSTART=WINDOWEND;

KSQL tells me:

KSQL does not support persistent queries on windowed tables

How can I create a STREAM of the events that start a session window in KSQL?

Answer

If you switch your CREATE STREAM statement to a CREATE TABLE statement, it will create a table that is constantly being updated. The sink topic, SESSION_STREAM, will contain the stream of changes to the table, i.e. its changelog.

ksqlDB models this as a TABLE because it has TABLE semantics, i.e. only a single row can exist in the table for any given key. However, the changelog will contain the STREAM of changes that have been applied to the table.
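The difference between a table and its changelog can be modelled in a few lines of plain Python. This is a conceptual sketch, not ksqlDB code: `apply_updates` is a hypothetical helper, and it uses simple string keys, whereas a windowed table's key also carries the window bounds.

```python
from typing import Dict, List, Tuple


def apply_updates(
    updates: List[Tuple[str, int]],
) -> Tuple[Dict[str, int], List[Tuple[str, int]]]:
    """Hypothetical model: apply (key, value) updates to a table and record its changelog."""
    table: Dict[str, int] = {}
    changelog: List[Tuple[str, int]] = []
    for key, value in updates:
        table[key] = value              # upsert: at most one row per key
        changelog.append((key, value))  # the changelog keeps every change, in order
    return table, changelog


table, changelog = apply_updates([("user-1", 1), ("user-2", 1), ("user-1", 2)])
print(table)      # {'user-1': 2, 'user-2': 1}
print(changelog)  # [('user-1', 1), ('user-2', 1), ('user-1', 2)]
```

The table only ever shows the latest count for `user-1`, while the changelog still holds the earlier update; a stream declared over the sink topic sees the latter.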

If what you want is a topic containing all the sessions, then something like this will create it:

-- create a stream over a 'data' topic:
CREATE STREAM DATA (USER_ID INT) 
    WITH (kafka_topic='data', value_format='json');

-- create a table that tracks user interactions per session:
CREATE TABLE SESSIONS AS
SELECT USER_ID, COUNT(USER_ID) AS COUNT
  FROM DATA
WINDOW SESSION (5 SECONDS)
   GROUP BY USER_ID;

This creates a SESSIONS topic containing the changes to the SESSIONS table, i.e. its changelog.

If you want to convert this into a stream of session start events, then unfortunately ksqlDB doesn't yet allow you to create a stream directly from the table, but you can create a stream over the table's changelog topic:

-- Create a stream over the existing `SESSIONS` topic.
-- Note it states the window_type is 'Session'.
CREATE STREAM SESSION_STREAM (ROWKEY INT KEY, COUNT BIGINT) 
   WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');

-- Create a stream of window start events:
CREATE STREAM SESSION_STARTS AS 
    SELECT * FROM SESSION_STREAM 
    WHERE WINDOWSTART = WINDOWEND;
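To see why the `WINDOWSTART = WINDOWEND` filter picks out session starts, here is a rough plain-Python model of how a session-windowed count might emit changelog rows. It is an illustration under stated assumptions, not ksqlDB's implementation: events are assumed to arrive in timestamp order, every update is assumed to be emitted, the deletion records Kafka Streams may emit when a session is extended are left out, and `SessionRow`, `session_changelog` and `session_starts` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

# Hypothetical model of WINDOW SESSION (5 SECONDS); timestamps are in milliseconds.
GAP_MS = 5_000


@dataclass(frozen=True)
class SessionRow:
    user_id: int
    window_start: int  # plays the role of WINDOWSTART
    window_end: int    # plays the role of WINDOWEND
    count: int


def session_changelog(events: List[Tuple[int, int]]) -> List[SessionRow]:
    """Return one changelog row per (user_id, timestamp) event.

    Assumes events arrive in timestamp order; deletion records for
    superseded session windows are not modelled.
    """
    latest: Dict[int, SessionRow] = {}
    changelog: List[SessionRow] = []
    for user_id, ts in events:
        current = latest.get(user_id)
        if current is not None and ts - current.window_end <= GAP_MS:
            # Within the inactivity gap: extend the session so it ends at ts.
            row = SessionRow(user_id, current.window_start, ts, current.count + 1)
        else:
            # First event for this user, or gap exceeded: a new session [ts, ts].
            row = SessionRow(user_id, ts, ts, 1)
        latest[user_id] = row
        changelog.append(row)
    return changelog


def session_starts(changelog: List[SessionRow]) -> List[SessionRow]:
    """Model of WHERE WINDOWSTART = WINDOWEND."""
    return [row for row in changelog if row.window_start == row.window_end]


log = session_changelog([(1, 0), (2, 1_000), (1, 2_000), (1, 10_000)])
for row in session_starts(log):
    print(row.user_id, row.window_start)
# 1 0
# 2 1000
# 1 10000
```

In this model only the first event of a session yields a zero-length window, so the filter keeps exactly those rows. One consequence worth noting: a second event with exactly the same timestamp as the session's first event would also leave the bounds equal, so the filter would match it as well.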

Note that with the upcoming 0.10 release, you'll be able to give the key column in SESSION_STREAM its proper name:

CREATE STREAM SESSION_STREAM (USER_ID INT KEY, COUNT BIGINT) 
   WITH (kafka_topic='SESSIONS', value_format='JSON', window_type='Session');