kafka-consumer-api相关内容

消费者组成员没有分区

我在同一个消费者组上启动了两个消费者,我订阅了 20 个主题(每个主题只有一个分区) 仅在消费者上使用: kafka-consumer-groups --bootstrap-server XXXXX:9092 --group foo--describe --members --verbose 注意:这不会显示有关旧的基于 Zookeeper 的消费者的信息.消费者 ID 主机客户端 ..
发布时间:2021-11-12 03:20:57 其他开发

如何在Java中获得消费者Kafka滞后

我在 java 中有一个生产者,在 nodeJS 中有一个消费者.我想在 Java 中知道什么是消费者滞后,所以我知道我是否可以为该主题生成更多数据. java 中获取消费者延迟的 API 是什么? 解决方案 您可以从 Java 调用的实际类是 kafka.admin.ConsumerGroupCommand.它是 Scala 代码,但很容易直接从 Java 调用. 这里有一个 ..

无法在 Spring Boot 中使用 Kafka 消息

我们有一个使用 org.apache.kafka.clients.consumer.KafkaConsumer 消费 Kafka 消息的 Java 应用程序 我们创建了一个具有 Spring-Kafka 依赖项的 Spring Boot 应用程序,但无法读取新项目中的消息.检查了明显的参数,包括引导服务器的主机名和端口(日志显示可以识别)、组、主题以及 Spring Boot 与原始使用者一样 ..

Confluent 控制中心拦截器

如何将 Confluent Control Center Interceptor 添加到现有的 S3(Sink) Connector?监视接收器.我正在寻找文档.任何帮助表示赞赏. 解决方案 要绝对清楚,您的接收器上需要拦截器和源.如果不这样做,您将无法使用 Confluent Control Center 监控您的管道,就像现在一样. 要在 Kafka Connect 中启用拦截器 ..

如果我在 Kafka 中有事务性生产者,我可以使用 Kafka Streams 读取一次消息吗?

我想拥有 Exactly-once 语义,但我不想使用 Consumer 读取消息.我宁愿使用 Kafka Streams AP 阅读消息.如果我将 processing.guarantee=exactly_once 添加到 Stream 配置中,是否会保留恰好一次语义? 解决方案 Exactly-once 处理基于读-处理-写模式.Kafka Streams 使用这种模式,因此,如果您编 ..

卡夫卡消费者 - 不一致地接收消息

我可以在命令行上针对 Kafka 位置安装发送和接收消息.我还可以通过 Java 代码发送消息.这些消息显示在 Kafka 命令提示符中.我还有一个供 Kafka 消费者使用的 Java 代码.代码昨天收到消息.但是,今天早上它没有收到任何消息.代码没有改变.我想知道属性配置是否不太正确.这是我的配置: 制作人: bootstrap.servers - 本地主机:9092group.id ..
发布时间:2021-11-12 03:19:13 其他开发

使用主题启用 ACL 的 Kafka java 生产者和消费者

我对 kafka ACL 配置有点困惑,我们为生产者和消费者配置授权.有各种示例显示使用命令行生成/使用消息.我们是否需要任何额外的配置来使用 JAVA api 向/从安全的 kafka 主题生成/使用消息. 解决方案 如果您想了解安全 Kafka 服务器的配置细节,Confluent docs 有很好的描述.您可以在文档中注意到您通过文件 client.properties 设置某些属性 ..

如果 Kafka Consumer 实例死亡会发生什么?

Kafka Broker 有 3 个分区. Kafka 消费者实例的数量为 3. 突然,一个消费者实例死亡. 我知道如果 Kafka Consumer 实例死亡,Kafka Broker 正在重新平衡,另一个消费者实例会被分配到该分区. 我想知道假设另一个实例消耗了它最初消耗的所有分区然后分配和消耗死分区是否正确. (我是否必须在客户端代码中实现 ConsumerRebala ..
发布时间:2021-11-12 03:18:47 其他开发

Kafka消费者api订阅主题失败

我正在使用简单的 Kafka 客户端 API.据我所知,消费者消息有两种方式,订阅主题和为消费者分配分区. 但是第一种方法不起作用.消费者 poll() 将永远挂起.它仅适用于 assign. //消费者的通用配置映射config = new HashMap();config.put("bootstrap.servers", bootstrap);config.put("group.id" ..
发布时间:2021-11-12 03:18:26 Java开发

Kafka 消费者组再平衡

我正在使用 kafka 消费者组管理来处理我的消息. 我的消息的处理时间各不相同.所以我将最大轮询间隔设置为 20 分钟,最大记录数为 20.并且我使用了 5 个分区和 5 个消费者实例,除了上述两个之外,还有默认配置值. 但我仍然间歇性地收到以下错误: [Consumer clientId=consumer-3, groupId=amc_dashboard_analytics] 尝 ..
发布时间:2021-11-12 03:17:52 其他开发

如何将消息从单个生产者发布到两个主题并在 Kafka 中的单个侦听器中消费

我正在尝试从单个生产者发布两个不同主题的消息. 这里我创建了两个主题: @Bean公共新话题 multi1() {返回 TopicBuilder.name("multi1").partitions(1).build();}@豆角,扁豆公共新话题 multi2() {返回 TopicBuilder.name("multi2").partitions(1).build();} 这是我向两个 ..

Kafka Connect SMT 添加 Kafka 标头字段

我需要找到或编写一个将向请求添加标头字段的 SMT.请求缺少一些类型字段,我想添加它们. 您究竟如何在 SMT 中添加标题,我所看到的只是如下所示的记录转换,但如果我想更改标题或向其中添加字段怎么办? 私有 R applySchemaless(R record) {最终映射value = requireMap(operatingValue(record), 目的);//record.he ..

Kafka 消费者组再平衡

我正在使用 kafka 消费者组管理来处理我的消息. 我的消息的处理时间各不相同.所以我将最大轮询间隔设置为 20 分钟,最大记录数为 20.并且我使用了 5 个分区和 5 个消费者实例,除了上述两个之外,还有默认配置值. 但我仍然间歇性地收到以下错误: [Consumer clientId=consumer-3, groupId=amc_dashboard_analytics] 尝 ..
发布时间:2021-11-12 03:15:45 其他开发

Spring Kafka:在运行时订阅新的主题模式

我正在使用注解 @KafkaListener 在我的应用程序中使用主题.我需要在运行时更改已经运行的使用者的主题模式,以便可以使用与新模式匹配的新主题. 我尝试了下面的代码,但它仍然使用与旧主题模式匹配的主题.在这里,我在应用程序启动时设置了“旧主题模式".然后,我使用 Spring @Scheduler 每 10 秒将模式更新为“new-topic-pattern". 类“KafkaTo ..
发布时间:2021-11-12 03:14:33 Java开发