KStream to KTable Inner Join producing different number of records every time processed with same data


Problem Description

I want to do a KStream-to-KTable join, using the KTable purely as a lookup table. The steps below show the sequence in which the code is executed (see the code sketch after the list).

  1. Construct KTable

  2. ReKey KTable

  3. Construct KStream

  4. ReKey KStream

  5. Join KStream - KTable
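A minimal sketch of those five steps in the Kafka Streams DSL (topic names, key/value types, serdes, and the key-extraction logic are assumptions for illustration, not taken from the question; it also assumes Kafka Streams 2.5+ for toTable):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class JoinTopologySketch {

    public static void build(StreamsBuilder builder) {
        // Steps 1 + 2: construct the KTable and re-key it on the join attribute
        KTable<String, String> lookup = builder
                .<String, String>table("lookup-topic")
                .toStream()
                .selectKey((key, value) -> extractJoinKey(value))
                .toTable(Materialized.with(Serdes.String(), Serdes.String()));

        // Steps 3 + 4: construct the KStream and re-key it on the same attribute
        KStream<String, String> events = builder
                .<String, String>stream("event-topic")
                .selectKey((key, value) -> extractJoinKey(value));

        // Step 5: inner join -- each stream record is expected to find a table record
        events.join(lookup, (event, lookupValue) -> event + "|" + lookupValue)
              .to("joined-topic");
    }

    // placeholder: derive the join key from the record payload
    private static String extractJoinKey(String value) {
        return value;
    }
}
```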

Let's say there are 8000 records in the KStream and 14 records in the KTable, and assume that every key in the KStream has a matching record in the KTable. The expected output would therefore be 8000 records.

Every time I do the join for the first time, or when I start the application, the expected output is 8000 records, but sometimes I see only 6200 records, sometimes the complete set of 8000 records (twice), sometimes no records at all, etc.

  • Question 1: Why is the record count inconsistent every single time I run the application?

Before the KTable gets constructed (construct + re-key), the KStream gets constructed and its data is already available for the join, so the join starts without the KTable and no output is seen until the KTable is built. Once the KTable is constructed, we can see the join happening for the remaining records.

  • Question 2: How do I resolve the inconsistent join results?

I tried a test case using embedded Kafka for the KStream-KTable join. Ten records from the KStream and three records from the KTable were used in the process. When I ran the test case for the first time there was no join and I didn't see any data after the join. When I ran the same test a second time it ran perfectly. If I clear the state store, it is back to zero.

  • Question 3: Why is this behaviour happening?

I tried with KSQL and the join worked perfectly: I got 8000 records. I then went into the KSQL source code and noticed that KSQL is doing the same join operation.

  • Question 4: How is KSQL resolving the issue?

I saw a few suggested answers:

  • Use a GlobalKTable, which didn't work; I got the same inconsistent join.
  • Use the custom joiner from https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/test/java/io/confluent/examples/streams/CustomStreamTableJoinIntegrationTest.java, which didn't work either.

I am using Spring Cloud Stream as a dependency.

I also saw that there was an open issue about this somewhere in JIRA.

Answer

"The steps below show the sequence in which the code is executed"

Note that building a topology only provides a logical description of the data-flow program; there is no "order of execution" of the different operators. The program will be translated and all operators will execute at the same time. Hence, data from all topics is read in parallel.

This parallel processing is the root cause of your observation: the table is not necessarily loaded before processing begins (at least there is no guarantee of this by default), and thus stream-side data may be processed even if the table is not fully loaded.

The processing order between different topics depends on the record timestamps: records with smaller timestamps are processed first. Hence, if you want to ensure that the KTable data is processed first, you must ensure that its record timestamps are smaller than the stream-side record timestamps. This can be ensured either when you produce the input data into the input topics, or by using a custom timestamp extractor.
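As one hedged option, a custom TimestampExtractor could force the table-side topic to always sort before stream-side records; the topic name and the choice of returning 0 are assumptions for illustration only. It would be registered via the default.timestamp.extractor config.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical extractor: records from the (assumed) table topic always get the
// smallest possible timestamp so they sort before stream-side records.
public class TableFirstTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        if ("lookup-topic".equals(record.topic())) {
            return 0L;                     // table side: processed first
        }
        return record.timestamp();         // stream side: keep its own timestamp
    }
}
```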

Second, fetching data from topics is non-deterministic; if only stream-side data is returned (but no table-side data), a timestamp comparison cannot be done, and thus the stream-side data would be processed before the table-side data. To tackle this issue, you can increase the configuration parameter max.task.idle.ms (default is 0 ms). If you increase this config (and I believe that is what KSQL does by default), the task will block when one input has no data and will try to fetch data for the empty input; only after the idle time has passed will processing continue even if one side is still empty.
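For example, a minimal sketch of setting this parameter (the 500 ms value and the other property values are arbitrary placeholders, not recommendations from the answer):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsConfigSketch {

    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-ktable-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // wait up to 500 ms for data on empty input partitions before processing continues
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L);
        return props;
    }
}
```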

For a GlobalKTable the behavior is different: the table is loaded on startup, before any processing starts. Hence, I am not sure why this did not work for you.
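For reference, a hedged sketch of the GlobalKTable variant; topic names, types, and the key-mapper logic are assumptions. Unlike the KTable join, the stream side does not need to be re-keyed, because the join takes an explicit key mapper and the global table is fully replicated to every instance.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalTableJoinSketch {

    public static void build(StreamsBuilder builder) {
        GlobalKTable<String, String> lookup = builder.globalTable("lookup-topic");
        KStream<String, String> events = builder.stream("event-topic");

        // The key mapper derives the lookup key per stream record, so the stream
        // does not need to be re-keyed (and no repartition topic is created).
        events.join(lookup,
                    (streamKey, streamValue) -> streamKey,
                    (streamValue, lookupValue) -> streamValue + "|" + lookupValue)
              .to("joined-topic");
    }
}
```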
