ksqldb相关内容

如何使用 ksql 在 Kafka 的时间窗口内在聚合之上执行聚合

我有一堆防火墙数据.我想: A) 将每个 IP 每小时的字节数相加,然后 B) 计算该小时内所有 IP 的最小和最大总和 我已经能够在 Kafka 中做 A,但是,我不知道如何做 B.我一直在研究文档,感觉自己快要接近了,但我似乎总是只找到了一部分解决方案. 我的 firewall_stream 运行良好. client.create_stream(table_name= ..
发布时间:2021-11-12 03:42:14 其他开发

是否可以从 kafka 消息中获取消息密钥的最新值

假设我对同一个消息键有不同的值. 例如: {用户名:1​​,电子邮件:user123@xyz.com }{用户名:1​​,电子邮件:user456@xyz.com }{用户名:1​​,电子邮件:user789@xyz.com } 在上述情况下,我只想要用户更新的最新值,即“user789@xyz.com". 我的 kafka 流应该只给我第三个值,而不是前两个值. 解决方案 ..

从 Postgres 表到带有更新跟踪的 KSQL 表

我的任务是将数据从 Postgres 表传输到 KSQL 表(以便将来与流连接).假设表有三条记录: id |姓名 |描述-------------------------1 |姓名1 |说明12 |姓名2 |说明23 |姓名3 |描述3 借助 Kafka JdbcSourceConnector 很容易做到.但是有一个小问题——表中的数据可能会发生变化.更改也必须在 KTable 中. ..
发布时间:2021-11-12 03:32:08 其他开发

从 Postgres 到 Kafka 的变化跟踪

这个问题跟在这个问题之后. 主要任务是在 KSQL 端进行连接.下面的例子将说明它.事件消息到达 Kafka 主题.该消息的结构: [{"name": "from_ts",“类型":“bigint"},{"name": "to_ts",“类型":“bigint"},{"name": "rulenode_id",“类型":“整数"}] 还有一个 Postgres 表 rulenode: ..
发布时间:2021-11-12 03:29:55 其他开发

将 JSON 模式附加到 KSQL 流记录

我一直在使用 KSQL,到目前为止效果很好.但是现在我想通过 Kafka Connect 将输出下沉到 BigQuery,并且需要附加一个 JSON 模式.我很难弄清楚如何做到这一点.这是我的查询: CREATE STREAM tweets_original (创建于 BIGINT,标识 BIGINT,文本 VARCHAR,源 VARCHAR,地理位置 VARCHAR,用户 STRUCT)WIT ..

如何根据连接器名称获取 Kafka 源连接器架构

我正在使用 Confluent JDBC Kafka 连接器将消息发布到主题中.源连接器将在每次轮询时将数据与模式一起发送到主题.我想检索这个模式. 有可能吗?如何?谁能推荐我 我的目的是基于 Kafka 连接器在轮询时构建的架构创建 KSQL 流或表. 解决方案 最好的方法是使用 Avro,其中架构单独存储,并由 Kafka Connect 和 KSQL 自动使用. 您 ..

confluent ksqlDB 无法创建流

我想为用户主题创建流 但是,ksqlDB 不能创建流. 明确指定 KSQL 流主题名称 解决方案 您应该创建一个主题名称为“user-value"的架构首先在架构注册表中,然后您可以创建流.有关如何将架构发布到架构注册表,请访问 https://docs.confluent.io/platform/current/schema-registry/develop/using.ht ..
发布时间:2021-11-12 03:15:16 其他开发

从 Postgres 表到带有更新跟踪的 KSQL 表

我的任务是将数据从 Postgres 表传输到 KSQL 表(以便将来与流连接).假设表有三条记录: id |姓名 |描述-------------------------1 |姓名1 |说明12 |姓名2 |说明23 |姓名3 |描述3 借助 Kafka JdbcSourceConnector 很容易做到.但是有一个小问题——表中的数据可能会发生变化.更改也必须在 KTable 中. ..
发布时间:2021-11-12 03:13:09 其他开发

KSQLDB Emit Final 不返回任何值

如果我使用“EMIT CHANGES",我会在 ksqldb 上进行以下查询;它确实有效,但如果将其更改为“EMIT FINAL"窗口结束后不返回任何值 CREATE TABLE sspc_3536660_v4 作为选择 sspc,>LATEST_BY_OFFSET(CASE WHEN metric ='sspc_ds_cir_fulfillment' THEN value ELSE NULL ..
发布时间:2021-11-12 03:11:49 其他开发

从 Docker (ksqlDB) 连接到主机上的 Kafka

我从这里找到的 docker-compor 运行 ksqldb-server https://ksqldb.io/quickstart.html#quickstart-content 我的 kafka 引导服务器在同一个 VM 上以标准单独模式运行.我可以使用控制台消费者查看一个主题中的消息: sudo kafka-avro-console-consumer --from-beginnin ..
发布时间:2021-11-12 03:11:10 其他开发

confluent ksqlDB 无法创建流

我想为用户主题创建流 但是,ksqlDB 不能创建流. 明确指定 KSQL 流主题名称 解决方案 您应该创建一个主题名称为“user-value"的架构首先在架构注册表中,然后您可以创建流.有关如何将架构发布到架构注册表,请访问 https://docs.confluent.io/platform/current/schema-registry/develop/using.ht ..
发布时间:2021-11-12 03:10:57 其他开发

KSQL - 将 XML 转换为 JSON/AVRO

我对将数据从 SQL Server 加载到 Kafka 感兴趣.表中的一列是 XML 一列.我们如何在 KSQL 流中将 XML 转换为 JSON 或 AVRO 共振峰? 解决方案 您可以使用 Kafka Connect 和 XML 单消息转换 (SMT). 例如: transforms=Keytransforms.Key.type=com.github.jcustenborder ..
发布时间:2021-11-12 03:04:54 其他开发

KSQL - 将 XML 转换为 JSON/AVRO

我对将数据从 SQL Server 加载到 Kafka 感兴趣.表中的一列是 XML 一列.我们如何在 KSQL 流中将 XML 转换为 JSON 或 AVRO 共振峰? 解决方案 您可以使用 Kafka Connect 和 XML 单消息转换 (SMT). 例如: transforms=Keytransforms.Key.type=com.github.jcustenborder ..
发布时间:2021-11-12 03:03:31 其他开发

KSQL - 删除主题

有没有办法从 KSQL 中删除主题?根据 github 有可能,我试过 DROP TOPIC my-topic删除主题“我的主题"删除主题'我的主题'删除主题`我的主题` 但是这两个命令都不起作用.我收到消息 消息-------------------------------------------------------------------------------io.conflue ..
发布时间:2021-11-12 03:01:49 其他开发

是否有 KSQL 语句来更新表中的值?

我在主题中有一个数据流,应该被视为 ksql 表(只有给定关键事项的最后一个值),这些数据是关于其他主题中某些数据特定字段的更新.KSQLDB 中是否有任何方法来处理更新其他流/表/主题中的值的流?目标主题具有比方说 20 个字段的实体,但我的包含更新的流具有 3 个字段的更新,因此我只想更新这 3 个字段,其他 17 个字段应在目标主题中保持不变(视为表格). 解决方案 您可以使用 JOIN ..
发布时间:2021-11-12 02:56:50 其他开发