From Postgres to Kafka with changes tracking


Problem Description


This question follows this one.

The main task is to make joins on the KSQL side. The example below will illustrate it. Incident messages arrive in a Kafka topic. The structure of those messages:

[
    {
        "name": "from_ts", 
        "type": "bigint"
    },
    {
        "name": "to_ts", 
        "type": "bigint"
    },
    {
        "name": "rulenode_id",
        "type": "int"
    }
]

And there is a Postgres table rulenode:

id | name | description 

Data from both sources needs to be joined on rulenode_id = rulenode.id so as to get a single record with the fields from_ts, to_ts, rulenode_id, rulenode_name, rulenode_description.

I want to do this by means of KSQL rather than in the backend, as it is done now.

Right now data from the Postgres table is transferred to Kafka by JdbcSourceConnector. But there is one little problem - as you can guess, data in the Postgres table may change. And of course these changes should show up in the KSQL stream OR table too.

Below I've been asked why a KTable and not a KStream. Well, please visit this page and look at the first GIF. There, the records of the table are updated when new data arrives. I thought such behaviour is what I need (where instead of the names Alice and Bob I have the primary key id of the Postgres table rulenode). That's why I chose KTable.

Bulk mode of the JdbcSourceConnector copies the whole table on every poll. And as you know, all those rows arrive in the Kafka topic on top of the previous snapshots of the Postgres table.


As suggested, I created a connector with this config:

{
  "name": "from-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "errors.log.enable": "true",
  "connection.url": "connection.url",
  "connection.user": "postgres",
  "connection.password": "*************",
  "table.whitelist": "rulenode",
  "mode": "bulk",
  "poll.interval.ms": "5000",
  "topic.prefix": "pg."
}

Then created a stream:

create stream rulenodes 
    with (kafka_topic='pg.rules_rulenode', value_format='avro', key='id');

and am now trying to create a table:

create table rulenodes_unique 
    as select * from rulenodes;

but that didn't work, failing with this error:

Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.

I read that tables are used to store aggregated info. For example, to store counts aggregated with the COUNT function:

create table rulenodes_unique 
    as select id, count(*) from rulenodes group by id;

Can you please tell me how to handle that error?

Solution

You can create a STREAM or a TABLE on top of a Kafka topic with ksqlDB - it's to do with how you want to model the data. From your question it is clear that you need to model it as a table (because you want to join to the latest version of a key). So you need to do this:

create table rulenodes 
    with (kafka_topic='pg.rules_rulenode', value_format='avro');

Now there is one more thing you have to do, which is to ensure that the data in your topic is correctly keyed. You cannot just specify key='id' and have it automagically happen - the key parameter is only a 'hint'. You must make sure that the messages in the Kafka topic have the id field in the key. See the reference docs for full details.

You can do this with a Single Message Transform in Kafka Connect:

"transforms":"createKey,extractInt",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"id",
"transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field":"id"

Or you can do it in ksqlDB and change the key - and because we want to process every event we first model it as a stream (!) and then declare the table over the re-keyed topic:

create stream rulenodes_source 
    with (kafka_topic='pg.rules_rulenode', value_format='avro');

CREATE STREAM RULENODES_REKEY AS SELECT * FROM rulenodes_source PARTITION BY id;

CREATE TABLE rulenodes WITH (kafka_topic='RULENODES_REKEY', value_format='avro');

I would go the Single Message Transform route because it is neater and simpler overall.
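
And once the table is keyed on id (by either route), the join from the original question would look something like this - just a sketch, assuming the incident messages land in a topic called events (the topic and stream names here are assumptions, not something given in the question):

-- 'events' / 'incidents' are assumed names; use whatever topic the incident messages arrive in
CREATE STREAM incidents 
    WITH (kafka_topic='events', value_format='avro');

-- stream-table join, enriching each incident with the latest rulenode row for its rulenode_id
CREATE STREAM incidents_enriched AS 
    SELECT i.from_ts, 
           i.to_ts, 
           i.rulenode_id, 
           r.name        AS rulenode_name, 
           r.description AS rulenode_description 
    FROM incidents i 
    JOIN rulenodes r ON i.rulenode_id = r.id;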
