Is a Kafka Streams processor thread safe?

Problem description

I know this question was asked before here: Kafka Streaming Concurrency?

But this still seems strange to me. According to the documentation (or maybe I am missing something), each partition has a task, meaning a separate processor instance, and each task is executed by a different thread. But when I tested it, I saw that a single thread can handle different processor instances, and the same processor instance can be handled by different threads. Therefore, if you want to keep any in-memory state in your processor (the old-fashioned way), do you have to lock?

Sample code:

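import java.util.UUID;
import org.apache.kafka.streams.processor.AbstractProcessor;

// JsonObject comes from the JSON library used in the original (import not shown)
public class SomeProcessor extends AbstractProcessor<String, JsonObject> {

   // Random ID per processor instance, so the output shows which instance handled a record
   private final String ID = UUID.randomUUID().toString();

   @Override
   public void process(String key, JsonObject value) {
     System.out.println("Thread id: " + Thread.currentThread().getId() + " ID: " + ID);
   }
}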

Output:

Thread id: 88 ID: 26b11094-a094-404b-b610-88b38cc9d1ef
Thread id: 88 ID: c667e669-9023-494b-9345-236777e9dfda
Thread id: 88 ID: c667e669-9023-494b-9345-236777e9dfda
Thread id: 90 ID: 0a43ecb0-26f2-440d-88e2-87e0c9cc4927
Thread id: 90 ID: c667e669-9023-494b-9345-236777e9dfda
Thread id: 90 ID: c667e669-9023-494b-9345-236777e9dfda

Is there a way to force one thread per processor instance?

Recommended answer

The number of threads per instance is a configuration parameter (num.stream.threads, with a default value of 1). Thus, if you start a single KafkaStreams instance, you get num.stream.threads threads.
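
A minimal sketch of setting num.stream.threads in code, assuming a hypothetical application id `my-streams-app` and a broker at `localhost:9092`. It only builds the `Properties` object, so it runs without Kafka on the classpath; in a real application you would pass these properties to `new KafkaStreams(topology, props)`. The string keys are the values of the `StreamsConfig` constants noted in the comments.

```java
import java.util.Properties;

final class StreamsThreadConfig {

    // Builds the configuration for one KafkaStreams instance.
    // "my-streams-app" and "localhost:9092" are placeholder values (assumptions).
    static Properties build(int streamThreads) {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");      // StreamsConfig.APPLICATION_ID_CONFIG
        props.put("bootstrap.servers", "localhost:9092");   // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG
        // StreamsConfig.NUM_STREAM_THREADS_CONFIG; Kafka's default is 1
        props.put("num.stream.threads", Integer.toString(streamThreads));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(2);
        // In a real application: new KafkaStreams(topology, props).start();
        System.out.println("num.stream.threads = " + props.getProperty("num.stream.threads"));
    }
}
```

With `build(2)`, this one instance runs two stream threads; starting a second instance with the same application.id adds more threads to the same application.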

Tasks split up the work into parallel units (based on your input topic partitions) and are assigned to threads. Thus, if you have multiple tasks and a single thread, all tasks are assigned to this thread. If you have two threads (summed over all KafkaStreams instances), each thread executes about 50% of the tasks.
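
To make the "about 50% each" arithmetic concrete, here is a deliberately simplified simulation that deals tasks out round-robin, assuming one task per input partition (a single sub-topology). Kafka's real assignor uses additional criteria, so treat this as an illustration of the split, not as Kafka's algorithm. `threadCount` stands for the total number of threads summed over all KafkaStreams instances.

```java
import java.util.ArrayList;
import java.util.List;

final class TaskAssignmentSketch {

    // Simplified model (not Kafka's actual assignor): task i goes to thread i % threadCount.
    // threadCount = total stream threads across all KafkaStreams instances.
    static List<List<Integer>> assign(int taskCount, int threadCount) {
        List<List<Integer>> tasksPerThread = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            tasksPerThread.add(new ArrayList<>());
        }
        for (int task = 0; task < taskCount; task++) {
            tasksPerThread.get(task % threadCount).add(task);
        }
        return tasksPerThread;
    }

    public static void main(String[] args) {
        // 4 input partitions -> 4 tasks
        System.out.println("1 thread:  " + assign(4, 1)); // [[0, 1, 2, 3]]
        System.out.println("2 threads: " + assign(4, 2)); // [[0, 2], [1, 3]]
    }
}
```

With one thread, all four tasks land on it; with two threads, each gets two of the four, which matches the split described above.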

Note: because a Kafka Streams application is distributed by nature, it makes no difference whether you run a single KafkaStreams instance with multiple threads or multiple KafkaStreams instances with one thread each. Tasks will be distributed over all available threads of your application.

If you want to share any data structure between tasks and you have more than one thread, it is your responsibility to synchronize access to this data structure. Note that the task-to-thread assignment can change at runtime, so all access must be synchronized. However, this pattern is not recommended, as it limits scalability. You should design your program with no shared data structures! The main reason is that your program is generally distributed over multiple machines, so different KafkaStreams instances cannot access a shared data structure anyway. Sharing a data structure only works within a single JVM, and restricting yourself to a single JVM prevents your application from scaling out horizontally.
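
If you do share a structure between tasks inside one JVM despite the advice above, a concurrent collection is one way to make every access safe. This sketch is plain Java with no Kafka dependency: a two-thread pool stands in for two stream threads, and each "task" (a hypothetical batch of words) counts into a shared `ConcurrentHashMap` through the atomic `merge` method. The class name, data, and thread pool are assumptions for illustration only. A plain `HashMap` in the same position could lose updates, and, as noted above, none of this helps once the application runs on more than one machine.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SharedCountsSketch {

    // Shared by all tasks in this JVM; ConcurrentHashMap.merge updates each key atomically.
    private final Map<String, Integer> counts = new ConcurrentHashMap<>();

    // Hypothetical stand-in for a processor's process() call.
    void process(String word) {
        counts.merge(word, 1, Integer::sum);
    }

    // Runs each "task" (a batch of records) on a pool of two threads.
    static Map<String, Integer> run(List<List<String>> tasks) {
        SharedCountsSketch shared = new SharedCountsSketch();
        ExecutorService threads = Executors.newFixedThreadPool(2);
        for (List<String> task : tasks) {
            threads.submit(() -> task.forEach(shared::process));
        }
        threads.shutdown();
        try {
            // Wait for both threads to finish before reading the result
            threads.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return shared.counts;
    }

    public static void main(String[] args) {
        List<List<String>> tasks = List.of(
                List.of("a", "b", "a"),
                List.of("a", "c"),
                List.of("b", "a"));
        // counts: a=4, b=2, c=1 (map iteration order is not guaranteed)
        System.out.println(run(tasks));
    }
}
```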
