From Postgres to Kafka with changes tracking
Question
This question follows on from this question.
The main task is to perform the joins on the KSQL side. The example below illustrates it. Incident messages arrive in a Kafka topic. The structure of those messages:
[
  {
    "name": "from_ts",
    "type": "bigint"
  },
  {
    "name": "to_ts",
    "type": "bigint"
  },
  {
    "name": "rulenode_id",
    "type": "int"
  }
]
And there is a Postgres table rulenode:
id | name | description
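For reference, a minimal DDL sketch of that table (the column types are assumptions inferred from the fields above, not taken from the original schema):

```sql
-- Hypothetical DDL for the rulenode table; column types are assumed.
CREATE TABLE rulenode (
    id          INT PRIMARY KEY,
    name        TEXT,
    description TEXT
);
```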
Data from both sources needs to be joined on rulenode_id = rulenode.id, so as to get a single record with the fields from_ts, to_ts, rulenode_id, rulenode_name, rulenode_description.
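The desired join could be expressed in KSQL roughly like this (a sketch only, assuming a stream incidents and a table rulenodes already exist; exact syntax depends on the KSQL version):

```sql
-- Hypothetical stream-table join producing the combined record.
SELECT i.from_ts,
       i.to_ts,
       i.rulenode_id,
       r.name        AS rulenode_name,
       r.description AS rulenode_description
FROM incidents i
LEFT JOIN rulenodes r ON i.rulenode_id = r.id;
```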
I want to do this by means of KSQL, not in the backend as it is done now.
Right now the data from the Postgres table is transferred to Kafka by JdbcSourceConnector. But there is one little problem: as you could guess, the data in the Postgres table may change, and of course these changes should appear in the KSQL stream OR table too.
Below I've been asked why a KTable and not a KStream. Well, please visit this page and look at the first GIF. There, the records of the table are updated when new data arrives. I thought such behaviour is what I need (where instead of the names Alice, Bob I have the primary key id of the Postgres table rulenode). That's why I chose a KTable.
Bulk mode of JdbcSourceConnector copies the whole table on every poll. And as you know, all those rows arrive in the Kafka topic on top of the previous Postgres table snapshots.
As suggested, I created a connector with these configs:
{
  "name": "from-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "errors.log.enable": "true",
  "connection.url": "connection.url",
  "connection.user": "postgres",
  "connection.password": "*************",
  "table.whitelist": "rulenode",
  "mode": "bulk",
  "poll.interval.ms": "5000",
  "topic.prefix": "pg."
}
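For reference, a config like this is typically submitted to the Kafka Connect REST API (the host and port below are assumptions; a Connect worker listens on 8083 by default). Note that the REST endpoint expects the connector-specific settings nested under a config key:

```shell
# Hypothetical deployment call; adjust the host/port to your Connect worker.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "from-pg",
        "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "errors.log.enable": "true",
          "connection.url": "connection.url",
          "connection.user": "postgres",
          "connection.password": "*************",
          "table.whitelist": "rulenode",
          "mode": "bulk",
          "poll.interval.ms": "5000",
          "topic.prefix": "pg."
        }
      }'
```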
Then I created a stream:
create stream rulenodes
with (kafka_topic='pg.rules_rulenode', value_format='avro', key='id');
Now I am trying to create a table:
create table rulenodes_unique
as select * from rulenodes;
But that fails with an error:

Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.
I read that tables are used to store aggregated info, for example aggregates computed with the COUNT function:
create table rulenodes_unique
as select id, count(*) from rulenodes group by id;
Can you please tell me how to handle that error?
Answer
You can create a STREAM or a TABLE on top of a Kafka topic with ksqlDB - it's to do with how you want to model the data. From your question it is clear that you need to model it as a table (because you want to join to the latest version of a key). So you need to do this:
create table rulenodes
with (kafka_topic='pg.rules_rulenode', value_format='avro');
Now there is one more thing you have to do, which is ensure that the data in your topic is correctly keyed. You cannot just specify key='id' and have it happen automagically - the key parameter is just a 'hint'. You must make sure that the messages in the Kafka topic have the id field in the key. See the reference docs for full details.
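One common way to get the key right (a sketch, assuming the PARTITION BY re-keying pattern of older KSQL versions; the stream and table names here are illustrative, not from the source):

```sql
-- Declare a stream over the raw topic, making no assumptions about the key.
CREATE STREAM rulenodes_src
  WITH (kafka_topic='pg.rules_rulenode', value_format='avro');

-- Re-key the data by id; this writes a new, correctly keyed topic.
CREATE STREAM rulenodes_keyed AS
  SELECT * FROM rulenodes_src PARTITION BY id;

-- Now a table can be declared on top of the re-keyed topic.
CREATE TABLE rulenodes_tbl
  WITH (kafka_topic='RULENODES_KEYED', value_format='avro', key='id');
```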