spring-kafka相关内容

Spring Kafka:关闭容器并使用 ConcurrentKafkaListenerContainerFactory 从特定偏移量读取消息

在我的 spring kafka 应用程序中,我想根据某个调度程序的输入在运行时触发消费者.调度程序将告诉侦听器它可以从哪个主题开始消费消息.有带有自定义 ConcurrentKafkaListenerContainerFactory 类的 springboot 应用程序.我需要执行三项任务: 关闭容器,在成功读取主题上的所有可用消息后. 它将在数据库或文件系统中存储当前偏移量. 下次消 ..
发布时间:2021-11-12 03:11:13 Java开发

SpringBoot 批量监听模式 vs 非批量监听模式

我只是好奇 Spring Kafka 中的批处理侦听器模式是否比非批处理侦听器模式提供更好的性能?如果我们正在处理异常,那么我们仍然需要在 Batch-listener 模式下处理每条记录.非批处理似乎不易出错、稳定且可定制. 请分享您对此的看法,因为我没有找到任何好的比较. 解决方案 这完全取决于你的听众对数据做了什么. 如果它循环处理每条记录,则没有任何好处;您也可以让容器 ..
发布时间:2021-11-12 03:10:51 其他开发

消费者如何知道它不再列在 Kafka 集群中?

我们有一个问题,即当 Kafka 代理必须离线时,没有任何消费者服务对此有任何了解并继续运行. 我们尝试在新的 Kafka 实例中列出消费者,但没有看到那里列出现有消费者.列出的所有消费者都是新创建的消费者. 每次遇到此问题时,我们都必须手动终止所有现有的消费者服务,这很不方便. 问题 - 消费者如何知道它不再列在 Kafka 集群中,因此它应该自行终止? 附言我们使用 S ..
发布时间:2021-11-12 03:10:36 其他开发

配置 ReplyingKafkaTemplate 以获取来自多个主题的响应

我有一个要求,我必须对 Kafka 使用同步请求-回复模式,因此我使用 ReplyingKafkaTemplate. 作为实现的一部分,有一个生产者正在推送关于一个主题的请求消息(input-message-topic1),但作为回报,我期待来自两个主题的响应(output-message-topic1 和 output-message-topic2),我必须进一步汇总和处理. 问题: ..
发布时间:2021-11-12 03:10:22 其他开发

如何将消息从单个生产者发布到两个主题并在 Kafka 中的单个侦听器中消费

我正在尝试从单个生产者发布两个不同主题的消息. 这里我创建了两个主题: @Bean公共新话题 multi1() {返回 TopicBuilder.name("multi1").partitions(1).build();}@豆角,扁豆公共新话题 multi2() {返回 TopicBuilder.name("multi2").partitions(1).build();} 这是我向两个 ..

在读取所有消息直到某个特定时间后停止 KafkaListener(Spring Kafka Consumer)

我正在尝试从单个分区主题安排我的消费过程.我可以使用 endpointlistenerregistry.start() 启动它,但我想在消耗了当前分区中的所有消息后停止它,即当我到达当前分区中的最后一个偏移量时.在我完成消费并关闭它之后,生产进入主题.我应该如何确保我已经阅读了所有消息,直到我启动调度程序并停止我的消费者?我正在为消费者使用 @Kafkalistener. 解决方案 设置 ..
发布时间:2021-11-12 03:07:58 其他开发

如何在 Spring Kafka 中包含用于反序列化的类型元数据

我正在 Spring Kafka 中的 Listener 上进行反序列化.但这假设类型信息是由 Spring Kafka 生产者包含或发送的.在我的例子中,Json 是由 Debezium MySQLConnector 发送的,它没有添加这个元数据.所以我想把它添加到请求中.我了解它放置在 JsonSerializer 中某个位置的请求中,并且我查看了源代码,但无法弄清楚如何在序列化期间将元数据类 ..

使用 Kafka 实现 STOMP 协议

我正在使用 websocket 协议和 STOMP 作为消息协议为网络平台开发聊天模块. 这是我第一次使用任何消息代理,而 Kafka 是在公司(我正在为其工作的)网络平台上使用的,我猜是用于其他模块.之前我刚开始使用 RabbitMQ,现在我必须切换到 Kafka.我在RabbitMQ的网站上看到一整篇关于如何使用STOMP的文章,但是在Kafka的官网上却没有. 但我已经探索了其他 ..
发布时间:2021-11-12 03:07:49 Java开发

如何使用 ABSwitchCluster 在 Kafka 集群之间进行故障转移

我有一个应用程序,它使用 Kafka 在实例之间同步数据,因此它从 Kafka 生成和使用数据,此外,该应用程序正在使用 Kafka 主题并将该数据转换并流式传输到另一个主题供客户端使用. 我的应用程序有两个用于故障转移的集群.通过 Kafka 文档,我发现了这个https://docs.spring.io/spring-kafka/docs/current/reference/html/# ..
发布时间:2021-11-12 03:07:41 Java开发

Spring Boot Kafka Consumer 不消费,Kafka Listener 不触发

我正在尝试构建一个简单的 Spring Boot Kafka Consumer 来使用来自 kafka 主题的消息,但是没有消息被使用,因为 KafkaListener 方法没有被触发. 我在其他答案中看到,以确保 AUTO_OFFSET_RESET_CONFIG 设置为“最早",并且 GROUP_ID_CONFIG 是唯一的,我这样做了,但是仍然没有触发 KafkaListenerMeth ..
发布时间:2021-11-12 03:07:17 Java开发

KafkaMessageListenerContainer在servicedown异常时重新处理消息

我们希望以这样的方式配置 Spring Kafka 侦听器,如果任何外部服务关闭,我们不希望丢失从 Kafka 消费的消息.我们希望将其还原,直到它被成功处理. 能否请您帮助我进行配置以实现相同的目的. 批量消费消息如何处理. 我们使用的是 Kafka 0.9 解决方案 我认为 重试最符合您的要求: 为了重试传递,提供了方便的侦听器适配器 - RetryingMes ..
发布时间:2021-11-12 03:06:51 其他开发

为什么spring KafkaConsumer在授权失败时暂停所有来自n个主题的消费

我正在使用 spring-kafka 2.3.6.RELEASE 库来消费许多 kafka 主题.在界面允许的情况下,我正在使用单个侦听器来使用所有主题.我已经注意到,如果单个主题无法授权,KafkaConsumer 实例将挂起其线程,停止所有主题的消费,直到重试间隔结束. 这对我来说似乎很脆弱,特别是对于专注于弹性和并行性的技术. 有没有其他人观察到这种行为,如果有,除了编写我们自己 ..
发布时间:2021-11-12 03:05:44 其他开发

在读取所有消息直到某个特定时间后停止 KafkaListener(Spring Kafka Consumer)

我正在尝试从单个分区主题安排我的消费过程.我可以使用 endpointlistenerregistry.start() 启动它,但我想在消耗了当前分区中的所有消息后停止它,即当我到达当前分区中的最后一个偏移量时.在我完成消费并关闭它之后,生产进入主题.我应该如何确保我已经阅读了所有消息,直到我启动调度程序并停止我的消费者?我正在为消费者使用 @Kafkalistener. 解决方案 设置 ..
发布时间:2021-11-12 03:04:33 其他开发

Spring Cloud Kafka:当两个处理器处于活动状态时,无法序列化输出流的数据

我有一个具有函数式编程风格的 Spring Cloud Kafka Streams 的工作设置.有两个用例,通过 application.properties 进行配置.它们都单独工作,但是一旦我同时激活它们,我就会收到第二个用例的输出流的序列化错误: 线程异常“ActivitiesAppId-05296224-5ea1-412a-aee4-1165870b5c75-StreamThread-1 ..
发布时间:2021-11-12 03:03:49 Java开发

卡夫卡模板和卡夫卡生产者有什么区别?

正如我所见,Kafka 模板在内部使用了 Kafka 生产者.我只想知道确切的区别是什么.此外,与 Kafka 生产者相比,我发现 Kafka 模板中有许多可用的 send() 方法. 请帮我解决.如果有人知道更多. 解决方案 生产者是模式,而 KafkaTemplate 包装了一个生产者实例,并提供了向 Kafka 主题发送消息的便捷方法.(来源) Kafka Producer ..

使用 appication.yml/properties 配置具有不同属性的 Spring Boot Kafka 多个消费者

我见过一些例子,我们有一个 java 配置类,我们定义了多个 KafkaListenerContainer 并将所需的 containerType 传递给 @kafkaListener.但我正在探索是否有任何方法可以通过 appication.yml/properties 使用 Spring Boot 自动 Kafka 配置来实现相同的目标. 解决方案 否;Boot 只会自动配置一组基础设 ..
发布时间:2021-11-12 03:03:34 其他开发