Kafka Streams - How to scale Kafka store generated changelog topics
Question
I have multiple redundant app instances that each need to consume all the events of a topic and store them independently for disk lookup (via RocksDB).
For the sake of argument, let's assume these redundant consumers are serving stateless HTTP requests, so the load is not shared using Kafka. Instead, Kafka is used to replicate data from a producer into each instance's local store.
Looking at the generated topics, each consuming app created 3 extra topics:
- {topicname}STATE-STORE-0000000000-changelog
- {application-name}-{storename}-changelog
- {application-name}-{storename}-repartition
But each of these generated topics is as big as the compacted view of the original topic, meaning each consuming store multiplies the size of the original topic (which was already compacted) by 3.
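- Why does the Kafka store need these 3 topics? Couldn't we simply configure the stream to reload from the last consumed offset when reconciling the on-disk store?
- Is the idea that each instance of the redundant consuming apps gets its own unique set of 3 store-generated topics, or should they be configured to share the same set of changelog topics? In other words, given that they need to consume all events of all partitions, should they share the same applicationId or not?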
In short, I am concerned about storage scalability as we grow the number of consuming apps, each of which spawns more changelog topics...
Here is the code that creates the store:
public class ProgramMappingEventStoreFactory {
    private static final Logger logger = Logger.getLogger(ProgramMappingEventStoreFactory.class.getName());
    private final static String STORE_NAME = "program-mapping-store";
    private final static String APPLICATION_NAME = "epg-mapping-catalog_program-mapping";

    public static ReadOnlyKeyValueStore<ProgramMappingEventKey, ProgramMappingEvent> newInstance(String kafkaBootstrapServerUrl,
                                                                                                 String avroRegistryUrl,
                                                                                                 String topic,
                                                                                                 String storeDirectory)
    {
        Properties kafkaConfig = new KafkaConfigBuilder().withBootstrapServers(kafkaBootstrapServerUrl)
                                                         .withSchemaRegistryUrl(avroRegistryUrl)
                                                         .withApplicationId(createApplicationId(APPLICATION_NAME))
                                                         .withGroupId(UUID.randomUUID().toString())
                                                         .withClientId(UUID.randomUUID().toString())
                                                         .withDefaultKeySerdeClass(SpecificAvroSerde.class)
                                                         .withDefaultValueSerdeClass(SpecificAvroSerde.class)
                                                         .withStoreDirectory(storeDirectory)
                                                         .build();
        StreamsBuilder streamBuilder = new StreamsBuilder();
        bootstrapStore(streamBuilder, topic);
        KafkaStreams streams = new KafkaStreams(streamBuilder.build(), kafkaConfig);
        streams.start();
        try {
            return getStoreAndBlockUntilQueryable(STORE_NAME,
                                                  QueryableStoreTypes.keyValueStore(),
                                                  streams);
        } catch (InterruptedException e) {
            throw new IllegalStateException("Failed to create the LiveMediaPolicyIdStore", e);
        }
    }

    private static <T> T getStoreAndBlockUntilQueryable(String storeName,
                                                        QueryableStoreType<T> queryableStoreType,
                                                        KafkaStreams streams)
        throws InterruptedException
    {
        while (true) {
            try {
                return streams.store(storeName, queryableStoreType);
            } catch (InvalidStateStoreException ignored) {
                Thread.sleep(100);
            }
        }
    }

    private static void bootstrapStore(StreamsBuilder builder, String topic) {
        KTable<ProgramMappingEventKey, ProgramMappingEvent> table = builder.table(topic);
        table.groupBy((k, v) -> KeyValue.pair(k, v)).reduce((newValue, aggValue) -> newValue,
                                                            (newValue, aggValue) -> null,
                                                            Materialized.as(STORE_NAME));
    }

    private static String createApplicationId(String applicationName) {
        try {
            return String.format("%s-%s", applicationName, InetAddress.getLocalHost().getHostName());
        } catch (UnknownHostException e) {
            logger.warning(() -> "Failed to find the hostname, generating a unique applicationId");
            return String.format("%s-%s", applicationName, UUID.randomUUID());
        }
    }
}
Recommended answer
If you want to load the same state into multiple instances, you should use a GlobalKTable (builder.globalTable()) and a single application.id shared by all instances.
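A minimal sketch of that setup, assuming Kafka Streams 2.x is on the classpath. The class name GlobalStoreSketch, the helper method names, and the String key/value types (the question uses Avro types) are placeholders for illustration, not part of the original code:

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class GlobalStoreSketch {
    static final String STORE_NAME = "program-mapping-store";
    // One application.id shared by every instance (no hostname suffix).
    static final String APPLICATION_ID = "epg-mapping-catalog_program-mapping";

    // Every instance running this topology reads all partitions of `topic`
    // into its own local copy of the store.
    static Topology buildTopology(String topic) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable(topic,
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
                        .withKeySerde(Serdes.String())     // placeholder serde
                        .withValueSerde(Serdes.String())); // placeholder serde
        return builder.build();
    }

    // Minimal configuration; the same values apply to every instance
    // (the state directory is local to each host).
    static Properties buildConfig(String bootstrapServers, String storeDirectory) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(StreamsConfig.STATE_DIR_CONFIG, storeDirectory);
        return config;
    }
}
```

The store can then be looked up through streams.store(...) in the same way as in the question. As far as I know, a global store is restored directly from the source topic, so no separate changelog topic is created for it.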
If you use a KTable, the data is partitioned, which forces you to use a different application.id for each instance. This can be considered an anti-pattern.
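To make the cost concrete: Kafka Streams prefixes its internal topics with the application.id, so one id per host means one full set of internal topics per host. A small plain-Java sketch (no Kafka dependency; the class name, method name, and hostnames are made up for illustration, and only the changelog and repartition topics from the question are modelled):

```java
import java.util.ArrayList;
import java.util.List;

public class InternalTopicNames {
    // Internal topics follow the pattern "<application.id>-<name>-<suffix>".
    static List<String> internalTopics(String applicationId, String storeName) {
        List<String> topics = new ArrayList<>();
        topics.add(applicationId + "-" + storeName + "-changelog");
        topics.add(applicationId + "-" + storeName + "-repartition");
        return topics;
    }

    public static void main(String[] args) {
        String baseName = "epg-mapping-catalog_program-mapping";
        String[] hosts = {"host-a", "host-b", "host-c"}; // hypothetical hostnames
        List<String> all = new ArrayList<>();
        for (String host : hosts) {
            // Mirrors createApplicationId() from the question: base name plus hostname.
            all.addAll(internalTopics(baseName + "-" + host, "program-mapping-store"));
        }
        all.forEach(System.out::println);
        System.out.println(all.size() + " internal topics for " + hosts.length + " instances");
    }
}
```

Each of those topics holds a copy of the compacted data, so broker storage grows linearly with the number of instances.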
I am also not sure why you do groupBy((k, v) -> KeyValue.pair(k, v)).reduce(); this results in an unnecessary repartition topic.
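If you do stay with a KTable, the store can be materialized directly on table(), which avoids the repartition step. A sketch assuming Kafka Streams 2.x; the class name DirectTableSketch and the String serdes are placeholders (the question relies on default Avro serdes):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class DirectTableSketch {
    static final String STORE_NAME = "program-mapping-store";

    // Materializes the topic straight into a named, queryable store.
    // There is no groupBy()/reduce(), so no repartition topic is needed.
    static Topology buildTopology(String topic) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.table(topic,
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
                        .withKeySerde(Serdes.String())     // placeholder serde
                        .withValueSerde(Serdes.String())); // placeholder serde
        return builder.build();
    }
}
```

Printing buildTopology(topic).describe() is a quick way to check which sources, sinks, and stores a topology contains before deploying it.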
As for the changelog topics generated for the table() operator, there is a known bug in the 1.0 and 1.1 releases if StreamsBuilder is used (KStreamBuilder is not affected). It's fixed in the 2.0 release (https://issues.apache.org/jira/browse/KAFKA-6729).