Kafka streams: Read from ALL partitions in every instance of an application


Problem description

When using a KTable, Kafka Streams doesn't allow an instance to read from multiple partitions of a particular topic when the number of instances/consumers equals the number of partitions. I tried to achieve this with a GlobalKTable; the problem is that the data gets overwritten, and no aggregation can be applied to it.

Let's suppose I have a topic named "data_in" with 3 partitions (P1, P2, P3). When I run 3 instances (I1, I2, I3) of the Kafka Streams application, I want each instance to read data from all partitions of "data_in": I1 reads from P1, P2 and P3, I2 reads from P1, P2 and P3, and so does I3.

EDIT: Keep in mind that the producer can publish two records with similar IDs to two different partitions of "data_in". So the GlobalKTable would get overwritten when running two different instances.

Please, how can I achieve this? Here is a portion of my code:

private KTable<String, theDataList> globalStream() {

    // KStream of records from data-in topic using String and theDataSerde deserializers
    KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, Data> KGS = trashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Return a KTable
    return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized);
}


Recommended answer

Either change the number of partitions of your input topic "data_in" to 1, or use a GlobalKTable to get the data from all partitions of the topic and then join your stream with it. With that, your app instances no longer have to be in different consumer groups.

The code would look like this:

private GlobalKTable<String, theDataList> globalStream() {

    // KStream of records from the data_in topic using the String and theDataSerde deserializers
    KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // By sending the records to another topic you force a repartition on that topic
    trashStream.to("new_data_in");

    KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the repartitioned KStream records using an intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, Data> KGS = newTrashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Aggregate into a KTable, then write the result to the agg_data_in topic
    KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized)
    .toStream() // convert the KTable to a KStream so it can be written to a topic
    .to("agg_data_in", Produced.with(Serdes.String(), SerDes.theDataDataListSerde));

    // Read the aggregated topic back as a GlobalKTable so that every instance sees all partitions
    return getBuilder().globalTable("agg_data_in", Consumed.with(Serdes.String(), SerDes.theDataDataListSerde));
}
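
A minimal sketch (not part of the original answer) of how the returned GlobalKTable could then be joined with a KStream: every instance can look up the aggregated list for a key, because a GlobalKTable is fully replicated to each instance. The lookupStream variable and the ValueJoiner below are illustrative placeholders.

GlobalKTable<String, theDataList> aggTable = globalStream();

// Stream whose records should be enriched with the aggregated lists
KStream<String, Data> lookupStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

KStream<String, theDataList> joined = lookupStream.join(
    aggTable,
    (key, value) -> key,                        // map each stream record to the table's key
    (value, aggregatedList) -> aggregatedList); // keep the aggregated list as the join result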

EDIT: I edited the code above to force a repartition on a topic called "new_data_in".
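
As a side note (an assumption about the Kafka Streams version, not part of the original answer): on 1.x/2.x the explicit to() + stream() pair used above can be written more compactly with KStream#through, which writes to the topic and reads it back in one call; in 2.6+ through() is deprecated in favour of repartition().

// Compact equivalent of the to("new_data_in") + stream("new_data_in") repartition step
KStream<String, Data> newTrashStream = trashStream.through("new_data_in", Produced.with(Serdes.String(), SerDes.theDataSerde));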
