kafka-consumer-api相关内容

Kafka 0.10 快速入门:消费者在“主要"时失败经纪人被打倒

所以我正在按照主要文档尝试 kafka 快速入门.按照说明获取了多集群示例的所有设置和测试,并且可以正常工作.例如,关闭一个代理,生产者和消费者仍然可以发送和接收. 但是,根据示例,我们设置了 3 个代理并关闭了代理 2(代理 ID = 1).现在,如果我再次调出所有经纪人,但我调低了经纪人 1(经纪人 id = 0),消费者就会挂起.这只会发生在代理 1 (id = 0) 上,不会发生在代 ..
发布时间:2021-11-12 02:30:58 其他开发

我可以在与 Kafka Broker 相同的机器上运行 Kafka Streams 应用程序吗?

我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它 ..

基于时间的桶记录(kafka-hdfs-connector)

我正在尝试使用 Confluent 平台提供的 kafka-hdfs-connector 将数据从 Kafka 复制到 Hive 表中.虽然我能够成功做到这一点,但我想知道如何根据时间间隔对传入的数据进行存储.例如,我想每 5 分钟创建一个新分区. 我尝试了 io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner 和 partit ..

Spring Kafka:轮询新消息而不是使用 `onMessage` 通知

我在我的项目中使用 Spring Kafka,因为在基于 Spring 的项目中使用 Kafka 消息似乎是一个自然的选择.为了使用消息,我可以使用 MessageListener 接口.Spring Kafka 在内部负责为每条新消息调用我的 onMessage 方法. 但是,在我的设置中,我更喜欢显式轮询新消息并按顺序处理它们(这将需要几秒钟).作为一种解决方法,我可能只是阻塞在我的 o ..
发布时间:2021-11-12 02:29:25 其他开发

如何使用python列出Kafka消费者组

我想用 python 获取 Kafka 消费者组列表,但我不能. 我使用zookeeper python客户端(kazoo)但消费者组列表为空,因为这种方法适用于旧消费者,我们没有使用旧消费者. 如何使用python代码获取消费者组列表? ./kafka-consumer-groups.sh -bootstrap-server localhost:9092 -list 解决方案 ..
发布时间:2021-11-12 02:29:19 Python

为什么 Kafka Consumer 不断收到相同的消息(偏移量)

我有一个 SOAP Web 服务,它发送 kafka 请求消息并等待 kafka 响应消息(例如,consumer.poll(10000)). 每次调用 Web 服务时,它都会创建一个新的 Kafka Producer 和一个新的 Kafka Consumer. 每次我调用 Web 服务时,消费者都会收到相同的消息(例如,具有相同偏移量的消息). 我使用的是 Kafka 0.9 ..
发布时间:2021-11-12 02:29:16 其他开发

使用比分区更多的消费者进行持续的消费者组重新平衡

鉴于以下设置: Kafka v0.11.0.0 3 个经纪人 2 个主题,每个主题有 2 个分区,复制因子为 3 2 个消费者组,每个主题一个 3 个包含消费者的服务器 服务器包含两个消费者,每个主题一个,这样: 服务器 A consumer-A1 在组 topic-1-group 消费 topic-1 consumer-A2 在组 topic-2-group 消费 ..
发布时间:2021-11-12 02:28:41 Java开发

再次重新处理/读取 Kafka 记录/消息 - 消费者组偏移重置的目的是什么?

我的 kafka 主题总共有 10 条记录/消息,2 个分区,每个分区有 5 条消息.我的消费者组有 2 个消费者,每个消费者已经分别从他们分配的分区中读取了 5 条消息.现在,我想从开始/开始(偏移量 0)重新处理/读取来自我的主题的消息. 我停止了我的 kafka 消费者并运行以下命令将消费者组偏移重置为 0. ./kafka-consumer-groups.sh --group cg ..
发布时间:2021-11-12 02:27:32 其他开发

spring-kafka AckMode中MANUAL和MANUAL_IMMEDIATE有什么区别

从 spring-docs,我可以看到 MANUAL - 消息侦听器负责确认()确认;之后,应用与 BATCH 相同的语义. MANUAL_IMMEDIATE - 当侦听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量. 但是如果侦听器提交偏移量到底有什么区别.MANUAL 模式 有哪些额外的步骤 解决方案 MANUAL - 在处理完最 ..
发布时间:2021-11-12 02:27:19 其他开发

从 Kafka 请求两个时间戳之间的消息

是否可以根据消息被摄取的时间段来消费来自 Kafka 的消息? 示例:我希望将所有消息提取到今天 0900-1000 之间的主题(现在是 1200). 如果只有一种方法可以指定开始时间,那很好 - 我的消费者可以在到达结束时间后停止处理消息. 我可以看到从给定偏移量请求消息的方法,以及获取第一个可用偏移量和最早可用偏移量的方法,但不是给定时间后的所有消息. 解决方案 您可 ..
发布时间:2021-11-12 02:26:49 其他开发

Kafka 镜像集群中如何维护客户偏移量?

假设我有两个 Kafka 集群,我正在使用 mirror maker 将主题从一个集群镜像到另一个集群.我知道消费者有一个嵌入式生产者来向 Kafka 集群中的 __consumer-offset 主题提交偏移量.我需要知道如果主 Kafka 集群出现故障会发生什么?我们是否也同步 __consumer-offset 主题?因为辅助集群可能有不同数量的代理和其他设置,我认为. 请问Kafka ..

Kafka 消费者启动延迟融合 dotnet

启动 confluent-dotnet 消费者时,在调用 subscribe 和后续轮询后,似乎需要很长时间才能从服务器接收到“已分配分区"事件,以及消息(大约 10-15 秒). 一开始我以为有自动创建主题的开销,但是不管消费者的主题/消费者组是否已经存在,时间都是一样的. 我用这个配置启动我的消费者,其余的代码与融合高级消费者示例中的相同: var kafkaConfig = ..
发布时间:2021-11-12 02:26:22 C#/.NET

卡夫卡有重复的消息

我在生成或使用数据时没有看到任何失败,但是在生产中存在大量重复消息.对于一个收到大约 100k 条消息的小主题,有大约 4k 重复,尽管就像我说的没有失败,最重要的是没有实现重试逻辑或设置配置值. 我还检查了那些重复消息的偏移值,每个消息都有不同的值,告诉我问题出在生产者身上. 任何帮助将不胜感激 解决方案 阅读有关 kafka 中消息传递的更多信息: https://k ..
发布时间:2021-11-12 02:26:20 其他开发

使用来自多个 kafka 主题的消息的最佳实践是什么?

我需要消费来自不同 kafka 主题的消息, 我是否应该为每个主题创建不同的消费者实例,然后根据分区数量启动一个新的处理线程.或 我应该从单个消费者实例订阅所有主题,并且应该启动不同的处理线程 谢谢&问候,梅加 解决方案 唯一的规则是你必须考虑 Kafka 做什么和不保证什么: Kafka 只保证单个主题/分区的消息顺序.编辑:这也意味着如果您的单个主题消费者出于某种原 ..
发布时间:2021-11-12 02:26:14 其他开发

如何初始化 kafka ConsumerRecords在 kafka 中进行测试

我正在为 kafka 消费者组件编写测试用例并模拟 kafkaConsumer.poll(),它返回 ConsumerRecords 的实例.我想初始化 ConsumerRecords 并在模拟中使用它,但是 ConsumerRecords 的构造函数期望我在测试中没有的实际 kafka 主题.我认为的一种方法是保留对象的序列化副本并反序列化以初始化 ConsumerRecords.有没有其他方法 ..
发布时间:2021-11-12 02:25:56 Java开发