是否可以从kafka消息中获取消息密钥的最新值 [英] Is it possible to get the latest value for a message key from kafka messages
问题描述
假设同一消息密钥的值不同.
Suppose I have different values for a same message key.
例如:
{
userid: 1,
email: user123@xyz.com }
{
userid: 1,
email: user456@xyz.com }
{
userid: 1,
email: user789@xyz.com }
在上述情况下,我只希望用户更新最新值,即"user789@xyz.com".
In this above case I want only the latest value updated by the user, that is, 'user789@xyz.com'.
我的kafka流应该只给我第三个值,而不是前两个值.
My kafka stream should give me only the third value and not the previous 2 values.
推荐答案
由于您尚未指定特定的客户端,我将向您展示如何使用ksqlDB和新添加的功能 LATEST_BY_OFFSET
.
Since you've not specified a particular client, I'll show you how this can be done with ksqlDB and the newly-added function, LATEST_BY_OFFSET
.
首先,我用源数据填充主题:
First, I populate the topic with source data:
kafkacat -b broker:29092 -P -t test_topic -K: <<EOF
1:{ "userid": 1, "email": "user123@xyz.com" }
1:{ "userid": 1, "email": "user456@xyz.com" }
1:{ "userid": 1, "email": "user789@xyz.com" }
EOF
然后在ksqlDB中将其首先建模为事件流:
Then in ksqlDB model this as a stream of events first:
ksql> CREATE STREAM USER_UPDATES (USERID INT, EMAIL VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql> SET 'auto.offset.reset' = 'earliest'; [35/60]
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ROWKEY, USERID, EMAIL FROM USER_UPDATES EMIT CHANGES LIMIT 3;
+---------+---------+-----------------+
|ROWKEY |USERID |EMAIL |
+---------+---------+-----------------+
|1 |1 |user123@xyz.com |
|1 |1 |user456@xyz.com |
|1 |1 |user789@xyz.com |
现在,我们可以告诉ksqlDB采取这些事件流,并直接给我们提供最新值(基于偏移量):
Now we can tell ksqlDB to take this stream of events and give us just the latest value (based on the offset), either directly:
ksql> SELECT USERID, LATEST_BY_OFFSET(EMAIL) FROM USER_UPDATES GROUP BY USERID EMIT CHANGES;
+--------------------+--------------------+
|USERID |KSQL_COL_1 |
+--------------------+--------------------+
|1 |user789@xyz.com |
Press CTRL-C to interrupt
或更有用,作为ksqlDB中的物化状态:
or more usefully, as materialised state within ksqlDB:
CREATE TABLE USER_LATEST_STATE AS
SELECT USERID, LATEST_BY_OFFSET(EMAIL) AS EMAIL
FROM USER_UPDATES
GROUP BY USERID
EMIT CHANGES;
此表仍然受Kafka主题的更改驱动,但可以直接查询当前的状态,或者从现在开始(拉式查询"):
This table is still driven by changes to the Kafka topic, but can be queried directly for the current state, either as of now ("pull query"):
ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1;
+--------------------+
|EMAIL |
+--------------------+
|user789@xyz.com |
Query terminated
ksql>
或随着状态的演变而变化(推送查询"):
or as a stream of changes as the state evolves ("push query"):
ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1 EMIT CHANGES;
+--------------------+
|EMAIL |
+--------------------+
|user789@xyz.com |
[ query continues indefinitely ]
这篇关于是否可以从kafka消息中获取消息密钥的最新值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!