Kafka Streams - How to scale Kafka store generated changelog topics

Question

I have multiple redundant app instances that want to consume all the events of a topic and store them independently for disk lookup (via RocksDB).

For the sake of the argument, let's assume these redundant consumers are serving stateless HTTP requests; so the load is not shared via Kafka. Rather, Kafka is used to replicate data from a producer into each instance's local store.

When looking at the topics generated, each consuming app creates 3 extra topics:

  • {topicname}-STATE-STORE-0000000000-changelog
  • {application-name}-{storename}-changelog
  • {application-name}-{storename}-repartition

But each of these generated topics is as big as the compacted view of the original topic, meaning each consuming store multiplies the size of the original topic (which was already compacted) by 3.

  1. Why does the Kafka store need these 3 topics? Can't the stream simply be configured to reload from the last consumed offset when reconciling the on-disk store?
  2. Is the idea that each instance of the redundant consuming app has its own unique set of 3 store-generated topics, or should they be configured to share the same set of changelog topics? And since they need to consume all events of all partitions, should they share the same applicationId or not?

In short, I am concerned about storage scalability as we grow the number of consuming apps, since each one spawns more changelog topics...

Here is the code used to create the store:

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;
import java.util.UUID;
import java.util.logging.Logger;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

// Builds a local, queryable copy of a topic (KafkaConfigBuilder is an
// application-specific helper, not shown here).
public class ProgramMappingEventStoreFactory {
  private static final Logger logger = Logger.getLogger(ProgramMappingEventStoreFactory.class.getName());
  private final static String STORE_NAME = "program-mapping-store";
  private final static String APPLICATION_NAME = "epg-mapping-catalog_program-mapping";

  public static ReadOnlyKeyValueStore<ProgramMappingEventKey, ProgramMappingEvent> newInstance(String kafkaBootstrapServerUrl,
                                                                                               String avroRegistryUrl,
                                                                                               String topic,
                                                                                               String storeDirectory)
  {
    Properties kafkaConfig = new KafkaConfigBuilder().withBootstrapServers(kafkaBootstrapServerUrl)
                                                     .withSchemaRegistryUrl(avroRegistryUrl)
                                                     .withApplicationId(createApplicationId(APPLICATION_NAME))
                                                     .withGroupId(UUID.randomUUID().toString())
                                                     .withClientId(UUID.randomUUID().toString())
                                                     .withDefaultKeySerdeClass(SpecificAvroSerde.class)
                                                     .withDefaultValueSerdeClass(SpecificAvroSerde.class)
                                                     .withStoreDirectory(storeDirectory)
                                                     .build();

    StreamsBuilder streamBuilder = new StreamsBuilder();
    bootstrapStore(streamBuilder, topic);
    KafkaStreams streams = new KafkaStreams(streamBuilder.build(), kafkaConfig);
    streams.start();
    try {
      return getStoreAndBlockUntilQueryable(STORE_NAME,
                                            QueryableStoreTypes.keyValueStore(),
                                            streams);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Failed to create the " + STORE_NAME, e);
    }
  }

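  // Poll until the store is queryable: store() throws InvalidStateStoreException
  // while the instance is still starting up, rebalancing, or restoring state.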
  private static <T> T getStoreAndBlockUntilQueryable(String storeName,
                                                      QueryableStoreType<T> queryableStoreType,
                                                      KafkaStreams streams)
    throws InterruptedException
  {
    while (true) {
      try {
        return streams.store(storeName, queryableStoreType);
      } catch (InvalidStateStoreException ignored) {
        Thread.sleep(100);
      }
    }
  }

  private static void bootstrapStore(StreamsBuilder builder, String topic) {
    KTable<ProgramMappingEventKey, ProgramMappingEvent> table = builder.table(topic);

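    // Group by the existing key and keep the newest value per key; the
    // subtractor returning null drops the key. Note that groupBy() forces a
    // repartition topic.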
    table.groupBy((k, v) -> KeyValue.pair(k, v)).reduce((newValue, aggValue) -> newValue,
                                                        (newValue, aggValue) -> null,
                                                        Materialized.as(STORE_NAME));

  }

  private static String createApplicationId(String applicationName) {
    try {
      return String.format("%s-%s", applicationName, InetAddress.getLocalHost().getHostName());
    } catch (UnknownHostException e) {
      logger.warning(() -> "Failed to find the hostname, generating a unique applicationId");
      return String.format("%s-%s", applicationName, UUID.randomUUID());
    }
  }

}
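
For context, a hypothetical call site for this factory (the broker/registry URLs, topic name, and state directory below are placeholders, not values from the question):

ReadOnlyKeyValueStore<ProgramMappingEventKey, ProgramMappingEvent> store =
    ProgramMappingEventStoreFactory.newInstance("kafka:9092",
                                                "http://schema-registry:8081",
                                                "program-mapping-events",
                                                "/var/lib/app/kafka-streams");
// Each redundant instance builds its own copy and serves lookups from its local RocksDB:
// ProgramMappingEvent event = store.get(someKey);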

Answer

If you want to load the same state into multiple instances, you should use a GlobalKTable with a single application.id shared across all instances (builder.globalTable()).

If you use KTable, the data is partitioned, forcing you to use a different application.id for each instance. This can be considered an anti-pattern.

I am also not sure why you do groupBy((k, v) -> KeyValue.pair(k, v)).reduce() -- this results in an unnecessary repartition topic.
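
If a partitioned KTable were kept, the repartition topic could still be avoided by materializing the table directly rather than going through groupBy(...).reduce(...); a sketch under the same type assumptions as above:

private static void bootstrapStore(StreamsBuilder builder, String topic) {
  // table() already keeps the latest value per key (tombstones delete), so the
  // groupBy/reduce round-trip -- and with it the
  // {application-name}-{storename}-repartition topic -- is unnecessary.
  builder.table(topic,
                Materialized.<ProgramMappingEventKey, ProgramMappingEvent,
                                  KeyValueStore<Bytes, byte[]>>as(STORE_NAME));
}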

For the changelog topics generated for the table() operator, there is a known bug in the 1.0 and 1.1 releases if StreamsBuilder is used (KStreamBuilder is not affected). It is fixed in the 2.0 release (https://issues.apache.org/jira/browse/KAFKA-6729).
