Time semantics between KStream and KTable

Question

I am trying to build the following topology:

  1. Using Debezium Connectors, I am pulling 2 tables (let's call them tables A and DA). As per DBZ, the topics where the table rows are stored have the structure { before: "...", after: "..." }.

First steps in my topology are to create "clean" KStreams off these two "table" topics. The sub-topology there looks roughly like this:

private static KStream<String, TABLE_A.Value> getTableARowByIdStream(
    StreamsBuilder builder, Properties streamsConfig) {
  return builder
      .stream("TABLE_A", Consumed.withTimestampExtractor(Application::getRowDate))
      .filter((key, envelope) -> [ some filtering condition ] )
      .map((key, envelope) -> [ maps to TABLE_A.Value ] )
      .through(tableRowByIdTopicName);
}

  2. Notice that I am assigning the record time explicitly because the table rows will be CDC'ed "years" after they were originally published. What the function is doing at the moment is faking the time starting at 2010-01-01 and, using an AtomicInteger, adding 1 millisecond for each consumed entity. It does this for table A but not for DA (I will explain why later).
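A minimal sketch of what that extractor could look like (getRowDate and Application come from the topology above; the 2010-01-01 base and the AtomicInteger counter are as described in the question, while everything else here is an assumption):

import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Application {

  private static final AtomicInteger COUNTER = new AtomicInteger();
  private static final long BASE_MS =
      Instant.parse("2010-01-01T00:00:00Z").toEpochMilli();

  // Signature matches TimestampExtractor#extract, so it can be passed as
  // Consumed.withTimestampExtractor(Application::getRowDate)
  private static long getRowDate(
      ConsumerRecord<Object, Object> record, long previousTimestamp) {
    // Fake a monotonically increasing timestamp:
    // 2010-01-01 plus 1 millisecond per consumed record
    return BASE_MS + COUNTER.incrementAndGet();
  }
}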

Phase 2 of the topology is to build a KTable based on the "cleaned" topic for table A, like this:

private static KTable<String, EntityInfoList> getEntityInfoListById(
    KStream<String, TABLE_A.Value> tableAByIdStream) {
  return tableAByIdStream
      .map((key, value) -> [ some mapping ] )
      .groupByKey()
      .aggregate(
          () -> [ builds up an EntityInfoList object ],
          (key, value, aggregate) -> [ adds the value to the EntityInfoList ]);
}

  3. Finally, with the KTable ready, I am joining it with the KStream coming from DA, like this:

private static KStream<String, OutputTopicEntity> getOutputTopicEntityStream(
    KStream<String, Table_DA.Value> tableDAStream,
    KTable<String, EntityInfoList> tableA_KTable) {

  KStream<String, Table_DA.Value>[] branches = tableDAStream.branch(
      (key, value) -> [ some logic ],
      (key, value) -> true);

  KStream<String, OutputTopicEntity> internalAccountRefStream = branches[0]
      .join(
          tableA_KTable,
          (streamValue, tableValue) -> [ some logic to build a list of OutputTopicEntity ])
      .flatMap((key, listValue) -> [ some logic to flatten it ]);

   [ similar logic with branches[1] ]
}

My problem is, despite the fact that I am "faking" the time for records coming from the Table_A topic (I've verified with kafkacat that they reference 2010/01/01) and entries in Table_DA (the stream side of the join) have timestamps around today (2019/08/14), Kafka Streams doesn't seem to read any of the entries from the Table_DA KStream until it has ingested all records from Table_A into the KTable.

As a result, I don't get all the "join hits" that I was expecting, and the result is also nondeterministic. My understanding, based on this sentence from "What are the differences between KTable vs GlobalKTable and leftJoin() vs outerJoin()?", was the opposite:

For stream-table join, Kafka Streams aligns record processing order based on record timestamps. Thus, the updates to the table are aligned with the records of your stream.

My experience so far is that this is not happening. I can also easily see that my application keeps churning through the Table_A topic long after it has consumed all entries in the Table_DA stream (which happens to be 10 times smaller).

Am I doing something wrong?

Answer

Timestamp synchronization is best-effort before the 2.1.0 release (cf. https://issues.apache.org/jira/browse/KAFKA-3514).

As of 2.1.0, timestamps are synchronized strictly. However, if one input does not have any data, Kafka Streams will "enforce" processing as described in KIP-353 to avoid blocking forever. If you have bursty inputs and want to "block" processing for some time when one input has no data, you can increase the configuration parameter max.task.idle.ms (default is 0), introduced in 2.1.0 via KIP-353.
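For example, with a plain Properties-based Streams configuration (the property constant is the standard StreamsConfig one; the 5-second value below is only an illustration, not a recommendation):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties streamsConfig = new Properties();
// ... bootstrap.servers, application.id, default serdes, etc. ...
// Wait up to 5 seconds for data to arrive on all input partitions
// before processing with some inputs empty (default 0 = never wait)
streamsConfig.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 5000L);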
