是否有 KSQL 语句来更新表中的值? [英] Is there a KSQL statement to update values in table?

查看:15
本文介绍了是否有 KSQL 语句来更新表中的值?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在主题中有一个数据流,应该被视为 ksql 表(只有给定关键事项的最后一个值),这些数据是关于其他主题中某些数据特定字段的更新.KSQLDB 中是否有任何方法来处理更新其他流/表/主题中的值的流?目标主题具有比方说 20 个字段的实体,但我的包含更新的流具有 3 个字段的更新,因此我只想更新这 3 个字段,其他 17 个字段应在目标主题中保持不变(视为表格).

I have a stream of data in topic that should be treated as ksql table (only last value of given key matters) and this data is about updates of some data's specific fields in other topic. Is there any way in KSQLDB to process stream that update values in other stream/table/topic? Target topic has entities with let's say 20 fields, but my stream that contains update has update of 3 fields, so I want to update only these 3 fields and other 17 fields should remain the same in target topic (treated as table).

推荐答案

您可以使用 JOIN STATEMENT 稍作调整来解决您的问题,按照示例,将创建一个包含 5 个字段的表,但只需要更新另一个表中的领域技能和级别.

You can solve your problem using a JOIN STATEMENT with a little adjustment, follow the sample, will create a table with 5 fields, but only will be necessary to update the fields skill and level from another table.

1.从源主题创建表:

CREATE TABLE TBL_EMPLOYEE( `employee_id` VARCHAR, `name` varchar, `lastName` varchar, `age` INT, `skill` VARCHAR, `level` VARCHAR ) WITH ( KAFKA_TOPIC = 'employee-topic-input', PARTITIONS = 3, VALUE_FORMAT = 'JSON', KEY = '`employee_id`');

2.创建表来处理所需的更新(它可以是流或表,来自另一个查询)

2.Create the table to handle the desired updates ( It can be stream or table, resulting from another query)

CREATE TABLE TBL_EMPLOYEE_DESIRED_UPDATES (`employee_id` VARCHAR, `skill` VARCHAR, `level` VARCHAR) WITH( KAFKA_TOPIC = 'employee-desired-updates-topic', PARTITIONS = 3, VALUE_FORMAT ='JSON', KEY = '`employee_id`');

3.创建最终表以更新必填字段,左连接允许第一个表上的所有元素.如果第二个表没有任何更新,则技能和级别字段将相同.

3.Create the final table to update the required fields, the left join allows all the elements on the first table. if there is not any update on the second table, the skill and level fields will be the same.

SET 'auto.offset.reset' = 'earliest';
CREATE TABLE TBL_EMPLOYEE_FINAL AS 
    SELECT 
        EMP.`employee_id` AS `employee_id`, 
        EMP.`name` AS `name`, 
        EMP.`lastName` AS `lastName`, 
        IFNULL(UPD.`skill`, EMP.`skill`) as `skill`, 
        IFNULL(UPD.`level`, EMP.`level`) as `level` 
    FROM TBL_EMPLOYEE AS EMP 
    LEFT JOIN TBL_EMPLOYEE_DESIRED_UPDATES  UPD ON EMP.ROWKEY = UPD.ROWKEY EMIT CHANGES;

示例:

INSERT INTO TBL_EMPLOYEE (`employee_id`, `name`, `lastName`, `age`, `skill`, `level`) VALUES ('117', 'John', 'Constantine', 30, 'java', 'jr');
INSERT INTO TBL_EMPLOYEE (`employee_id`, `name`, `lastName`, `age`, `skill`, `level`) VALUES ('118', 'Anthony', 'Stark', 40, 'AWS', 'architect');
INSERT INTO TBL_EMPLOYEE (`employee_id`, `name`, `lastName`, `age`, `skill`, `level`) VALUES ('119', 'Clark', 'Kent', 35, 'python', 'senior');
ksql> SELECT * FROM TBL_EMPLOYEE_FINAL EMIT CHANGES;
+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+
|ROWTIME                        |ROWKEY                         |employee_id                    |name                           |lastName                       |skill                          |level                          |
+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+
|1611440363833                  |119                            |119                            |Clark                          |Kent                           |python                         |senior                         |
|1611440361284                  |117                            |117                            |John                           |Constantine                    |java                           |jr                             |
|1611440361408                  |118                            |118                            |Anthony                        |Stark                          |AWS                            |architect                      |

第二步是发送更新

INSERT INTO TBL_EMPLOYEE_DESIRED_UPDATES  (`employee_id`, `skill`, `level` ) VALUES ('118', 'mongo', 'senior');

结果

ksql> SELECT  * from TBL_EMPLOYEE_FINAL EMIT CHANGES;
+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+
|ROWTIME                        |ROWKEY                         |employee_id                    |name                           |lastName                       |skill                          |level                          |
+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+
|1611440363833                  |119                            |119                            |Clark                          |Kent                           |python                         |senior                         |
|1611440361284                  |117                            |117                            |John                           |Constantine                    |java                           |jr                             |
|1611440361408                  |118                            |118                            |Anthony                        |Stark                          |AWS                            |architect                      |
|1611440585726                  |118                            |118                            |Anthony                        |Stark                          |mongo                          |senior                         |

您必须将表中的最新元素视为经过两次修改的新元素.另一个是表的更改日志的一部分.记录是不可变的.

You have to consider the latest element in the table as the new one with the two modifications. The other one is part of the changelog of the table. The records are immutables.

这篇关于是否有 KSQL 语句来更新表中的值?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆