spring-kafka相关内容
我想将一些主题从一个经纪人镜像到另一个经纪人.只是所有主题的一部分.有一个现有的 MirrorMaker 工具可以做到这一点.但我也想更改目标主题名称.此外,自定义消息处理程序 已经做到了.然而,它不符合我的需求. 对我有几个要求: 镜像主题,可以为每个目标主题提供覆盖 动态检测新的源主题 将其作为独立应用程序(fe Java、Spring Boot)运行,而不是 CLI 方法
..
有什么方法可以通过编程方式找到 Kafka Consumer 中的延迟.我不希望在仪表板上安装和检查外部 Kafka Manager 工具. 我们可以列出所有消费者组并检查每个组的延迟. 目前我们确实有命令来检查延迟,它需要Kafka所在的相对路径. Spring-Kafka、kafka-python、Kafka Admin 客户端或使用 JMX - 有什么方法可以编码并找出延迟
..
我正在尝试从单个生产者发布两个不同主题的消息. 这里我创建了两个主题: @Bean公共新话题 multi1() {返回 TopicBuilder.name("multi1").partitions(1).build();}@豆角,扁豆公共新话题 multi2() {返回 TopicBuilder.name("multi2").partitions(1).build();} 这是我向两个
..
我正在尝试从特定主题中轮询数据,例如 kafka 正在接收 100 条记录/秒但大多数时候它不会获取所有记录.我使用超时为 5000 毫秒,我每隔 100ms 调用此方法注意:我也订阅了特定主题 @Scheduled(fixedDelayString = "100") public void pollRecords() {ConsumerRecords记录 =LeadConsumer.p
..
我使用的是 Spring Boot 版本 1.5.4.RELEASE &spring Kafka 版本 1.3.8.RELEASE. 我的 Kafka 消费者正在以 100 个块为单位进行批处理.我尝试使用的主题有 10 个分区 &我确实有 10 个 Kafka 消费者实例. 除了特定分区中的最后一个块外,我是否可以强制获得 100 个固定数量的记录(尽可能多). 解决方案 K
..
我正在尝试实现一种回复响应模式,在该模式中,我向多个消费者组收听的主题发布消息.这意味着他们都会收到消息并在回复主题中提交回复. 问题是因为他们都回复同一个消息,只有回复主题中第一个收到的消息才会被回复.其他的将被丢弃.鉴于我知道我应该在回复主题上得到多少回复(调用那个数字-n),我怎样才能让 ReplyingKafkaTemplate 等待 n 个回复然后解决答案?我试过从 Spring
..
我有一个运行 Kafka 消费者的应用程序,我想监控从该主题消费的每条消息的处理时间.该应用程序是一个 Spring 启动应用程序,并使用微米注册表将 Kafka 消费者指标公开给 Spring Actuator Prometheus 端点.我可以使用 kafka_consumer_commit_latency_avg_seconds 或 kafka_consumer_commit_latency
..
我想做一个 KStream 到 KTable Join.使用 KTable 作为查找表.下面的步骤显示了代码执行的顺序 构建 KTable ReKey KTable 构建 KStream ReKey KStream 加入KStream - KTable 假设 KStream 中有 8000 条记录,KTable 中有 14 条记录,假设 KStreams 中的每
..
我只是好奇 Spring Kafka 中的批处理侦听器模式是否比非批处理侦听器模式提供更好的性能?如果我们正在处理异常,那么我们仍然需要在 Batch-listener 模式下处理每条记录.非批处理似乎不易出错、稳定且可定制. 请分享您对此的看法,因为我没有找到任何好的比较. 解决方案 这完全取决于你的听众对数据做了什么. 如果它循环处理每条记录,则没有任何好处;您也可以让容器
..
我们有一个使用 spring-kafka (2.2.5.RELEASE) 的 spring boot 应用程序,它在启动时总是出现这个错误: 无法配置主题org.springframework.kafka.KafkaException:等待获取存在超时话题;嵌套异常是 java.util.concurrent.TimeoutException 但是,应用程序继续启动: org.springf
..
我正在使用注解 @KafkaListener 在我的应用程序中使用主题.我需要在运行时更改已经运行的使用者的主题模式,以便可以使用与新模式匹配的新主题. 我尝试了下面的代码,但它仍然使用与旧主题模式匹配的主题.在这里,我在应用程序启动时设置了“旧主题模式".然后,我使用 Spring @Scheduler 每 10 秒将模式更新为“new-topic-pattern". 类“KafkaTo
..
我正在尝试根据事件类型为 kafka 事件创建多个实现类. 公共类KafkaListener {@自动连线服务服务;@KafkaListener(topics = ("mytopic"), containerFactory = "kafkaListenerContainerFactory")公共无效消费来源(对象事件){服务进程(事件);}}公共接口 Service{无效过程(E事件);}
..
我正在使用基于注解的 spring kafka 监听器来消费 kafka 消息,代码如下 使用员工对象 Class Employee{私人字符串名称;私有字符串地址;私人对象帐户;//获取者//二传手} Account 对象在运行时决定它是 Saving Account 还是 Current Account 等. Class SavingAcc{私人 BigDecimal 余额
..
我有一个要求,我必须对 Kafka 使用同步请求-回复模式,因此我使用 ReplyingKafkaTemplate. 作为实现的一部分,有一个生产者正在推送关于一个主题的请求消息(input-message-topic1),但作为回报,我期待来自两个主题的响应(output-message-topic1 和 output-message-topic2),我必须进一步汇总和处理. 问题:
..
我们有一个问题,即当 Kafka 代理必须离线时,没有任何消费者服务对此有任何了解并继续运行. 我们尝试在新的 Kafka 实例中列出消费者,但没有看到那里列出现有消费者.列出的所有消费者都是新创建的消费者. 每次遇到此问题时,我们都必须手动终止所有现有的消费者服务,这很不方便. 问题 - 消费者如何知道它不再列在 Kafka 集群中,因此它应该自行终止? 附言我们使用 S
..
我想从一条消息(来自同一主题)中反序列化不同的对象,并根据对象类型将其保存/更新到数据库中的相应表.正如加里提到的这里我可以使用标题来提供我的对象必须反序列化为哪种类型的信息.您能否提供有关如何实现这一目标的示例? 解决方案 参见 sample-02
..
我们有一个使用 spring-kafka (2.2.5.RELEASE) 的 spring boot 应用程序,它在启动时总是出现这个错误: 无法配置主题org.springframework.kafka.KafkaException:等待获取存在超时话题;嵌套异常是 java.util.concurrent.TimeoutException 但是,应用程序继续启动: org.springf
..
我正在尝试根据事件类型为 kafka 事件创建多个实现类. 公共类KafkaListener {@自动连线服务服务;@KafkaListener(topics = ("mytopic"), containerFactory = "kafkaListenerContainerFactory")公共无效消费来源(对象事件){服务进程(事件);}}公共接口 Service{无效过程(E事件);}
..
我使用的是 Spring Boot 版本 1.5.4.RELEASE &spring Kafka 版本 1.3.8.RELEASE. 我的 Kafka 消费者正在以 100 个块为单位进行批处理.我尝试使用的主题有 10 个分区 &我确实有 10 个 Kafka 消费者实例. 除了特定分区中的最后一个块外,我是否可以强制获得 100 个固定数量的记录(尽可能多). 解决方案 K
..
我有一个简单的 Scala 项目,看起来像这样... @Configuration公共类 CommonConfiguration{...@Value("${spring.kafka.topic}")公共字符串主题;...}@服务class KafkaService @Autowired()(producer: KafkaTemplate[String, Array[Byte]], config:
..