kafka-consumer-api相关内容
所以我正在按照主要文档尝试 kafka 快速入门.按照说明获取了多集群示例的所有设置和测试,并且可以正常工作.例如,关闭一个代理,生产者和消费者仍然可以发送和接收. 但是,根据示例,我们设置了 3 个代理并关闭了代理 2(代理 ID = 1).现在,如果我再次调出所有经纪人,但我调低了经纪人 1(经纪人 id = 0),消费者就会挂起.这只会发生在代理 1 (id = 0) 上,不会发生在代
..
场景: 我正在将数据 JSON 对象数据写入 kafka 主题,同时阅读我想根据消息中存在的值仅读取一组特定的消息.我正在使用 kafka-python 库. 示例消息: {flow_status: "completed", value: 1, active: yes}{flow_status:"failure",value 2, active:yes} 这里我只想读取 flow_
..
我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它
..
我正在尝试使用 Confluent 平台提供的 kafka-hdfs-connector 将数据从 Kafka 复制到 Hive 表中.虽然我能够成功做到这一点,但我想知道如何根据时间间隔对传入的数据进行存储.例如,我想每 5 分钟创建一个新分区. 我尝试了 io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner 和 partit
..
我在我的项目中使用 Spring Kafka,因为在基于 Spring 的项目中使用 Kafka 消息似乎是一个自然的选择.为了使用消息,我可以使用 MessageListener 接口.Spring Kafka 在内部负责为每条新消息调用我的 onMessage 方法. 但是,在我的设置中,我更喜欢显式轮询新消息并按顺序处理它们(这将需要几秒钟).作为一种解决方法,我可能只是阻塞在我的 o
..
我想用 python 获取 Kafka 消费者组列表,但我不能. 我使用zookeeper python客户端(kazoo)但消费者组列表为空,因为这种方法适用于旧消费者,我们没有使用旧消费者. 如何使用python代码获取消费者组列表? ./kafka-consumer-groups.sh -bootstrap-server localhost:9092 -list 解决方案
..
我有一个 SOAP Web 服务,它发送 kafka 请求消息并等待 kafka 响应消息(例如,consumer.poll(10000)). 每次调用 Web 服务时,它都会创建一个新的 Kafka Producer 和一个新的 Kafka Consumer. 每次我调用 Web 服务时,消费者都会收到相同的消息(例如,具有相同偏移量的消息). 我使用的是 Kafka 0.9
..
因为它可以是标题中的客人,有没有办法在java中获取特定主题的消费者列表?直到现在我才能得到这样的主题列表 final ListTopicsResult listTopicsResult = adminClient.listTopics();KafkaFuture>kafkaFuture = listTopicsResult.names();设置地图 = kafkaFuture.get();
..
我有一个单节点 Kafka 代理,它在单节点 kubernetes 环境中的 pod 内运行.我将此图像用于 kafka:https://hub.docker.com/r/wurstmeister/卡夫卡 kafka 版本 = 1.1.0 Kubernetes 集群在服务器上的虚拟机内运行.VM 在活动接口 ens32 上具有以下 IP - 192.168.3.102 Kafka
..
鉴于以下设置: Kafka v0.11.0.0 3 个经纪人 2 个主题,每个主题有 2 个分区,复制因子为 3 2 个消费者组,每个主题一个 3 个包含消费者的服务器 服务器包含两个消费者,每个主题一个,这样: 服务器 A consumer-A1 在组 topic-1-group 消费 topic-1 consumer-A2 在组 topic-2-group 消费
..
我的 kafka 主题总共有 10 条记录/消息,2 个分区,每个分区有 5 条消息.我的消费者组有 2 个消费者,每个消费者已经分别从他们分配的分区中读取了 5 条消息.现在,我想从开始/开始(偏移量 0)重新处理/读取来自我的主题的消息. 我停止了我的 kafka 消费者并运行以下命令将消费者组偏移重置为 0. ./kafka-consumer-groups.sh --group cg
..
从 spring-docs,我可以看到 MANUAL - 消息侦听器负责确认()确认;之后,应用与 BATCH 相同的语义. MANUAL_IMMEDIATE - 当侦听器调用 Acknowledgment.acknowledge() 方法时立即提交偏移量. 但是如果侦听器提交偏移量到底有什么区别.MANUAL 模式 有哪些额外的步骤 解决方案 MANUAL - 在处理完最
..
是否可以根据消息被摄取的时间段来消费来自 Kafka 的消息? 示例:我希望将所有消息提取到今天 0900-1000 之间的主题(现在是 1200). 如果只有一种方法可以指定开始时间,那很好 - 我的消费者可以在到达结束时间后停止处理消息. 我可以看到从给定偏移量请求消息的方法,以及获取第一个可用偏移量和最早可用偏移量的方法,但不是给定时间后的所有消息. 解决方案 您可
..
我想在使用 kafka-console-consumer 命令行工具时使用我的自定义 value.deserializer.像这样: ./kafka-console-consumer --bootstrap-server kafka2:29092 \--property value.deserializer=My.Custom.KafkaDeserializer \--主题测试 但它无法找到我
..
我正在尝试用 Java 创建一个消费者客户端.我意识到 poll() 函数已贬值.听卡夫卡的话题有哪些选择? 我的代码: KafkaConsumerkc = 新的 KafkaConsumer(props2);kc.subscribe(Collections.singletonList(topicName));而(真){ConsumerRecords记录 = kc.poll(100);for
..
假设我有两个 Kafka 集群,我正在使用 mirror maker 将主题从一个集群镜像到另一个集群.我知道消费者有一个嵌入式生产者来向 Kafka 集群中的 __consumer-offset 主题提交偏移量.我需要知道如果主 Kafka 集群出现故障会发生什么?我们是否也同步 __consumer-offset 主题?因为辅助集群可能有不同数量的代理和其他设置,我认为. 请问Kafka
..
启动 confluent-dotnet 消费者时,在调用 subscribe 和后续轮询后,似乎需要很长时间才能从服务器接收到“已分配分区"事件,以及消息(大约 10-15 秒). 一开始我以为有自动创建主题的开销,但是不管消费者的主题/消费者组是否已经存在,时间都是一样的. 我用这个配置启动我的消费者,其余的代码与融合高级消费者示例中的相同: var kafkaConfig =
..
我在生成或使用数据时没有看到任何失败,但是在生产中存在大量重复消息.对于一个收到大约 100k 条消息的小主题,有大约 4k 重复,尽管就像我说的没有失败,最重要的是没有实现重试逻辑或设置配置值. 我还检查了那些重复消息的偏移值,每个消息都有不同的值,告诉我问题出在生产者身上. 任何帮助将不胜感激 解决方案 阅读有关 kafka 中消息传递的更多信息: https://k
..
我需要消费来自不同 kafka 主题的消息, 我是否应该为每个主题创建不同的消费者实例,然后根据分区数量启动一个新的处理线程.或 我应该从单个消费者实例订阅所有主题,并且应该启动不同的处理线程 谢谢&问候,梅加 解决方案 唯一的规则是你必须考虑 Kafka 做什么和不保证什么: Kafka 只保证单个主题/分区的消息顺序.编辑:这也意味着如果您的单个主题消费者出于某种原
..
我正在为 kafka 消费者组件编写测试用例并模拟 kafkaConsumer.poll(),它返回 ConsumerRecords 的实例.我想初始化 ConsumerRecords 并在模拟中使用它,但是 ConsumerRecords 的构造函数期望我在测试中没有的实际 kafka 主题.我认为的一种方法是保留对象的序列化副本并反序列化以初始化 ConsumerRecords.有没有其他方法
..