spring-kafka相关内容
我有一个简单的适用于Kafka消费者的Spring Boot应用程序,如下所示 @KafkaListener(topics="topic", groupId="SOME_CONSTANT") { .... } 我需要做的是添加可选的Spring Boot属性(从环境变量,但这并不重要),假设: myapp.env: TEST 当该变量存在时,我应该自动将消费者组更新为 SOME_CON
..
我正在创建一个与另一个服务对话的服务,以确定要收听的卡夫卡主题。卡夫卡主题可能有不同的键和值类型。因此,我希望为每个配置(主题、键类型、值类型)动态创建不同的Kafka消费者,其中配置仅在运行时才知道。然而,在春季的Kafka中,我看不到动态传递所有这些参数的方法(至少我不知道有)。我应该如何进行这项工作。 推荐答案 只需在运行时创建新的侦听器容器。 https://docs.s
..
我苦苦寻找此功能的过程在令人作呕的日志问题Several last offsets aren't getting commited with reactive kafka中得到了充分的描述,它显示了我多次尝试不同的失败。 如何订阅ReactiveKafkaConsumerTemplate,它将以同步方式处理记录(为简单起见),并将每2秒确认/提交一次,并在手动取消
..
我正在尝试执行Spring Kafka批处理错误处理。首先,我有几个问题。 监听器错误处理程序和容器错误处理程序有什么区别,这两个类别有哪些错误? 您能帮助一些样本更好地了解这一点吗? 这是我们的设计: 每隔一定时间间隔轮询 批量消费消息 基于键推送到本地缓存(应用缓存)(避免重复事件) 批处理完成后,将所有值逐个推送到另一个主题。 操作3完成后清除缓存并手动确认偏
..
在我的Spring Boot Kafka发布应用程序中,我希望提供对以字符串(Json)或字节格式发布消息的支持,因为我希望同时支持json和avro。但是春装中的卡夫卡模板让我们只能定义其中的一个模板。有没有办法同时使用两个模板或任何其他方式来同时支持JSON和Avro? KafkaTemplate只适用于字符串,但我也想发布Avro,它应该类似于Kafka
..
我希望使用Spring Kafka API实现有状态监听器。 提供以下信息: ConCurrentKafkaListenerContainerFactory,并发设置为“n” Spring@Service类上的@KafkaListener批注方法 然后创建“n”个KafkaMessageListenerContainers。它们中的每一个都将有自己的KafkaConsumer,
..
我正在尝试将StreamsUncaughtExceptionHandler添加到我的Kafka流处理器中。该处理器是用Kafka函数编写的。我查看了suggestion provided by Artem Bilan以将StreamsUncaughtExceptionHandler包括到我的服务中,但我的异常从未被它捕获/处理。 配置Bean: @Autowired UnCaughtE
..
我正在使用@KafkaListener注释编写一个Kafka使用者,我知道有一种方法可以使用ConCurentKafkaListenerContainerFactory中的方法增加来自不同分区的并发Kafka使用者的数量 e.g. factory.setConcurrency(3); setconency的Javadoc如下所示:- 并发运行的KafkaMessageListene
..
我有两个Kafka侦听器组件,每个侦听不同的主题,并期待不同的有效负载。 我的问题是,我可以对两者使用相同的客户端ID,还是必须不同?如果客户ID必须不同,我想了解一个可以有效使用客户ID的用例。 推荐答案 根据文档: 发出请求时传递给服务器的id字符串。这样做的目的是允许在服务器端请求记录中包括逻辑应用程序名称,从而能够跟踪请求的来源,而不仅仅是IP/端口。 所以,从技术
..
我使用的是Spring-Kafka 2.2.8,并编写了一个简单的异步生成器,设置如下: linger.ms : 300000, batch.size: 33554431, max.block.ms: 60000. 现在我正在通过调用下面的构造函数创建一个KafkaTemplate,并将AutoFlush设置为FALSE public KafkaTemplate(Produce
..
我正在使用Spring-Kafka 2.2.8,试图了解单记录消费者和批量消费者之间的主要区别。 据我所知,从一个主题中读取消息/字节对于单个记录使用者和批处理使用者来说没有什么不同。唯一的区别是如何提交偏移量。并因此进行错误处理。我的理解正确吗?请确认。 推荐答案 使用基于记录的监听程序,轮询返回的记录一次传递给监听程序。可以将容器配置为一次提交一个偏移量,或在处理完所有记录之
..
我有一个包含多个事件(不同类型)的Kafka主题,我想在单个应用程序中在不同的处理程序类中处理这些事件。所以我的问题是--我是否可以创建两个使用相同主题的类(Spring组件),但每个类处理不同的事件(来自同一主题)? @Component @KafkaListener(topics = "topicA") public class SomeClass { @KafkaHandle
..
我使用的是Spring Kafka,其中一个主题包含带有个人数据的消息。有没有什么办法可以将Spring Kafka配置为自动加密生产者中的消息/解密消费者中的消息,或者必须手动完成? 推荐答案 Spring或Kafka中没有内置任何内容(尽管您可以在网络上使用SSL来防止窥探。 对于应用程序级加密/解密,您需要实现它。 您可以使用ProducerInterceptor和C
..
我有一个问题,我让一个阿帕奇Kafka消费者在Spring Boot中消费了3个不同的主题。但我需要先使用第一个主题中的所有数据,然后再使用以下主题中的数据,有什么方法可以做到这一点吗?或者你会一直以同样的方式阅读它们吗? @Component public class KafkaTestListener { @KafkaListener(topics = "${message.topic
..
我使用的是Spring-kafka 2.2.8,并将并发设置为2,如下所示,并尝试了解如何在满足特定条件时暂停使用者线程/实例。 @KafkaListener(id = "myConsumerId", topics = "myTopic", concurrency=2) public void listen(String in) { System.out.printl
..
使用Spring的Java Configer类的消费者如下: @Configuration @EnableKafka public class KafkaConfig { public static final String TOPIC = "test-1"; private String bootstrapServers = "localhost:9092";
..
我每天/每周都有两个卡夫卡听众。Daily的AutoStartup=TRUE并且 每周的AutoStartup=False。我有一个终结点,可以停止正在运行的Daily并启动Weekly。一旦Weekly消费完消息,我就等待IDLE事件(设置为1分钟)触发,然后停止Weekly。现在我正在收听《我每天开始的周刊》上的停止活动。现在的问题是我的并发设置为6。所以我得到了6个空闲事件和6个停止事件。我
..
我刚接触Kafka,并且使用@KafkaListener(Spring)来定义Kafka消费者。 我想检查是否可以在运行时手动将分区分配给使用者。 例如,当应用程序启动时,我不想使用任何数据。我目前正在使用@KafkaListener(autoStartup=false ... )用于该目的。 在某个时刻,我应该(从应用程序的另一部分)收到包含要处理的分区ID的通知,因此我希望跳过该分区的
..
我正在使用Spring Kafka实现同步请求回复模式。 堆栈: org.springframework.cloud:spring-cloud-dependencies:2020.0.2 org.springfrawork.kafka:Spring-kafka io.confluent:kafka-avro-serializer:6.2.0 Java 11 我有一个包
..
我正在尝试反序列化从Conumers作用域之外的服务生成的消息。 如果序列化/反序列化模型包不同,则Kafka模型不在受信任的包中。会出现错误 所以我想我有两个选择。或者以json/字符串的形式生成消息,或者让消费者包含生产者库并使用相同的模型来反序列化消息。 我想在消费者中反序列化时手动忽略消息的标头类型,但我未能做到这一点。有没有办法做到这一点,或者我是否坚持使用上述两个选项?
..