From Postgres to Kafka with changes tracking


Problem description

This question follows on from this question.

The main task is to perform joins on the KSQL side. The example below illustrates it. Incident messages arrive in a Kafka topic. The structure of those messages:

[
    {
        "name": "from_ts", 
        "type": "bigint"
    },
    {
        "name": "to_ts", 
        "type": "bigint"
    },
    {
        "name": "rulenode_id",
        "type": "int"
    }
]
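
For illustration, a hypothetical message matching this schema (the values are invented) might look like:

{
    "from_ts": 1581523200000,
    "to_ts": 1581526800000,
    "rulenode_id": 42
}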

And there is a Postgres table rulenode:

id | name | description 
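
A minimal sketch of that table's DDL, with assumed column types, could be:

CREATE TABLE rulenode (
    id          integer PRIMARY KEY,
    name        text,
    description text
);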

Data from both sources needs to be joined on rulenode_id = rulenode.id so as to get a single record with the fields from_ts, to_ts, rulenode_id, rulenode_name, rulenode_description.

I want to do this by means of KSQL, not in the backend as it is done now.

Right now the data from the Postgres table is transferred to Kafka by JdbcSourceConnector. But there is one little problem: as you can guess, the data in the Postgres table may change, and of course these changes should appear in the KSQL STREAM or TABLE too.

Below I've been asked why a KTable and not a KStream. Well, please visit this page and look at the first GIF. There, the records of the table are updated when new data arrives. I thought such behaviour is what I need (except that instead of the names Alice and Bob I have the primary key id of the Postgres table rulenode). That's why I chose a KTable.

Bulk mode of JdbcSourceConnector copies the whole table on every poll, so all rows arrive in the Kafka topic in addition to the previous snapshots of the Postgres table.

As suggested, I created a connector with this config:

{
  "name": "from-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "errors.log.enable": "true",
  "connection.url": "connection.url",
  "connection.user": "postgres",
  "connection.password": "*************",
  "table.whitelist": "rulenode",
  "mode": "bulk",
  "poll.interval.ms": "5000",
  "topic.prefix": "pg."
}

Then I created a stream:

create stream rulenodes 
    with (kafka_topic='pg.rules_rulenode', value_format='avro', key='id');

And now I try to create a table:

create table rulenodes_unique 
    as select * from rulenodes;

But that does not work, with the error:

Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.

I read that tables are used to store aggregated info, for example an aggregate computed with the COUNT function:

create table rulenodes_unique 
    as select id, count(*) from rulenodes group by id;

Can you please tell me how to handle that error?

Recommended answer

You can create a STREAM or a TABLE on top of a Kafka topic with ksqlDB - it's to do with how you want to model the data. From your question it is clear that you need to model it as a table (because you want to join to the latest version of each key). So you need to do this:

create table rulenodes 
    with (kafka_topic='pg.rules_rulenode', value_format='avro');

Now there is one more thing you have to do, which is to ensure that the data in your topic is correctly keyed. You cannot just specify key='id' and have it happen automagically - the key parameter is only a 'hint'. You must make sure that the messages in the Kafka topic have the id field in the key. See the reference docs for full details.

You can do this in Kafka Connect with a Single Message Transform (SMT) that copies the id field into the message key.
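
A minimal sketch of what such a transform chain could look like when added to the connector config shown earlier, using the stock ValueToKey and ExtractField transforms (the alias names are just illustrative):

"transforms": "copyIdToKey,extractId",
"transforms.copyIdToKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyIdToKey.fields": "id",
"transforms.extractId.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractId.field": "id"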

Or you can do it in ksqlDB and change the key - and because we want to process every event, we first model it as a stream (!) and then declare the table over the re-keyed topic:

create stream rulenodes_source 
    with (kafka_topic='pg.rules_rulenode', value_format='avro');

CREATE STREAM RULENODES_REKEY AS SELECT * FROM rulenodes_source PARTITION BY id;

CREATE TABLE rulenodes WITH (kafka_topic='RULENODES_REKEY', value_format='avro');

I would go the Single Message Transform route because it is neater and simpler overall.
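
For completeness, once the rulenodes table is correctly keyed and a stream exists over the incidents topic, the join described in the question could be expressed roughly like this (the incidents stream name and the result column aliases are assumed):

CREATE STREAM incidents_enriched AS
    SELECT i.from_ts,
           i.to_ts,
           i.rulenode_id,
           r.name AS rulenode_name,
           r.description AS rulenode_description
    FROM incidents i
    LEFT JOIN rulenodes r ON i.rulenode_id = r.id;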
