Time semantics between KStream and KTable


Problem description


I am trying to build the following topology:

  1. Using Debezium connectors, I am pulling 2 tables (let's call them tables A and DA). As per DBZ, the topics where the table rows are stored have the structure { before: "...", after: "..." }.

  2. First steps in my topology are to create "clean" KStreams off these two "table" topics. The sub-topology there looks roughly like this:

private static KStream<String, TABLE_A.Value> getTableARowByIdStream(
    StreamsBuilder builder, Properties streamsConfig) {
  return builder
      .stream("TABLE_A", Consumed.withTimestampExtractor(Application::getRowDate))
      .filter((key, envelope) -> [ some filtering condition ] )
      .map((key, envelope) -> [ maps to TABLE_A.Value ] )
      .through(tableRowByIdTopicName);
}

  3. Notice that I am assigning the record time explicitly because the table rows will be CDC'ed "years" after they were originally published. What the function does at the moment is fake the time, starting at 2010-01-01 and, using an AtomicInteger, adding 1 millisecond for each consumed entity. It does this for table A but not for DA (I will explain why later).
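The extractor itself isn't shown in the question; here is a minimal sketch of that synthetic-timestamp logic, assuming the class and method names are illustrative and that the real getRowDate (a TimestampExtractor-compatible method) would delegate to it for each consumed record:

```java
import java.time.Instant;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch (names are hypothetical): synthetic, strictly
// increasing timestamps starting at 2010-01-01, advancing 1 ms per record.
public class SyntheticTimestamps {
    static final long BASE_TS = Instant.parse("2010-01-01T00:00:00Z").toEpochMilli();
    private static final AtomicLong COUNTER = new AtomicLong(0);

    // Each call returns the previous timestamp plus 1 ms.
    public static long nextTimestamp() {
        return BASE_TS + COUNTER.incrementAndGet();
    }
}
```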

  4. Phase 2 of the topology is to build a KTable based on the "cleaned" topic for table A, like this:

private static KTable<String, EntityInfoList> getEntityInfoListById(
    KStream<String, TABLE_A.Value> tableAByIdStream) {
  return tableAByIdStream
      .map((key, value) -> [ some mapping ] )
      .groupByKey()
      .aggregate(() -> [ builds up an EntityInfoList object ]);
}

  5. Finally, with the KTable ready, I'm joining it with the KStream over DA like so:

private static KStream<String, OutputTopicEntity> getOutputTopicEntityStream(
    KStream<String, Table_DA.Value> tableDAStream,
    KTable<String, EntityInfoList> tableA_KTable) {

  KStream<String, Table_DA.Value>[] branches = tableDAStream.branch(
      (key, value) -> [ some logic ],
      (key, value) -> true);

  KStream<String, OutputTopicEntity> internalAccountRefStream = branches[0]
      .join(
          tableA_KTable,
          (streamValue, tableValue) -> [ some logic to build a list of OutputTopicEntity ])
      .flatMap((key, listValue) -> [ some logic to flatten it ]);

   [ similar logic with branch[1] ]
}

My problem is that, despite the fact that I am "faking" the time of records coming from the Table_A topic (I've verified with kafkacat that they reference 2010/01/01), while entries in Table_DA (the stream side of the join) have timestamps around today, 2019/08/14, Kafka Streams does not seem to read any of the entries from the Table_DA KStream until it has ingested all records from Table_A into the KTable.

As a result, I don't get all the "join hits" that I was expecting, and the result is also nondeterministic. My understanding, based on this sentence from What are the differences between KTable vs GlobalKTable and leftJoin() vs outerJoin()?, was the opposite:

For stream-table join, Kafka Streams aligns record processing based on record timestamps. Thus, the updates to the table are aligned with the records of your stream.

My experience so far is that this is not happening. I can also easily see how my application keeps churning through the Table_A topic long after it has consumed all entries in the Table_DA stream (which happens to be 10 times smaller).
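For reference, the alignment rule quoted above amounts to always processing the buffered record with the smallest timestamp next. A toy model of that rule (plain Java, not the Kafka Streams API; all names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of timestamp-based synchronization: when both inputs have
// buffered records, the one with the smaller timestamp is processed next.
public class TimestampSync {
    public static List<String> processingOrder(Deque<Long> tableTs, Deque<Long> streamTs) {
        List<String> order = new ArrayList<>();
        while (!tableTs.isEmpty() || !streamTs.isEmpty()) {
            boolean takeTable = !tableTs.isEmpty()
                && (streamTs.isEmpty() || tableTs.peek() <= streamTs.peek());
            order.add(takeTable ? "table@" + tableTs.poll() : "stream@" + streamTs.poll());
        }
        return order;
    }
}
```

Under this rule, table-side records timestamped in 2010 are all processed before stream-side records timestamped in 2019, so consuming all of Table_A first is what strict timestamp ordering would produce.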

Am I doing something wrong?

Solution

Timestamp synchronization is best effort before the 2.1.0 release (cf. https://issues.apache.org/jira/browse/KAFKA-3514).

As of 2.1.0, timestamps are synchronized strictly. However, if one input does not have any data, Kafka Streams will "enforce" processing, as described in KIP-353, to avoid blocking forever. If you have bursty inputs and want to "block" processing for some time when one input has no data, you can increase the configuration parameter max.task.idle.ms (default is 0), introduced in 2.1.0 via KIP-353.
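A minimal sketch of setting that parameter, assuming a plain Properties-based Streams configuration (the key string is the real config name, available as StreamsConfig.MAX_TASK_IDLE_MS_CONFIG since 2.1.0; the helper class is illustrative):

```java
import java.util.Properties;

public class StreamsIdleConfig {
    // "max.task.idle.ms": how long a task waits for data on its other input
    // partitions before enforced processing kicks in; the default of 0 means
    // no waiting. Raising it trades latency for stricter timestamp ordering.
    public static Properties withMaxTaskIdle(Properties base, long idleMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.put("max.task.idle.ms", Long.toString(idleMs));
        return props;
    }
}
```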
