KSQL: append multiple child records to parent record


Question

I'm trying to use KSQL (as part of confluent-5.0.0) to create a single record out of a set of parent records and child records, where every parent record has multiple child records (specifically, payment details and the parties involved in the payment). These parent/child records are linked by the parent's id. To illustrate, I'm dealing with records of roughly this structure in the source system:

payment:
| id    | currency | amount | payment_date |
|------------------------------------------|
| pmt01 | USD      | 20000  | 2018-11-20   |
| pmt02 | USD      | 13000  | 2018-11-23   |

payment_parties:
| id    | payment_id | party_type   | party_ident | party_account |
|-----------------------------------------------------------------|
| prt01 | pmt01      | sender       | XXYYZZ23    | (null)        |
| prt02 | pmt01      | intermediary | AADDEE98    | 123456789     |
| prt03 | pmt01      | receiver     | FFGGHH56    | 987654321     |
| prt04 | pmt02      | sender       | XXYYZZ23    | (null)        |
| prt05 | pmt02      | intermediary | (null)      | (null)        |
| prt06 | pmt02      | receiver     | FFGGHH56    | 987654321     |

These records are loaded, in AVRO format, onto a set of Kafka topics using Oracle Golden Gate, with one topic per table. This means the following topics exist: src_payment and src_payment_parties. Because of the way the source system works, the timestamps of a payment and its party records fall within several milliseconds of each other.

Now, the intent is to 'flatten' these records into a single record, which will be consumed from an outgoing topic. To illustrate, for the records above, the desired output would be along these lines:

payment_flattened:
| id    | currency | amount | payment_date | sender_ident | sender_account | intermediary_ident | intermediary_account | receiver_ident | receiver_account |
|----------------------------------------------------------------------------------------------------------------------------------------------------------|
| pmt01 | USD      | 20000  | 2018-11-20   | XXYYZZ23     | (null)         | AADDEE98           | 123456789            | FFGGHH56       | 987654321        |
| pmt02 | USD      | 13000  | 2018-11-23   | XXYYZZ23     | (null)         | (null)             | (null)               | FFGGHH56       | 987654321        |

The first question I'd like to ask here is the following: How can I best achieve this combination of data from the source topics?

Of course, I have tried a few things myself. In the interest of brevity, I'll describe only my attempt at appending the first of the payment parties (the sender) to the payment records.

Step one: set up the source streams
Note: because the OGG setup adds a property called 'table' to the AVRO schema, I have to specify the fields to take from the topic explicitly. Additionally, I'm not interested in the fields specifying the type of operation (e.g. insert or update).

create stream payment_stream (id varchar, currency varchar, amount double, \
payment_date varchar) with (kafka_topic='src_payment',value_format='avro');

create stream payment_parties_stream (id varchar, payment_id varchar, party_type varchar, \
party_ident varchar, party_account varchar) with (kafka_topic='src_payment_parties',\
value_format='avro');

Step two: create a stream for the payment senders
Note: from what I've gathered from the documentation, and found out by experimenting, in order to join the payment stream to a payment party stream, the latter needs to be partitioned by the payment id. Additionally, the only way I have gotten the join to work is by renaming the column.

create stream payment_sender_stream as select payment_id as id, party_ident, \
party_account from payment_parties_stream where party_type = 'sender' partition by id;

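Step three: join the two streams
Note: I'm using a left join because not all payment parties are always present, as the example records above show, where pmt02 has no intermediary.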

create stream payment_with_sender as select pmt.id as id, pmt.currency, pmt.amount, \
pmt.payment_date, snd.party_ident, snd.party_account from payment_stream pmt left join \
payment_sender_stream snd within 1 seconds on pmt.id = snd.id;

Now, the output I would expect from this stream is something along these lines:

ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null

Instead, the output I'm seeing is along these lines:

ksql> select * from payment_with_sender;
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | null | null
rowtime | pmt01 | pmt01 | USD | 20000 | 2018-11-20 | XXYYZZ23 | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | null | null
rowtime | pmt02 | pmt02 | USD | 13000 | 2018-11-23 | XXYYZZ23 | null

Hence, the second (two-part) question I'd like to ask is: Why does the left join produce these duplicate records? And can this be avoided?

Apologies for the wall of text; I tried to be as complete as possible in describing the issue. Of course, I'd be happy to add any missing information and to answer questions about the setup to the best of my knowledge.

Recommended answer

You're nearly there :-)

WITHIN 1 SECONDS will give you results from both sides of the join.

Instead, try WITHIN (0 SECONDS, 1 SECONDS). Then only records from the right side of the join will be joined to the left, and not vice versa.
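Applied to the join from step three, the suggestion might look like the sketch below. Only the WITHIN clause differs from the question's statement. The stream name payment_with_sender_v2 is a hypothetical name chosen here so that it does not clash with the stream already created in step three.

```sql
-- Same join as step three, but with an asymmetric window as suggested above.
-- payment_with_sender_v2 is a hypothetical name, not part of the original setup.
create stream payment_with_sender_v2 as select pmt.id as id, pmt.currency, pmt.amount, \
pmt.payment_date, snd.party_ident, snd.party_account from payment_stream pmt left join \
payment_sender_stream snd within (0 seconds, 1 seconds) on pmt.id = snd.id;
```

Whether this removes the null-padded rows in a given setup is best checked by running select * from payment_with_sender_v2; against the same input records.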

You can read more about this in an article I wrote here.

BTW if you want to work around the 'table' reserved word issue from OGG, you can set includeTableName to false in the GG config.
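As for the first question (building the full payment_flattened record), one possible approach is to repeat steps two and three for each party type and chain the joins, one join per statement. The following is only a sketch under assumptions: every stream name other than payment_stream, payment_parties_stream and payment_sender_stream is hypothetical, the explicit column aliases are chosen to match the desired output, and it has not been verified against confluent-5.0.0. In particular, the intermediate streams may need an additional partition by id step before they can be joined again.

```sql
-- Hypothetical per-party streams, mirroring payment_sender_stream from step two.
create stream payment_intermediary_stream as select payment_id as id, party_ident, \
party_account from payment_parties_stream where party_type = 'intermediary' partition by id;

create stream payment_receiver_stream as select payment_id as id, party_ident, \
party_account from payment_parties_stream where party_type = 'receiver' partition by id;

-- Hypothetical chain: each statement appends one party to the previous result.
create stream payment_flat_sender as select pmt.id as id, pmt.currency as currency, \
pmt.amount as amount, pmt.payment_date as payment_date, \
snd.party_ident as sender_ident, snd.party_account as sender_account \
from payment_stream pmt left join payment_sender_stream snd \
within (0 seconds, 1 seconds) on pmt.id = snd.id;

create stream payment_flat_intermediary as select ps.id as id, ps.currency as currency, \
ps.amount as amount, ps.payment_date as payment_date, \
ps.sender_ident as sender_ident, ps.sender_account as sender_account, \
pim.party_ident as intermediary_ident, pim.party_account as intermediary_account \
from payment_flat_sender ps left join payment_intermediary_stream pim \
within (0 seconds, 1 seconds) on ps.id = pim.id;

create stream payment_flattened as select pi.id as id, pi.currency as currency, \
pi.amount as amount, pi.payment_date as payment_date, \
pi.sender_ident as sender_ident, pi.sender_account as sender_account, \
pi.intermediary_ident as intermediary_ident, \
pi.intermediary_account as intermediary_account, \
prc.party_ident as receiver_ident, prc.party_account as receiver_account \
from payment_flat_intermediary pi left join payment_receiver_stream prc \
within (0 seconds, 1 seconds) on pi.id = prc.id;
```

The joins are chained because each statement here joins exactly two streams. The one-second window is carried over from the question and may need widening if the intermediate streams introduce extra delay.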
