spring-kafka相关内容

使用Spring EL将可选的后缀从属性添加到@KafkaListener中的Consumer Group

我有一个简单的适用于Kafka消费者的Spring Boot应用程序,如下所示 @KafkaListener(topics="topic", groupId="SOME_CONSTANT") { .... } 我需要做的是添加可选的Spring Boot属性(从环境变量,但这并不重要),假设: myapp.env: TEST 当该变量存在时,我应该自动将消费者组更新为 SOME_CON ..
发布时间:2022-08-15 09:51:23 其他开发

打造消费者动态春天卡夫卡

我正在创建一个与另一个服务对话的服务,以确定要收听的卡夫卡主题。卡夫卡主题可能有不同的键和值类型。因此,我希望为每个配置(主题、键类型、值类型)动态创建不同的Kafka消费者,其中配置仅在运行时才知道。然而,在春季的Kafka中,我看不到动态传递所有这些参数的方法(至少我不知道有)。我应该如何进行这项工作。 推荐答案 只需在运行时创建新的侦听器容器。 https://docs.s ..
发布时间:2022-08-13 15:40:48 其他开发

Spring-Kafka使用Spring Boot版本2.3.7进行批量错误处理

我正在尝试执行Spring Kafka批处理错误处理。首先,我有几个问题。 监听器错误处理程序和容器错误处理程序有什么区别,这两个类别有哪些错误? 您能帮助一些样本更好地了解这一点吗? 这是我们的设计: 每隔一定时间间隔轮询 批量消费消息 基于键推送到本地缓存(应用缓存)(避免重复事件) 批处理完成后,将所有值逐个推送到另一个主题。 操作3完成后清除缓存并手动确认偏 ..
发布时间:2022-07-19 10:19:31 其他开发

我们可以在春靴中使用多个卡夫卡模板吗?

在我的Spring Boot Kafka发布应用程序中,我希望提供对以字符串(Json)或字节格式发布消息的支持,因为我希望同时支持json和avro。但是春装中的卡夫卡模板让我们只能定义其中的一个模板。有没有办法同时使用两个模板或任何其他方式来同时支持JSON和Avro? KafkaTemplate只适用于字符串,但我也想发布Avro,它应该类似于Kafka ..
发布时间:2022-07-18 17:13:45 其他开发

Spring Kafka消费者客户端-ID配置

我有两个Kafka侦听器组件,每个侦听不同的主题,并期待不同的有效负载。 我的问题是,我可以对两者使用相同的客户端ID,还是必须不同?如果客户ID必须不同,我想了解一个可以有效使用客户ID的用例。 推荐答案 根据文档: 发出请求时传递给服务器的id字符串。这样做的目的是允许在服务器端请求记录中包括逻辑应用程序名称,从而能够跟踪请求的来源,而不仅仅是IP/端口。 所以,从技术 ..
发布时间:2022-05-06 19:55:01 其他开发

单记录卡夫卡消费者和卡夫卡批量消费者的基本区别是什么?

我正在使用Spring-Kafka 2.2.8,试图了解单记录消费者和批量消费者之间的主要区别。 据我所知,从一个主题中读取消息/字节对于单个记录使用者和批处理使用者来说没有什么不同。唯一的区别是如何提交偏移量。并因此进行错误处理。我的理解正确吗?请确认。 推荐答案 使用基于记录的监听程序,轮询返回的记录一次传递给监听程序。可以将容器配置为一次提交一个偏移量,或在处理完所有记录之 ..
发布时间:2022-05-06 19:35:06 其他开发

多个KafkaListener类可以收听同一主题吗?

我有一个包含多个事件(不同类型)的Kafka主题,我想在单个应用程序中在不同的处理程序类中处理这些事件。所以我的问题是--我是否可以创建两个使用相同主题的类(Spring组件),但每个类处理不同的事件(来自同一主题)? @Component @KafkaListener(topics = "topicA") public class SomeClass { @KafkaHandle ..
发布时间:2022-05-06 19:25:11 其他开发

用Spring Kafka对消息进行加密和解密

我使用的是Spring Kafka,其中一个主题包含带有个人数据的消息。有没有什么办法可以将Spring Kafka配置为自动加密生产者中的消息/解密消费者中的消息,或者必须手动完成? 推荐答案 Spring或Kafka中没有内置任何内容(尽管您可以在网络上使用SSL来防止窥探。 对于应用程序级加密/解密,您需要实现它。 您可以使用ProducerInterceptor和C ..
发布时间:2022-05-06 19:17:32 其他开发

如何在春靴中按顺序消费卡夫卡话题

我有一个问题,我让一个阿帕奇Kafka消费者在Spring Boot中消费了3个不同的主题。但我需要先使用第一个主题中的所有数据,然后再使用以下主题中的数据,有什么方法可以做到这一点吗?或者你会一直以同样的方式阅读它们吗? @Component public class KafkaTestListener { @KafkaListener(topics = "${message.topic ..
发布时间:2022-05-06 19:10:38 其他开发

Kafka侦听器并发-如何处理从6个线程激发的停止/空闲事件

我每天/每周都有两个卡夫卡听众。Daily的AutoStartup=TRUE并且 每周的AutoStartup=False。我有一个终结点,可以停止正在运行的Daily并启动Weekly。一旦Weekly消费完消息,我就等待IDLE事件(设置为1分钟)触发,然后停止Weekly。现在我正在收听《我每天开始的周刊》上的停止活动。现在的问题是我的并发设置为6。所以我得到了6个空闲事件和6个停止事件。我 ..
发布时间:2022-05-06 18:49:34 其他开发

春季的卡夫卡消费者我可以通过编程重新分配分区吗?

我刚接触Kafka,并且使用@KafkaListener(Spring)来定义Kafka消费者。 我想检查是否可以在运行时手动将分区分配给使用者。 例如,当应用程序启动时,我不想使用任何数据。我目前正在使用@KafkaListener(autoStartup=false ... )用于该目的。 在某个时刻,我应该(从应用程序的另一部分)收到包含要处理的分区ID的通知,因此我希望跳过该分区的 ..
发布时间:2022-05-06 18:43:20 Java开发

当产生的消息是消费者没有引用的模型类型时,如何反序列化消费者中的消息?

我正在尝试反序列化从Conumers作用域之外的服务生成的消息。 如果序列化/反序列化模型包不同,则Kafka模型不在受信任的包中。会出现错误 所以我想我有两个选择。或者以json/字符串的形式生成消息,或者让消费者包含生产者库并使用相同的模型来反序列化消息。 我想在消费者中反序列化时手动忽略消息的标头类型,但我未能做到这一点。有没有办法做到这一点,或者我是否坚持使用上述两个选项? ..
发布时间:2022-05-06 18:32:36 其他开发