Is it possible to get the latest value for a message key from Kafka messages?


Question

Suppose I have different values for the same message key.

For example:

{ "userid": 1, "email": "user123@xyz.com" }

{ "userid": 1, "email": "user456@xyz.com" }

{ "userid": 1, "email": "user789@xyz.com" }

In the case above, I want only the latest value updated by the user, that is, 'user789@xyz.com'.

My Kafka stream should give me only the third value, not the previous two.

Answer

Since you haven't specified a particular client, I'll show you how to do this with ksqlDB and its newly added function, LATEST_BY_OFFSET.

First, I populate the topic with source data:

kafkacat -b broker:29092 -P -t test_topic -K: <<EOF
1:{ "userid": 1, "email": "user123@xyz.com" }
1:{ "userid": 1, "email": "user456@xyz.com" }
1:{ "userid": 1, "email": "user789@xyz.com" }
EOF
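The `-K:` flag tells kafkacat to use `:` as the key delimiter. The JSON values also contain colons, so each line has to be split at its first `:`. The ROWKEY of `1` in the output further down shows that this is what happens. The minimal Python sketch below reproduces that split locally so you can check which key and value each record gets. It assumes first-occurrence splitting, and `split_key_value` is a hypothetical helper, not part of kafkacat.

```python
import json

# The three lines fed to kafkacat on stdin in the command above.
LINES = [
    '1:{ "userid": 1, "email": "user123@xyz.com" }',
    '1:{ "userid": 1, "email": "user456@xyz.com" }',
    '1:{ "userid": 1, "email": "user789@xyz.com" }',
]


def split_key_value(line: str, delim: str = ":") -> tuple[str, str]:
    """Hypothetical helper: split an input line into (key, value) at the FIRST delimiter.

    Assumption: this mirrors how `kafkacat -K:` treats each line, so the
    colons inside the JSON value are left untouched.
    """
    key, sep, value = line.partition(delim)
    if not sep:
        raise ValueError(f"no key delimiter {delim!r} in line: {line!r}")
    return key, value


if __name__ == "__main__":
    for line in LINES:
        key, value = split_key_value(line)
        # The value must still be valid JSON after the split.
        print(key, json.loads(value)["email"])
```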

Then, in ksqlDB, first model this as a stream of events:

ksql> CREATE STREAM USER_UPDATES (USERID INT, EMAIL VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');

 Message
----------------
 Stream created
----------------

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ROWKEY, USERID, EMAIL FROM USER_UPDATES EMIT CHANGES LIMIT 3;
+---------+---------+-----------------+
|ROWKEY   |USERID   |EMAIL            |
+---------+---------+-----------------+
|1        |1        |user123@xyz.com  |
|1        |1        |user456@xyz.com  |
|1        |1        |user789@xyz.com  |

Now we can tell ksqlDB to take this stream of events and give us just the latest value (based on the offset), either directly:

ksql> SELECT USERID, LATEST_BY_OFFSET(EMAIL) FROM USER_UPDATES GROUP BY USERID EMIT CHANGES;
+--------------------+--------------------+
|USERID              |KSQL_COL_1          |
+--------------------+--------------------+
|1                   |user789@xyz.com     |

Press CTRL-C to interrupt
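Conceptually, `GROUP BY USERID` combined with `LATEST_BY_OFFSET(EMAIL)` keeps, for each user, the email from the record with the highest offset. Below is a minimal plain-Python sketch of that rule. It assumes all events for a key come from one partition, because offsets are only ordered within a partition. `latest_by_offset` is a hypothetical helper that illustrates the semantics; it is not ksqlDB's implementation.

```python
from typing import Dict, Iterable, Tuple

# Each event is (offset, userid, email), as read from a single partition.
Event = Tuple[int, int, str]


def latest_by_offset(events: Iterable[Event]) -> Dict[int, str]:
    """Return, per userid, the email carried by the event with the highest offset."""
    best: Dict[int, Tuple[int, str]] = {}
    for offset, userid, email in events:
        current = best.get(userid)
        # Keep this event only if it is newer (higher offset) than what we have.
        if current is None or offset > current[0]:
            best[userid] = (offset, email)
    return {userid: email for userid, (_, email) in best.items()}


if __name__ == "__main__":
    events = [
        (0, 1, "user123@xyz.com"),
        (1, 1, "user456@xyz.com"),
        (2, 1, "user789@xyz.com"),
    ]
    print(latest_by_offset(events))  # {1: 'user789@xyz.com'}
```

Comparing offsets, instead of simply taking the last event seen, keeps the result correct even if the input list happens to be out of order.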

or more usefully, as materialised state within ksqlDB:

CREATE TABLE USER_LATEST_STATE AS 
    SELECT USERID, LATEST_BY_OFFSET(EMAIL) AS EMAIL 
      FROM USER_UPDATES 
     GROUP BY USERID 
     EMIT CHANGES;

This table is still driven by changes to the Kafka topic, but can be queried directly for the current state, either as of now ("pull query"):

ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1;
+--------------------+
|EMAIL               |
+--------------------+
|user789@xyz.com     |
Query terminated
ksql>

or as a stream of changes as the state evolves ("push query"):

ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1 EMIT CHANGES;
+--------------------+
|EMAIL               |
+--------------------+
|user789@xyz.com     |

[ query continues indefinitely ]
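The difference between the two query styles can be sketched with an in-memory stand-in for `USER_LATEST_STATE`. `UserLatestState` is a hypothetical Python class, not a ksqlDB client. `apply` upserts one row per key as events arrive. `pull` answers once from the current state and returns. `push` yields the current value and then every later change for the key. ksqlDB itself may emit rows differently; for example, it can coalesce intermediate updates.

```python
from typing import Dict, Iterable, Iterator, Optional, Tuple


class UserLatestState:
    """Hypothetical in-memory stand-in for the USER_LATEST_STATE table."""

    def __init__(self) -> None:
        self._rows: Dict[int, str] = {}

    def apply(self, userid: int, email: str) -> None:
        # Upsert: a newer event for the same key replaces the old row.
        self._rows[userid] = email

    def pull(self, userid: int) -> Optional[str]:
        # Pull query: read the current state once, then finish.
        return self._rows.get(userid)

    def push(self, userid: int, incoming: Iterable[Tuple[int, str]]) -> Iterator[str]:
        # Push query: emit the current value (if any), then every later
        # change for this key as new events are applied.
        if userid in self._rows:
            yield self._rows[userid]
        for key, email in incoming:
            self.apply(key, email)
            if key == userid:
                yield email
```

For example, after applying the three events from the topic, `state.pull(1)` returns `'user789@xyz.com'`. With two further, purely illustrative events `[(2, "other@xyz.com"), (1, "user999@xyz.com")]`, `list(state.push(1, later))` gives `['user789@xyz.com', 'user999@xyz.com']`: the update for user 2 changes the table but is not emitted for user 1.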
