Is a Kafka Streams processor thread safe?

Question

I know this question has been asked before, here: Kafka Streaming Concurrency?

Still, the behavior looks strange to me. According to the documentation (or maybe I am missing something), each partition gets its own task, which means its own processor instance, and each task is executed by a different thread. But when I tested it, I saw that a single thread can call different processor instances, and that the same processor instance can be called from different threads. So if you want to keep any in-memory state in your processor (the old-fashioned way), do you have to lock?

Sample code:

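import java.util.UUID;
import org.apache.kafka.streams.processor.AbstractProcessor;
// JsonObject comes from the application's JSON library (its import is not shown in the original).

public class SomeProcessor extends AbstractProcessor<String, JsonObject> {

   // Generated once per processor instance, so the log shows which instance handled each record.
   private final String ID = UUID.randomUUID().toString();

   @Override
   public void process(String key, JsonObject value) {
     System.out.println("Thread id: " + Thread.currentThread().getId() + " ID: " + ID);
   }
}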

Output:

Thread id: 88 ID: 26b11094-a094-404b-b610-88b38cc9d1ef
Thread id: 88 ID: c667e669-9023-494b-9345-236777e9dfda
Thread id: 88 ID: c667e669-9023-494b-9345-236777e9dfda
Thread id: 90 ID: 0a43ecb0-26f2-440d-88e2-87e0c9cc4927
Thread id: 90 ID: c667e669-9023-494b-9345-236777e9dfda
Thread id: 90 ID: c667e669-9023-494b-9345-236777e9dfda

Is there a way to enforce one thread per instance?

Recommended answer

The number of threads per instance is a configuration parameter (num.stream.threads, default value 1). Thus, if you start a single KafkaStreams instance, you get num.stream.threads threads.
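As a minimal sketch of how that parameter might be set, the snippet below builds the Properties object that would be handed to a KafkaStreams instance. The class name StreamThreadsConfig, the application id, and the broker address are assumptions made for illustration; only the three configuration keys are actual Kafka settings.

```java
import java.util.Properties;

// Hypothetical helper class; only the three config keys are real Kafka Streams settings.
final class StreamThreadsConfig {

    // Builds the properties that would be passed to the KafkaStreams constructor.
    static Properties build(int numStreamThreads) {
        Properties props = new Properties();
        props.put("application.id", "thread-demo");        // assumed application name
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        // Number of stream threads this KafkaStreams instance will start.
        props.put("num.stream.threads", Integer.toString(numStreamThreads));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(2);
        System.out.println("num.stream.threads = " + props.getProperty("num.stream.threads"));
    }
}
```

With build(2), a single KafkaStreams instance created from these properties would run two stream threads.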

Tasks split the work into parallel units (based on your input topic partitions) and are assigned to threads. Thus, if you have multiple tasks and a single thread, all tasks are assigned to that thread. If you have two threads (summed over all KafkaStreams instances), each thread executes about 50% of the tasks.
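The effect of the thread count on task placement can be illustrated with a toy model. The sketch below uses plain round-robin, which is an assumption chosen for simplicity; the real assignment logic in Kafka Streams is more involved. TaskAssignmentSketch and assign are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model only: this is NOT the assignment algorithm Kafka Streams uses.
final class TaskAssignmentSketch {

    // Returns, for each thread, the list of task ids it would run under round-robin.
    static List<List<Integer>> assign(int numTasks, int numThreads) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int task = 0; task < numTasks; task++) {
            perThread.get(task % numThreads).add(task);
        }
        return perThread;
    }

    public static void main(String[] args) {
        System.out.println(assign(4, 1)); // one thread runs every task: [[0, 1, 2, 3]]
        System.out.println(assign(4, 2)); // two threads split the tasks: [[0, 2], [1, 3]]
    }
}
```

With one thread, every task (and therefore every processor instance) runs on that thread, which matches one thread id appearing with several processor IDs in the question's output.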

Note: because a Kafka Streams application is distributed in nature, it makes no difference whether you run a single KafkaStreams instance with multiple threads or multiple KafkaStreams instances with one thread each. Tasks are distributed over all available threads of your application.

If you want to share a data structure between tasks and you have more than one thread, it is your responsibility to synchronize access to that data structure. Note that the task-to-thread assignment can change at runtime, so all access must be synchronized. However, this pattern is not recommended because it limits scalability. You should design your program with no shared data structures! The main reason is that your program is, in general, distributed over multiple machines, so different KafkaStreams instances cannot access a shared data structure anyway. Sharing a data structure only works within a single JVM, and relying on a single JVM prevents your application from scaling out horizontally.
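If a shared structure cannot be avoided, the synchronization could look roughly like the sketch below. It uses plain Java threads as stand-ins for stream threads and a ConcurrentHashMap as the shared structure; SharedStateSketch, process, and runTwoThreads are hypothetical names, and no Kafka API is involved.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for two tasks that share one in-memory structure inside a single JVM.
final class SharedStateSketch {

    // Shared by all "tasks"; ConcurrentHashMap.merge applies each update atomically per key.
    static final Map<String, Long> SHARED_COUNTS = new ConcurrentHashMap<>();

    // Stand-in for Processor.process(): counts one record for the given key.
    static void process(String key) {
        SHARED_COUNTS.merge(key, 1L, Long::sum);
    }

    // Runs two threads that each process recordsPerThread records for the same key.
    static long runTwoThreads(String key, int recordsPerThread) {
        Runnable work = () -> {
            for (int i = 0; i < recordsPerThread; i++) {
                process(key);
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return SHARED_COUNTS.get(key);
    }

    public static void main(String[] args) {
        System.out.println(runTwoThreads("k", 10_000));
    }
}
```

Replacing the map with a plain HashMap would allow lost updates here. As the answer points out, this only protects state inside one JVM; state kept per processor instance avoids the problem without any locking.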
