Kafka Streaming tasks and management of Internal state stores


Question

Let's say we have launched two streaming tasks on two different machines (instances) with the following properties:

public final static String applicationID = "StreamsPOC";
public final static String bootstrapServers = "10.21.22.56:9093";    
public final static String topicname = "TestTransaction";
public final static String shipmentTopicName = "TestShipment";
public final static String RECORD_COUNT_STORE_NAME = "ProcessorONEStore";

Using these properties, here is how the stream task's definition looks:

        Map<String, String> changelogConfig = new HashMap<>();
        changelogConfig.put("min.insync.replicas", "1");
        // Below line not working (see third question).
        changelogConfig.put("topic", "myChangedTopicLog");

        StoreBuilder<KeyValueStore<String, Integer>> kvStoreBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(AppConfigs.RECORD_COUNT_STORE_NAME),
                AppSerdes.String(), AppSerdes.Integer()
        ).withLoggingEnabled(changelogConfig);

        kStreamBuilder.addStateStore(kvStoreBuilder);

        KStream<String, String> sourceKafkaStream = kStreamBuilder.stream(
                AppConfigs.topicname, Consumed.with(AppSerdes.String(), AppSerdes.String()));

Now, as I observed, under the hood Kafka created a topic (for the purpose of backing up the internal state store) with the following name: StreamsPOC-ProcessorONEStore-changelog

First question: do both of the different streaming tasks maintain and back up their internal state store to the same topic?

Second question: say Task-1 picks up partition-1 and writes, say, <K1, V1> to its local internal state store, and Task-2 starts working on partition-2 and also writes <K1, V1> to its local state store. Doesn't that risk the data being overwritten, since both tasks back up their data to the same changelog topic?

Third question: how can I specify a custom name for the changelog topic?

Replies will be highly appreciated!

Answer

First, some thoughts on terminology: the term "task" has a well-defined meaning in Kafka Streams, and as a user you don't create tasks yourself. When your program is executed, Kafka Streams creates tasks that are "independent units of computation" and executes those tasks for you. I guess what you mean by "task" is actually a KafkaStreams client (that is, an instance).

If you start multiple instances with the same application.id, they form a consumer group and share the load in a data-parallel manner. For state stores, each instance hosts a shard (sometimes also called a partition) of the store. All instances use the same changelog topic, and that topic has one partition per store shard: there is a 1:1 mapping from store shard to changelog partition. Furthermore, there is a 1:1 mapping from input topic partitions to tasks, and a 1:1 mapping between tasks and store shards. Thus, overall it's a 1:1:1:1 mapping: for each input topic partition one task is created, each task holds one shard of the state store, and each store shard is backed by one partition of the changelog topic. (So, bottom line: the number of input topic partitions determines how many parallel tasks and store shards you get, and the changelog topic is created with the same number of partitions as your input topic.)
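To make the key-to-shard mapping concrete, here is a minimal dependency-free sketch of how a key is assigned to one of N partitions, and therefore to exactly one task and one store shard. Note this is a simplification: Kafka's real default partitioner hashes the serialized key bytes with murmur2, so `String.hashCode()` below is only a stand-in for the idea, not Kafka's actual hash.

```java
// Simplified illustration of the 1:1:1:1 mapping described above:
// input partition -> task -> store shard -> changelog partition.
// Kafka's actual partitioner uses murmur2 over the serialized key;
// hashCode() here only illustrates the deterministic assignment.
public class ShardMapping {

    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hashes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 2;
        // The same key always maps to the same partition, so only one
        // task (and one changelog partition) ever owns a given key.
        System.out.println("K1 -> partition " + partitionFor("K1", numPartitions));
        System.out.println("K1 -> partition " + partitionFor("K1", numPartitions));
        System.out.println("K2 -> partition " + partitionFor("K2", numPartitions));
    }
}
```

Because every occurrence of K1 lands on the same partition, the two instances never write the same key to different shards, which is why they cannot overwrite each other's changelog data.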

  1. So yes, all instances use the same changelog topic.
  2. As tasks are isolated via shards and changelog topic partitions, they won't overwrite each other. However, the idea of tasks is that each task processes a different (non-overlapping) key-space, and thus all records with the same <K1, ...> should be processed by the same task. Of course, there might be exceptions to this rule; if your application does not use non-overlapping key-spaces, the program will still be executed (and depending on your business logic requirements, the result might be correct or incorrect).
  3. It seems you did already: note that you can only customize parts of the changelog topic name, <application.id>-<storeName>-changelog -- i.e., you can pick the application.id and the storeName. The overall naming pattern is hard-coded, though.
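The hard-coded pattern from point 3 can be sketched as a plain string derivation from the two parts you do control. The helper method below is purely illustrative (it is not a Kafka Streams API); with the question's values it reproduces the topic name the asker observed:

```java
// The changelog topic name follows the fixed pattern
// <application.id>-<storeName>-changelog. Only the two inputs are
// user-controlled; the pattern itself cannot be changed.
// This helper is illustrative, not part of the Kafka Streams API.
public class ChangelogTopicName {

    static String changelogTopicFor(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Using applicationID = "StreamsPOC" and
        // RECORD_COUNT_STORE_NAME = "ProcessorONEStore" from the question:
        System.out.println(changelogTopicFor("StreamsPOC", "ProcessorONEStore"));
        // prints StreamsPOC-ProcessorONEStore-changelog
    }
}
```

This is also why the `changelogConfig.put("topic", "myChangedTopicLog")` line in the question has no effect: `withLoggingEnabled` passes those entries through as topic-level configs, not as a topic name override.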

