spring-kafka相关内容

如何在启动上下文之前在内存中加载压缩的主题

我使用的是Kafka中的压缩主题,在应用程序启动时将其加载到HashMap中。 然后,我监听消息的普通主题,并使用从压缩的主题构造的HashMap来处理它们。 在开始收听其他主题之前,如何确保压缩的主题已完全阅读且HashMap已完全初始化? (RestController相同) 推荐答案 实现SmartLifecycle,将地图加载到start()中。请确保phase早于任何其他需 ..
发布时间:2022-05-06 14:54:20 其他开发

如何获得KafkaListener消费者

我使用SpringBoot 2.0.4.RELEASE的Spring-Kafka。 并使用KafkaListener获取消息 现在我要重置我的组的偏移量 但我不知道如何获得群的消费者 @KafkaListener(id="test",topics={"test"},groupId="group",containerFactory="batchContainerFac ..
发布时间:2022-05-06 14:48:11 其他开发

创建不带注释的KafkaListener&;不带弹簧靴子

我正在尝试为一个主题创建一个Kafka Consumer,而不使用@KafkaListener注释。我之所以要这样做,是因为我试图在不使用SpringBoot的情况下,基于Application.Properties动态创建侦听器。 我想最好的方法是手动创建一个KafkaListenerContainerFactory。谁能提供一个在它自己的类中如何做到这一点的例子吗? 推荐答案 ..
发布时间:2022-05-06 14:43:50 其他开发

多个消费者使用春卡夫卡

我希望在我的应用程序中设置多个Kafka主题侦听器。下面是我的设置。它应该由两个组使用,但只由一个监听器使用。我在这里错过了什么? @Bean public Map consumerConfigs() { Map props = new HashMap(); props.put(C ..
发布时间:2022-05-06 14:30:40 其他开发

春天·卡夫卡听着regex

我正在尝试使用以下代码收听新创建的主题,但不起作用。您能告诉我下面的代码是否正确吗? public class KafkaMessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageListener.class); private final Proce ..
发布时间:2022-05-06 14:01:49 Java开发

卡夫卡客户(消费者/生产商)在崩溃后恢复

在我工作的公司,我们在没有身份验证的情况下使用Spring for Kafka,最近我们做了一些实验来设置Kafka中的安全性,并且我们在短时间内启用了身份验证,这导致了我们微服务中所有消费者/生产者的崩溃!(微服务保持正常) 异常: Authorization Exception and no authorizationExceptionRetryInterval set org. ..
发布时间:2022-05-06 13:42:41 其他开发

我可以在运行时向我的@kafkalistener添加主题吗

我已经为主题数组创建了一个Bean,并在运行时向该主题数组添加了一些主题,但使用者没有更新主题,仍然从主题数组中的第一个主题开始消费。我希望消费者添加这些新主题并开始使用它们 @Autowired private String[] topicArray; @KafkaListener(topics = "#{topicArray}", groupId = "MyGroup") pu ..
发布时间:2022-05-06 13:33:49 其他开发

Spring Cloud Kafka Streams中的错误处理

我使用的是Spring Cloud Stream和Kafka Stream。假设我有一个处理器,它是一个将字符串的KStream转换为CityProgrammes的KStream的函数。它调用一个API来按名称查找City,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是转换过程中发生任何错误,整个应用程序都会停止。我想把这一条特别的信息发送给DLQ,然后继续前进。我已经读了几天了 ..

如何配置Spring Boot Kafka客户端以使其不尝试连接

这与Is there a "Circuit Breaker" for Spring Boot Kafka client?有关,但我仍然认为这是一个不同的问题:) 我们需要配置Spring Boot Kafka客户端,以便它根本不会尝试连接。 用例是,在测试环境中,我们没有运行Kafka,但我们仍然需要构建完整的Spring Boot上下文,因此使该Bean以配置文件为条件是不起作用的。 ..

如何在 spock 中测试 ListenableFuture 回调

几天前我问了一个关于从 kafka.send() 方法存根未来响应的问题.@kriegaex 此处回答并正确解释了这一点虽然我遇到了另一个问题,但我如何测试这个未来响应的 onSuccess 和 onFailure 回调.这是正在测试的代码. import org.springframework.kafka.core.KafkaTemplate;导入 org.springframework.ka ..
发布时间:2021-12-23 16:21:36 其他开发

Kubernetes 中的 Kafka - 将协调器标记为组死亡

我对 Kubernetes 还很陌生,想用它来设置 Kafka 和 zookeeper.我能够使用 StatefulSets 在 Kubernetes 中设置 Apache Kafka 和 Zookeeper.我关注了 this 和 this 来构建我的清单文件.我分别制作了 1 个 kafka 和 zookeeper 的副本,并且还使用了持久卷.所有 Pod 都在运行并准备就绪. 我试图通 ..