spring-kafka相关内容
我已经通过 maven docker 插件配置了 Kafka + ZooKeeper https://dmp.fabric8.io/: wurstmeister/zookeeper:latest动物园管理员2181:2181wurstmeister/kafka:1.0.0
..
我正在使用 websocket 协议和 STOMP 作为消息协议为网络平台开发聊天模块. 这是我第一次使用任何消息代理,而 Kafka 是在公司(我正在为其工作的)网络平台上使用的,我猜是用于其他模块.之前我刚开始使用 RabbitMQ,现在我必须切换到 Kafka.我在RabbitMQ的网站上看到一整篇关于如何使用STOMP的文章,但是在Kafka的官网上却没有. 但我已经探索了其他
..
我需要在不使用父类的情况下在同一个 Kafka Topic(例如,Foo、Bar、Car ...)上发送不同的 JSON 负载 基于 spring kafka 文档,我可以在类级别使用 @KafkaListener 并在方法级别指定 @KafkaHandler ( doc ) @KafkaListener(topics = "myTopic")静态类 MultiListenerBean {
..
在我的应用程序中,我使用 spring-kafka 来消费来自 kafka 服务器的消息,但是从控制台消费者我得到了所有活动消费者线程的 consumer-id TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID简单测试事件 9 247367 247367 0 p3-S14-0-e6a1d3cb
..
我在 Rest 控制器中使用 ReplyingKafkaTemplate 来返回同步响应.我也在设置标题 REPLY_TOPIC.对于监听器微服务部分, @KafkaListener(topics = "${kafka.topic.request-topic}")@发给公共模型监听(模型 请求)抛出 InterruptedException {SumModel 模型 = request.getR
..
当我重新启动我的应用程序时,我需要处理 Kafka 中剩余的消息,直到它为空,然后我的应用程序应该继续正常工作.我的问题是如何检查 Kafka 主题是否为空.我正在使用 Spring Kafka. 解决方案 您可以简单地向具有唯一标识符或特殊类型(例如 type: RESTART)的主题发送特殊的“重启"消息并配置您的 Kafka 侦听器以在收到“重启"消息时通知您的应用程序(例如,通
..
我有这个 docker-compose.yml,我在其中运行 Zookeeper、Kafka、Kafka Connect 和 KafDrop,问题是,当我在本地运行时,我可以从我的 Spring Boot 应用程序连接以使用一些主题消息. 我需要的是在 Linux 机器上运行相同的配置,并且能够以相同的方式从 Spring Boot 应用程序连接. 当在 Linux 机器上远程运行它时
..
我有一个 Kafka 代理运行,消息被成功消费,但我想处理 Kafka 代理在 Kafka 消费者端关闭的情况. 我已阅读 this 主题,但我知道日志显示在调试级别.我想知道我是否可以在事件触发器上手动处理这个可能是因为我想自己处理 Kafka 代理的故障.Spring Kafka 是否提供了一些东西来处理这种情况? 如果需要更多详细信息,请告诉我.我非常感谢任何有关这方面的建议,这
..
最初通过 api 调用触发 1.服务 A 生成 m1 到 topic1(非事务性发送)2.服务B消费topic1并做一些处理(开始发送)3.服务B产生m2到topic2(提交交易)4.服务A消费topic2(开始发送) 这是我的生产者配置: final Mapprops = Maps.newConcurrentMap();props.put(ProducerConfig.BOOTSTRAP_
..
我有一个 spring-kafka 消费者,它读取记录并将它们交给缓存.定时任务会定期清除缓存中的记录.我只想在批处理已成功保存到数据库中后更新 COMMIT OFFSET.我尝试将确认对象传递给缓存服务以调用确认方法,如下所示. public class KafkaConsumer {@KafkaListener( topicPattern = "${kafka.topicpattern}",
..
我有一个 Kafka 代理运行,消息被成功消费,但我想处理 Kafka 代理在 Kafka 消费者端关闭的情况. 我已阅读 this 主题,但我知道日志显示在调试级别.我想知道我是否可以在事件触发器上手动处理这个可能是因为我想自己处理 Kafka 代理的故障.Spring Kafka 是否提供了一些东西来处理这种情况? 如果需要更多详细信息,请告诉我.我非常感谢任何有关这方面的建议,这
..
我正在尝试在 Spring 中对 kafka 消费者类进行单元测试.我想知道如果 kafka 消息被发送到它的主题,监听器方法被正确调用.我的消费者类是这样注释的: @KafkaListener(topics = "${kafka.topics.myTopic}")public void myKafkaMessageEvent(final String message) { ... 如果我@A
..
我愿意使用 Spring Batch 进行 Kafka 数据消费.这个 spring-tips链接有一个基本示例. 这是我的阅读器: @BeanKafkaItemReaderkafkaItemReader() {var props = new Properties();props.putAll(this.properties.buildConsumerProperties());返回新的
..
我有一个经典的微服务架构.所以,有不同的应用.每个应用程序可能有 1..N 个实例.系统部署到Kubernetes. 所以,我们有很多不同的PODs,可以随时启动和停止. 我想实现 read-process-write 模式,所以我需要卡夫卡交易. 要配置事务,我需要为每个 Kafka 生产者设置一些 transaction id.(实际上,我需要 transaction-id-pre
..
我目前正在实现一个微服务,它从 Apache Kafka 主题读取数据.我正在为微服务使用“spring-boot, version: 1.5.6.RELEASE",为同一个微服务中的侦听器使用“spring-kafka, version: 1.2.2.RELEASE".这是我的 kafka 配置: @Bean公共地图消费者配置(){return new HashMap() {{把(Consu
..
我正在实施一个基于 kafka 的应用程序,我想手动确认传入的消息.架构迫使我在一个单独的线程中完成它. 问题是:在与消费者不同的线程中执行 Acknowledgement.acknowledge() 是否可能且安全? 解决方案 是的,只要你使用 MANUAL 而不是 MANUAL_IMMEDIATE,但我不认为你会得到你所期望的. Kafka 不跟踪每条消息,只跟踪分区内的偏
..
我有一个关于 Kafka 自动提交机制的问题.我正在使用启用了自动提交的 Spring-Kafka.作为一个实验,我在系统空闲时断开了消费者与 Kafka 的连接 30 秒(主题中没有新消息,没有消息正在处理).重新连接后,我收到了一些消息,如下所示: 偏移量异步自动提交 {cs-1915-2553221872080030-0=OffsetAndMetadata{offset=19, leade
..
我是第一次使用 Spring kafka,我使用 spring kafka 创建了生产者和消费者.我的 kafka 服务器在本地主机上运行,并创建了一个名为 test 的主题.我无法通过简单地调用 来向消费者发送消息 KafkaTemplate.send(topicName,Data); 在同一对象上调用 send 后,我不得不在 kafkaTemplate 上调用 flush() ,
..
所以我一直无法仅使用 yaml 为 JsonSerializer 配置 JavaType 方法.还不确定原因,但同时我如何以编程方式设置它? 我在文档中看到了它的代码,但是这段代码到底需要在哪里运行? Spring Kafka:JsonDeserializer 不接受 TRUSTED_PACKAGE 配置 这是我目前尝试过的.Kafka Spring Deserialzer re
..
我有一个关于 Kafka 自动提交机制的问题.我正在使用启用了自动提交的 Spring-Kafka.作为一个实验,我在系统空闲时断开了消费者与 Kafka 的连接 30 秒(主题中没有新消息,没有消息正在处理).重新连接后,我收到了一些消息,如下所示: 偏移量异步自动提交 {cs-1915-2553221872080030-0=OffsetAndMetadata{offset=19, leade
..