Kafka Streaming tasks and management of Internal state stores


Problem description

Let's say we have launched 2 streaming tasks on 2 different machines (instances) with the following properties:

public final static String applicationID = "StreamsPOC";
public final static String bootstrapServers = "10.21.22.56:9093";    
public final static String topicname = "TestTransaction";
public final static String shipmentTopicName = "TestShipment";
public final static String RECORD_COUNT_STORE_NAME = "ProcessorONEStore";

and, using these aforesaid properties, here is how the stream task's definition looks:

        // Custom configuration for the state store's changelog topic.
        Map<String, String> changelogConfig = new HashMap<>();
        changelogConfig.put("min.insync.replicas", "1");
        // Below line not working.
        changelogConfig.put("topic", "myChangedTopicLog");

        StoreBuilder<KeyValueStore<String, Integer>> kvStoreBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(AppConfigs.RECORD_COUNT_STORE_NAME),
                AppSerdes.String(), AppSerdes.Integer()
        ).withLoggingEnabled(changelogConfig);

        kStreamBuilder.addStateStore(kvStoreBuilder);


        KStream<String, String> sourceKafkaStream = kStreamBuilder.stream(
                AppConfigs.topicname, Consumed.with(AppSerdes.String(), AppSerdes.String()));
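
For completeness, a minimal sketch (assuming kStreamBuilder is a StreamsBuilder and the constants above are reused for the configuration) of how this topology is built and started; the changelog topic only shows up once an instance is running:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, AppConfigs.applicationID);        // "StreamsPOC"
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);  // "10.21.22.56:9093"

// Build and start the topology; Kafka Streams creates the internal changelog
// topic for the logged store when the application starts.
KafkaStreams streams = new KafkaStreams(kStreamBuilder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));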

Now, as I observed, under the hood Kafka created a topic (for the purpose of backing up the internal state store) with the following name: StreamsPOC-ProcessorONEStore-changelog

First question: do both of the different streaming tasks maintain and back up their internal state store to the same topic?

Second question: say Task-1 picks up partition-1 and writes, say, <K1, V1> to its local internal state store, and Task-2 starts working on partition-2 and also writes <K1, V1> to its respective local state store. Does this not pose the risk of data being overwritten, since both tasks back up their data to the same changelog topic?

Third question: how can I specify a custom name for the changelog topic?

Responses will be highly appreciated!

Answer

First, some thoughts on terminology: the term "task" has a well-defined meaning in Kafka Streams, and as a user you don't create tasks yourself. When your program is executed, Kafka Streams creates tasks that are "independent units of computation" and executes those tasks for you. -- I guess what you mean by "task" is actually a KafkaStreams client (which is called an instance).

If you start multiple instances with the same application.id, they will form a consumer group and share the load in a data-parallel manner. For state stores, each instance will host a shard (sometimes also called a partition) of the store. All instances use the same changelog topic, and that topic has one partition for each store shard. There is a 1:1 mapping from store shard to changelog partition. Furthermore, there is a 1:1 mapping from input topic partitions to tasks, and a 1:1 mapping between tasks and store shards. Thus, overall it's a 1:1:1:1 mapping: for each input topic partition one task is created, each task holds one shard of the state store, and each store shard is backed by one partition of the changelog topic. (The bottom line is that the number of input topic partitions determines how many parallel tasks and store shards you get, and the changelog topic is created with the same number of partitions as your input topic.)
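
As a small illustration of that last point, one could compare the partition counts of the input topic and the changelog topic with the Kafka Admin client (a hedged sketch; AppConfigs and the changelog topic name are taken from the question above):

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);

// Inside a method that may throw InterruptedException/ExecutionException.
try (AdminClient admin = AdminClient.create(adminProps)) {
    Map<String, TopicDescription> topics = admin
            .describeTopics(Arrays.asList(AppConfigs.topicname, "StreamsPOC-ProcessorONEStore-changelog"))
            .all()
            .get();
    // Both topics should report the same partition count: one changelog
    // partition per input partition / task / store shard.
    topics.forEach((name, description) ->
            System.out.println(name + " -> " + description.partitions().size() + " partitions"));
}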

  1. So yes, all instances use the same changelog topic.
  2. As tasks are isolated via store shards and changelog topic partitions, they won't overwrite each other. However, the idea of tasks is that each task processes a different (non-overlapping) key space, and thus all records with the same key <K1, ...> should be processed by the same task. Of course, there might be exceptions to this rule, and if your application does not use non-overlapping key spaces the program will still just be executed (of course, depending on your business logic requirements, this might be correct or incorrect).
  3. It seems you did that already: note that you can only customize parts of the changelog topic name, <application.id>-<storeName>-changelog -- i.e., you can pick the application.id and the storeName. The overall naming pattern is hard-coded, though (see the small sketch after this list).
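
As a small illustration of that naming convention (only application.id and the store name are under your control; the pattern itself is fixed):

// The changelog topic name Kafka Streams derives for the store in the question:
String changelogTopic = AppConfigs.applicationID             // "StreamsPOC"        (configurable)
        + "-" + AppConfigs.RECORD_COUNT_STORE_NAME           // "ProcessorONEStore" (configurable)
        + "-changelog";                                      // fixed suffix
// -> "StreamsPOC-ProcessorONEStore-changelog"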

