spring-kafka相关内容

使用自定义配置镜像 kafka 主题作为独立应用程序

我想将一些主题从一个经纪人镜像到另一个经纪人.只是所有主题的一部分.有一个现有的 MirrorMaker 工具可以做到这一点.但我也想更改目标主题名称.此外,自定义消息处理程序 已经做到了.然而,它不符合我的需求. 对我有几个要求: 镜像主题,可以为每个目标主题提供覆盖 动态检测新的源主题 将其作为独立应用程序(fe Java、Spring Boot)运行,而不是 CLI 方法 ..
发布时间:2021-11-12 03:18:03 其他开发

2019 年 8 月 - Kafka Consumer Lag 以编程方式

有什么方法可以通过编程方式找到 Kafka Consumer 中的延迟.我不希望在仪表板上安装和检查外部 Kafka Manager 工具. 我们可以列出所有消费者组并检查每个组的延迟. 目前我们确实有命令来检查延迟,它需要Kafka所在的相对路径. Spring-Kafka、kafka-python、Kafka Admin 客户端或使用 JMX - 有什么方法可以编码并找出延迟 ..
发布时间:2021-11-12 03:17:17 其他开发

如何将消息从单个生产者发布到两个主题并在 Kafka 中的单个侦听器中消费

我正在尝试从单个生产者发布两个不同主题的消息. 这里我创建了两个主题: @Bean公共新话题 multi1() {返回 TopicBuilder.name("multi1").partitions(1).build();}@豆角,扁豆公共新话题 multi2() {返回 TopicBuilder.name("multi2").partitions(1).build();} 这是我向两个 ..

Kafka 批量监听器,轮询固定数量的记录(尽可能多)

我使用的是 Spring Boot 版本 1.5.4.RELEASE &spring Kafka 版本 1.3.8.RELEASE. 我的 Kafka 消费者正在以 100 个块为单位进行批处理.我尝试使用的主题有 10 个分区 &我确实有 10 个 Kafka 消费者实例. 除了特定分区中的最后一个块外,我是否可以强制获得 100 个固定数量的记录(尽可能多). 解决方案 K ..
发布时间:2021-11-12 03:16:56 其他开发

使用 ReplyingKafkaTemplate 处理多个响应

我正在尝试实现一种回复响应模式,在该模式中,我向多个消费者组收听的主题发布消息.这意味着他们都会收到消息并在回复主题中提交回复. 问题是因为他们都回复同一个消息,只有回复主题中第一个收到的消息才会被回复.其他的将被丢弃.鉴于我知道我应该在回复主题上得到多少回复(调用那个数字-n),我怎样才能让 ReplyingKafkaTemplate 等待 n 个回复然后解决答案?我试过从 Spring ..
发布时间:2021-11-12 03:16:35 其他开发

如何找到Kafka消息的处理时间?

我有一个运行 Kafka 消费者的应用程序,我想监控从该主题消费的每条消息的处理时间.该应用程序是一个 Spring 启动应用程序,并使用微米注册表将 Kafka 消费者指标公开给 Spring Actuator Prometheus 端点.我可以使用 kafka_consumer_commit_latency_avg_seconds 或 kafka_consumer_commit_latency ..

SpringBoot 批量监听模式 vs 非批量监听模式

我只是好奇 Spring Kafka 中的批处理侦听器模式是否比非批处理侦听器模式提供更好的性能?如果我们正在处理异常,那么我们仍然需要在 Batch-listener 模式下处理每条记录.非批处理似乎不易出错、稳定且可定制. 请分享您对此的看法,因为我没有找到任何好的比较. 解决方案 这完全取决于你的听众对数据做了什么. 如果它循环处理每条记录,则没有任何好处;您也可以让容器 ..
发布时间:2021-11-12 03:15:31 其他开发

Spring Kafka:在运行时订阅新的主题模式

我正在使用注解 @KafkaListener 在我的应用程序中使用主题.我需要在运行时更改已经运行的使用者的主题模式,以便可以使用与新模式匹配的新主题. 我尝试了下面的代码,但它仍然使用与旧主题模式匹配的主题.在这里,我在应用程序启动时设置了“旧主题模式".然后,我使用 Spring @Scheduler 每 10 秒将模式更新为“new-topic-pattern". 类“KafkaTo ..
发布时间:2021-11-12 03:14:33 Java开发

kafka消息的多种实现(java泛型)

我正在尝试根据事件类型为 kafka 事件创建多个实现类. 公共类KafkaListener {@自动连线服务服务;@KafkaListener(topics = ("mytopic"), containerFactory = "kafkaListenerContainerFactory")公共无效消费来源(对象事件){服务进程(事件);}}公共接口 Service{无效过程(E事件);} ..
发布时间:2021-11-12 03:14:27 Java开发

Spring Kafka Consumer 将消息作为 LinkedHashMap 消费,因此自动将 BigDecimal 转换为 double

我正在使用基于注解的 spring kafka 监听器来消费 kafka 消息,代码如下 使用员工对象 Class Employee{私人字符串名称;私有字符串地址;私人对象帐户;//获取者//二传手} Account 对象在运行时决定它是 Saving Account 还是 Current Account 等. Class SavingAcc{私人 BigDecimal 余额 ..

配置 ReplyingKafkaTemplate 以获取来自多个主题的响应

我有一个要求,我必须对 Kafka 使用同步请求-回复模式,因此我使用 ReplyingKafkaTemplate. 作为实现的一部分,有一个生产者正在推送关于一个主题的请求消息(input-message-topic1),但作为回报,我期待来自两个主题的响应(output-message-topic1 和 output-message-topic2),我必须进一步汇总和处理. 问题: ..
发布时间:2021-11-12 03:13:45 其他开发

消费者如何知道它不再列在 Kafka 集群中?

我们有一个问题,即当 Kafka 代理必须离线时,没有任何消费者服务对此有任何了解并继续运行. 我们尝试在新的 Kafka 实例中列出消费者,但没有看到那里列出现有消费者.列出的所有消费者都是新创建的消费者. 每次遇到此问题时,我们都必须手动终止所有现有的消费者服务,这很不方便. 问题 - 消费者如何知道它不再列在 Kafka 集群中,因此它应该自行终止? 附言我们使用 S ..
发布时间:2021-11-12 03:13:39 其他开发

kafka消息的多种实现(java泛型)

我正在尝试根据事件类型为 kafka 事件创建多个实现类. 公共类KafkaListener {@自动连线服务服务;@KafkaListener(topics = ("mytopic"), containerFactory = "kafkaListenerContainerFactory")公共无效消费来源(对象事件){服务进程(事件);}}公共接口 Service{无效过程(E事件);} ..
发布时间:2021-11-12 03:12:51 Java开发

Kafka 批量监听器,轮询固定数量的记录(尽可能多)

我使用的是 Spring Boot 版本 1.5.4.RELEASE &spring Kafka 版本 1.3.8.RELEASE. 我的 Kafka 消费者正在以 100 个块为单位进行批处理.我尝试使用的主题有 10 个分区 &我确实有 10 个 Kafka 消费者实例. 除了特定分区中的最后一个块外,我是否可以强制获得 100 个固定数量的记录(尽可能多). 解决方案 K ..
发布时间:2021-11-12 03:12:28 其他开发