Kafka streams: Read from ALL partitions in every instance of an application


Question

When using a KTable, Kafka Streams doesn't allow an instance to read from multiple partitions of a particular topic when the number of instances/consumers equals the number of partitions. I tried to achieve this with a GlobalKTable; the problem is that its data gets overwritten, and no aggregation can be applied to it.

Let's suppose I have a topic named "data_in" with 3 partitions (P1, P2, P3). When I run 3 instances (I1, I2, I3) of a Kafka Streams application, I want each instance to read data from all partitions of "data_in": I1 reads from P1, P2 and P3, I2 reads from P1, P2 and P3, and so on for I3.

Keep in mind that producers can publish two similar IDs into two different partitions of "data_in". So when two different instances are running, the GlobalKTable gets overwritten.
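To illustrate how that can happen, here is a hedged producer sketch, assuming records are keyed by something other than the ID (the broker address, keys and payloads are hypothetical); the default partitioner hashes the record key, so the same logical ID can end up in two different partitions:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same logical ID ("id-42") in the payload, but different record keys:
            // key hashing can place these on different partitions of "data_in".
            producer.send(new ProducerRecord<>("data_in", "key-A", "id-42,valid"));
            producer.send(new ProducerRecord<>("data_in", "key-B", "id-42,invalid"));
        }
    }
}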

How can this be achieved? Here is a portion of my code:

// Imports used by this snippet (Kafka Streams / Kafka clients API):
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

private KTable<String, theDataList> globalStream() {

    // KStream of records from the data_in topic, using String and theDataSerde deserializers
    KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the original KStream records using an
    // intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, Data> KGS = trashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Return a KTable: on an invalid record, drop older list entries; otherwise append
    return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized);
}
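For context, the behavior described in the question is standard Kafka Streams scaling: all instances launched with the same application.id form a single consumer group, and the partitions of the input topic are split among them. A minimal, hedged startup sketch (the application id and broker address are hypothetical placeholders):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class AppSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Instances sharing this application.id join ONE consumer group, so the
        // 3 partitions of "data_in" are divided among the running instances.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "data-in-aggregator"); // hypothetical
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // hypothetical

        StreamsBuilder builder = new StreamsBuilder();
        // ... build the topology from globalStream() on this builder ...

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}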

Answer

Either change the number of partitions of your input topic "data_in" to 1, or use a GlobalKTable to get the data from all partitions of the topic and then join your stream with it. With that, your app instances no longer have to be in different consumer groups.

Here is the code:

// Same imports as the snippet above, plus:
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Produced;

private GlobalKTable<String, theDataList> globalStream() {

    // KStream of records from the data_in topic, using String and theDataSerde deserializers
    KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // By sending to another topic you force a repartition on that topic
    trashStream.to("new_data_in", Produced.with(Serdes.String(), SerDes.theDataSerde));

    KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the repartitioned KStream records using an
    // intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, Data> KGS = newTrashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Aggregate into a KTable, then publish the changelog to the agg_data_in topic
    KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized)
    .toStream() // KTable has no to(); convert to a stream before writing out
    .to("agg_data_in", Produced.with(Serdes.String(), SerDes.theDataDataListSerde));

    // Every instance materializes the full agg_data_in topic as a GlobalKTable
    return getBuilder().globalTable("agg_data_in", Consumed.with(Serdes.String(), SerDes.theDataDataListSerde));
}
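The answer suggests joining your stream with the resulting GlobalKTable, which the code above doesn't show. A minimal sketch of such a join, assuming a hypothetical lookup stream on a topic called "queries_in" (the topic name and joiner are illustrative; the join API itself is standard Kafka Streams):

KStream<String, Data> queries = getBuilder().stream("queries_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

GlobalKTable<String, theDataList> aggregated = globalStream();

// For every query record, look up the aggregate stored under the same key in
// the global table; GlobalKTable joins take a KeyValueMapper to choose the
// lookup key and a ValueJoiner to combine the two values.
KStream<String, theDataList> enriched = queries.join(
        aggregated,
        (key, value) -> key,     // look up by the record's own key
        (query, list) -> list);  // illustrative joiner: keep the aggregated list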

EDIT: I edited the globalStream() code above to force a repartition on a topic called "new_data_in".
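A side note on that repartition step: newer Kafka Streams releases (2.6 and later) provide KStream#repartition(), which lets the library manage the intermediate topic instead of writing to and re-reading "new_data_in" by hand. A sketch under that version assumption (the repartition name is illustrative):

import org.apache.kafka.streams.kstream.Repartitioned;

// Equivalent of the to("new_data_in") / stream("new_data_in") round-trip,
// but with an internal repartition topic managed by Kafka Streams (2.6+).
KStream<String, Data> newTrashStream = trashStream.repartition(
        Repartitioned.<String, Data>with(Serdes.String(), SerDes.theDataSerde)
                .withName("new-data-in")); // illustrative name for the internal topic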
