spring-kafka相关内容

使用 Kafka 实现 STOMP 协议

我正在使用 websocket 协议和 STOMP 作为消息协议为网络平台开发聊天模块. 这是我第一次使用任何消息代理,而 Kafka 是在公司(我正在为其工作的)网络平台上使用的,我猜是用于其他模块.之前我刚开始使用 RabbitMQ,现在我必须切换到 Kafka.我在RabbitMQ的网站上看到一整篇关于如何使用STOMP的文章,但是在Kafka的官网上却没有. 但我已经探索了其他 ..
发布时间:2021-11-12 03:01:40 Java开发

如何使用 Spring Kafka 检查 Kafka 是否为空?

当我重新启动我的应用程序时,我需要处理 Kafka 中剩余的消息,直到它为空,然后我的应用程序应该继续正常工作.我的问题是如何检查 Kafka 主题是否为空.我正在使用 Spring Kafka. 解决方案 您可以简单地向具有唯一标识符或特殊类型(例如 type: RESTART)的主题发送特殊的“重启"消息并配置您的 Kafka 侦听器以在收到“重​​启"消息时通知您的应用程序(例如,通 ..
发布时间:2021-11-12 02:58:39 其他开发

通过运行 Docker-Compose 和 Spring Boot 的 IP 地址访问远程主机中的 Kafka

我有这个 docker-compose.yml,我在其中运行 Zookeeper、Kafka、Kafka Connect 和 KafDrop,问题是,当我在本地运行时,我可以从我的 Spring Boot 应用程序连接以使用一些主题消息. 我需要的是在 Linux 机器上运行相同的配置,并且能够以相同的方式从 Spring Boot 应用程序连接. 当在 Linux 机器上远程运行它时 ..
发布时间:2021-11-12 02:57:10 其他开发

在 Kafka Broker 宕机的情况下处理失败

我有一个 Kafka 代理运行,消息被成功消费,但我想处理 Kafka 代理在 Kafka 消费者端关闭的情况. 我已阅读 this 主题,但我知道日志显示在调试级别.我想知道我是否可以在事件触发器上手动处理这个可能是因为我想自己处理 Kafka 代理的故障.Spring Kafka 是否提供了一些东西来处理这种情况? 如果需要更多详细信息,请告诉我.我非常感谢任何有关这方面的建议,这 ..
发布时间:2021-11-12 02:56:59 Java开发

成功批量插入后更新Kafka提交偏移量

我有一个 spring-kafka 消费者,它读取记录并将它们交给缓存.定时任务会定期清除缓存中的记录.我只想在批处理已成功保存到数据库中后更新 COMMIT OFFSET.我尝试将确认对象传递给缓存服务以调用确认方法,如下所示. public class KafkaConsumer {@KafkaListener( topicPattern = "${kafka.topicpattern}", ..
发布时间:2021-11-12 02:54:04 Java开发

在 Kafka Broker 宕机的情况下处理失败

我有一个 Kafka 代理运行,消息被成功消费,但我想处理 Kafka 代理在 Kafka 消费者端关闭的情况. 我已阅读 this 主题,但我知道日志显示在调试级别.我想知道我是否可以在事件触发器上手动处理这个可能是因为我想自己处理 Kafka 代理的故障.Spring Kafka 是否提供了一些东西来处理这种情况? 如果需要更多详细信息,请告诉我.我非常感谢任何有关这方面的建议,这 ..
发布时间:2021-11-12 02:53:52 Java开发

如何为托管在 Kubernetes 中的多个应用程序选择 Kafka 事务 ID?

我有一个经典的微服务架构.所以,有不同的应用.每个应用程序可能有 1..N 个实例.系统部署到Kubernetes. 所以,我们有很多不同的PODs,可以随时启动和停止. 我想实现 read-process-write 模式,所以我需要卡夫卡交易. 要配置事务,我需要为每个 Kafka 生产者设置一些 transaction id.(实际上,我需要 transaction-id-pre ..

当与 Apache Kafka 服务器的连接丢失时,如何使用 Spring Kafka Listener 停止微服务?

我目前正在实现一个微服务,它从 Apache Kafka 主题读取数据.我正在为微服务使用“spring-boot, version: 1.5.6.RELEASE",为同一个微服务中的侦听器使用“spring-kafka, version: 1.2.2.RELEASE".这是我的 kafka 配置: @Bean公共地图消费者配置(){return new HashMap() {{把(Consu ..
发布时间:2021-11-12 02:51:54 其他开发

Spring Kafka 是 Acknowledgement.acknowledge 线程安全的吗?

我正在实施一个基于 kafka 的应用程序,我想手动确认传入的消息.架构迫使我在一个单独的线程中完成它. 问题是:在与消费者不同的线程中执行 Acknowledgement.acknowledge() 是否可能且安全? 解决方案 是的,只要你使用 MANUAL 而不是 MANUAL_IMMEDIATE,但我不认为你会得到你所期望的. Kafka 不跟踪每条消息,只跟踪分区内的偏 ..
发布时间:2021-11-12 02:51:45 其他开发

偏移量的异步自动提交失败

我有一个关于 Kafka 自动提交机制的问题.我正在使用启用了自动提交的 Spring-Kafka.作为一个实验,我在系统空闲时断开了消费者与 Kafka 的连接 30 秒(主题中没有新消息,没有消息正在处理).重新连接后,我收到了一些消息,如下所示: 偏移量异步自动提交 {cs-1915-2553221872080030-0=OffsetAndMetadata{offset=19, leade ..
发布时间:2021-11-12 02:51:31 其他开发

Spring Kafka KafkaTemplate.flush() 需要吗?

我是第一次使用 Spring kafka,我使用 spring kafka 创建了生产者和消费者.我的 kafka 服务器在本地主机上运行,​​并创建了一个名为 test 的主题.我无法通过简单地调用 来向消费者发送消息 KafkaTemplate.send(topicName,Data); 在同一对象上调用 send 后,我不得不在 kafkaTemplate 上调用 flush() , ..
发布时间:2021-11-12 02:51:16 Java开发

如何在 Spring Kafka 中以编程方式设置 JsonDeserializer TypeValue 方法

所以我一直无法仅使用 yaml 为 JsonSerializer 配置 JavaType 方法.还不确定原因,但同时我如何以编程方式设置它? 我在文档中看到了它的代码,但是这段代码到底需要在哪里运行? Spring Kafka:JsonDeserializer 不接受 TRUSTED_PACKAGE 配置 这是我目前尝试过的.Kafka Spring Deserialzer re ..
发布时间:2021-11-12 02:50:52 Java开发

偏移量的异步自动提交失败

我有一个关于 Kafka 自动提交机制的问题.我正在使用启用了自动提交的 Spring-Kafka.作为一个实验,我在系统空闲时断开了消费者与 Kafka 的连接 30 秒(主题中没有新消息,没有消息正在处理).重新连接后,我收到了一些消息,如下所示: 偏移量异步自动提交 {cs-1915-2553221872080030-0=OffsetAndMetadata{offset=19, leade ..
发布时间:2021-11-12 02:50:46 其他开发