Kafka Streams - How to scale Kafka store generated changelog topics

Question

I have multiple redundant app instances that want to consume all of the events of a topic and store them independently for disk lookup (via RocksDB).

For the sake of argument, let's assume these redundant consumers are serving stateless HTTP requests; the load is not shared through Kafka. Rather, Kafka is used to replicate data from a producer into each instance's local store.

Looking at the topics that get generated, each consuming app created three extra topics:

  • {topicname}STATE-STORE-0000000000-changelog
  • {application-name}-{storename}-changelog
  • {application-name}-{storename}-repartition

But each of these generated topics is as big as the compacted view of the original topic. That means each consuming store triples the size of the original topic (which was already compacted).

  1. Why does the Kafka store need these three topics? When reconciling the disk store, couldn't the stream simply be configured to reload from the last consumed offset?
  2. Is it expected that each instance of the redundant consuming application gets its own unique set of three "store generated topics", or should they be configured to share the same set of changelog topics? In that case, should they share the same applicationId or not, given that each instance needs to consume all events from all partitions?

In short, I am concerned about storage scalability as we grow the number of consuming apps, each of which spawns more changelog topics...

Here is the code that creates the store:

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Logger;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

// KafkaConfigBuilder and the ProgramMappingEvent* Avro classes are project-specific (not shown).
public class ProgramMappingEventStoreFactory {
  private static final Logger logger = Logger.getLogger(ProgramMappingEventStoreFactory.class.getName());
  private final static String STORE_NAME = "program-mapping-store";
  private final static String APPLICATION_NAME = "epg-mapping-catalog_program-mapping";

  public static ReadOnlyKeyValueStore<ProgramMappingEventKey, ProgramMappingEvent> newInstance(String kafkaBootstrapServerUrl,
                                                                                               String avroRegistryUrl,
                                                                                               String topic,
                                                                                               String storeDirectory)
  {
    Properties kafkaConfig = new KafkaConfigBuilder().withBootstrapServers(kafkaBootstrapServerUrl)
                                                     .withSchemaRegistryUrl(avroRegistryUrl)
                                                     .withApplicationId(createApplicationId(APPLICATION_NAME))
                                                     .withGroupId(UUID.randomUUID().toString())
                                                     .withClientId(UUID.randomUUID().toString())
                                                     .withDefaultKeySerdeClass(SpecificAvroSerde.class)
                                                     .withDefaultValueSerdeClass(SpecificAvroSerde.class)
                                                     .withStoreDirectory(storeDirectory)
                                                     .build();

    StreamsBuilder streamBuilder = new StreamsBuilder();
    bootstrapStore(streamBuilder, topic);
    KafkaStreams streams = new KafkaStreams(streamBuilder.build(), kafkaConfig);
    streams.start();
    try {
      return getStoreAndBlockUntilQueryable(STORE_NAME,
                                            QueryableStoreTypes.keyValueStore(),
                                            streams);
    } catch (InterruptedException e) {
      throw new IllegalStateException("Failed to create the LiveMediaPolicyIdStore", e);
    }
  }

  private static <T> T getStoreAndBlockUntilQueryable(String storeName,
                                                      QueryableStoreType<T> queryableStoreType,
                                                      KafkaStreams streams)
    throws InterruptedException
  {
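    // Retry until the underlying state store has finished restoring and becomes queryable.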
    while (true) {
      try {
        return streams.store(storeName, queryableStoreType);
      } catch (InvalidStateStoreException ignored) {
        Thread.sleep(100);
      }
    }
  }

  private static void bootstrapStore(StreamsBuilder builder, String topic) {
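    // Consume the topic as a KTable, then re-group by the unchanged key and reduce
    // into the queryable store (the groupBy step is what creates a repartition topic).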
    KTable<ProgramMappingEventKey, ProgramMappingEvent> table = builder.table(topic);

    table.groupBy((k, v) -> KeyValue.pair(k, v)).reduce((newValue, aggValue) -> newValue,
                                                        (newValue, aggValue) -> null,
                                                        Materialized.as(STORE_NAME));

  }

  private static String createApplicationId(String applicationName) {
    try {
      return String.format("%s-%s", applicationName, InetAddress.getLocalHost().getHostName());
    } catch (UnknownHostException e) {
      logger.warning(() -> "Failed to find the hostname, generating a unique applicationId");
      return String.format("%s-%s", applicationName, UUID.randomUUID());
    }
  }

}

Answer

If you want to load the same state into multiple instances, you should use a GlobalKTable with a single application.id shared by all instances (builder.globalTable()).
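
A minimal sketch of what that could look like, reusing the topic, store name, and Avro classes from the question; GlobalStoreBootstrap is a hypothetical name, and kafkaConfig is assumed to be built as in the question but with the identical application.id on every instance:

import java.util.Properties;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class GlobalStoreBootstrap {

  public static KafkaStreams bootstrap(String topic, String storeName, Properties kafkaConfig) {
    StreamsBuilder builder = new StreamsBuilder();

    // Every instance consumes ALL partitions of the input topic and
    // materializes them into a local, queryable store. No changelog or
    // repartition topics are created; the (compacted) input topic itself
    // is replayed to restore the store after a restart.
    GlobalKTable<ProgramMappingEventKey, ProgramMappingEvent> table =
        builder.globalTable(topic,
                            Materialized.<ProgramMappingEventKey, ProgramMappingEvent,
                                          KeyValueStore<Bytes, byte[]>>as(storeName));

    // kafkaConfig must carry the SAME application.id on every instance; no
    // per-host suffix is needed, because a GlobalKTable is fully replicated
    // rather than partitioned.
    KafkaStreams streams = new KafkaStreams(builder.build(), kafkaConfig);
    streams.start();
    return streams;
  }
}

The store can then be fetched exactly as in the question, via streams.store(storeName, QueryableStoreTypes.keyValueStore()).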

If you use a KTable, the data is partitioned, forcing you to use a different application.id for each instance. This can be considered an anti-pattern.

I am also not sure why you do groupBy((k, v) -> KeyValue.pair(k, v)).reduce() -- this results in an unnecessary repartition topic.
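
If the partitioned KTable variant were kept, a sketch of how that repartition topic could be avoided (reusing the imports and names above) is to materialize the source table directly:

  private static void bootstrapStore(StreamsBuilder builder, String topic) {
    // Materializing the source table directly makes the identity
    // groupBy(...).reduce(...) step, and with it the extra repartition
    // topic, unnecessary; only the table's changelog remains (see the
    // bug note below).
    builder.table(topic,
                  Materialized.<ProgramMappingEventKey, ProgramMappingEvent,
                                KeyValueStore<Bytes, byte[]>>as(STORE_NAME));
  }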

For the changelog topics generated by the table() operator, there is a known bug in the 1.0 and 1.1 releases if StreamsBuilder is used (KStreamBuilder is not affected). It is fixed in the 2.0 release (https://issues.apache.org/jira/browse/KAFKA-6729).
