KStream to KTable Inner Join producing different number of records every time processed with same data


Problem Description

I want to do a KStream to KTable join, using the KTable as just a lookup table. The steps below show the sequence in which the code is executed:

  1. Construct KTable
  2. Rekey KTable
  3. Construct KStream
  4. Rekey KStream
  5. Join KStream - KTable
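The five steps above can be sketched with the Kafka Streams DSL roughly as follows. This is a minimal sketch, not the asker's actual code: the topic names and the `extractJoinKey` helper are hypothetical, and it assumes kafka-streams on the classpath with String keys and values.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamTableJoinTopology {

    // Hypothetical helper that derives the join key from the record value.
    static String extractJoinKey(String value) {
        return value.split(",")[0];
    }

    public static StreamsBuilder buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // 1-2. Construct the KTable and rekey it (rekeying a KTable goes
        // through a stream and back to a table).
        KTable<String, String> lookup = builder
                .<String, String>table("lookup-topic")      // hypothetical topic
                .toStream()
                .selectKey((k, v) -> extractJoinKey(v))
                .toTable();

        // 3-4. Construct the KStream and rekey it.
        KStream<String, String> events = builder
                .<String, String>stream("events-topic")     // hypothetical topic
                .selectKey((k, v) -> extractJoinKey(v));

        // 5. Inner join: each stream record looks up the table by its key.
        events.join(lookup, (event, ref) -> event + "|" + ref)
              .to("joined-output");                         // hypothetical topic

        return builder;
    }
}
```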

Let's say there are 8000 records in the KStream and 14 records in the KTable, and assume that for each key in the KStream there is a record in the KTable. The expected output would therefore be 8000 records.

Every time I do the join for the first time, or when I start the application, the expected output is 8000 records, but sometimes I see only 6200 records, sometimes the complete set of 8000 records (twice), sometimes no records at all, etc.

  • Question 1: Why is there inconsistency in the records every single time I run the application?

Before the KTable gets constructed (construct + rekey), the KStream gets constructed and its data is available for the join, so the join starts without the KTable and no data is seen in the final join until the KTable is constructed. Once the KTable is constructed, we can see the join happening for the remaining records.

Question 2: How to resolve the inconsistency in the joined records?

I tried a test case using embedded Kafka for the KStream-KTable join, with 10 records from the KStream and 3 records from the KTable. When I ran the test case for the first time, there was no join and I didn't see any data afterwards. When I ran the same test a second time, it ran perfectly. If I clear the state store, it goes back to zero.

Question 3: Why is this behaviour happening?

I tried with KSQL and the join worked perfectly; I got 8000 records. I then went into the KSQL source code and noticed that KSQL is also calling the same join function.

Question 4: How is KSQL resolving the issue?

I saw a few suggested answers in examples:

  • Use GlobalKTable, which didn't work; I got the same inconsistent join.
  • Use a custom joiner https://github.com/confluentinc/kafka-streams-examples/blob/5.2.1-post/src/test/java/io/confluent/examples/streams/CustomStreamTableJoinIntegrationTest.java, which didn't work either.

I am using Spring Cloud Stream as a dependency.

I also saw there was an open issue regarding this somewhere on JIRA.

Recommended Answer

"below steps shows the sequence in which code is executed"

请注意,构建拓扑仅是提供数据流程序的逻辑描述,并且没有不同运算符的执行顺序".程序将被翻译,所有操作员将同时执行.因此,所有主题的数据都将被并行读取.

Note that building a topology is just providing a logical description of data flow program and there is no "order of execution" of different operator. The program will be translated and all operator will be execute at the same time. Hence, data from all topics will be read in parallel.

This parallel processing is the root cause of your observation: the table is not loaded first before processing begins (at least there is no such guarantee by default), so stream-side data may be processed even if the table is not fully loaded.

The order of processing between different topics depends on the record timestamps: records with smaller timestamps are processed first. Hence, if you want to ensure that the KTable data is processed first, you must ensure that its record timestamps are smaller than the stream-side record timestamps. This can be ensured either when you produce the input data into the input topic, or by using a custom timestamp extractor.
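The custom-extractor option could be sketched like this: an extractor that pins records from the table's topic to the smallest possible timestamp so they compare as "older" than any stream-side record. The topic name is a hypothetical assumption, and this requires kafka-streams on the classpath.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch: force records from the (hypothetical) table topic to timestamp 0
// so they sort before every stream-side record and are processed first.
public class TableFirstTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        if ("lookup-topic".equals(record.topic())) {
            return 0L;
        }
        // Stream-side records keep their embedded timestamp.
        return record.timestamp();
    }
}
```

It could be registered per-topic when building the table, e.g. via `Consumed.with(...).withTimestampExtractor(new TableFirstTimestampExtractor())`, or globally through the `default.timestamp.extractor` config.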

Second, fetching data from topics is non-deterministic; if only stream-side data is returned (but no table-side data), no timestamp comparison can be done, and the stream-side data would be processed before the table-side data. To tackle this issue, you can increase the configuration parameter max.task.idle.ms (default is 0 ms). If you increase this config (and I believe that is also what KSQL does by default), the task will block and try to fetch data for the empty input when one input has no data; only after the idle time has passed would processing continue even if one side is empty.
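Setting that parameter is just a streams configuration entry; a minimal sketch, in which the application id, broker address, and the 500 ms value are illustrative assumptions:

```java
import java.util.Properties;

public class JoinConfig {
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "stream-table-join");  // hypothetical app id
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical broker
        // Wait up to 500 ms for data on all input partitions before processing,
        // giving the table side a chance to deliver records with smaller timestamps.
        props.put("max.task.idle.ms", "500");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig().getProperty("max.task.idle.ms"));
    }
}
```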

For a GlobalKTable the behavior is different: the table is loaded on startup, before any processing starts. Hence, I am not sure why this did not work for you.
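For reference, a GlobalKTable join looks roughly like this (topic names and the join logic are illustrative; requires kafka-streams on the classpath). Note that it takes a key mapper, so the stream does not need to be rekeyed or repartitioned at all.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalTableJoin {
    public static StreamsBuilder buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // The GlobalKTable is fully loaded on startup, before processing begins.
        GlobalKTable<String, String> lookup =
                builder.globalTable("lookup-topic");        // hypothetical topic

        KStream<String, String> events =
                builder.stream("events-topic");             // hypothetical topic

        // The KeyValueMapper derives the lookup key from each stream record,
        // so the stream itself does not need to be repartitioned.
        events.join(lookup,
                    (streamKey, streamValue) -> streamKey,
                    (streamValue, tableValue) -> streamValue + "|" + tableValue)
              .to("joined-output");                         // hypothetical topic

        return builder;
    }
}
```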
