kafka-consumer-api相关内容

fetch.max.wait.ms 与 poll() 方法的参数

在我提出问题之前,我想指出一个类似的问题已经被问到 here 但尚未回答,所以我再次询问.请不要将此标记为重复,因为前面提到的问题没有任何答案. 我对 fetch.max.wait.ms 和 consumer.poll() 有疑问.这是我在研究上述配置时发现的 poll() 方法采用超时参数.这指定了 poll 返回所需的时间,有无数据 如果您将 fetch.max.wait.m ..
发布时间:2021-11-12 02:15:02 其他开发

如何以编程方式检查 Kafka Broker 是否已启动并在 Python 中运行

我正在尝试使用来自 Kafka 主题的消息.我在 confluent_kafka 消费者周围使用了一个包装器.在开始消费消息之前,我需要检查是否建立了连接. 我读到消费者很懒惰,所以我需要执行一些操作才能建立连接.但是我想在不执行consume 或poll 操作的情况下检查连接建立. 此外,我尝试给出一些错误的配置,以查看民意调查的响应.我得到的回应是: b'Broker: 没有更多 ..
发布时间:2021-11-12 02:14:26 Python

kafka-python 中的多处理

我一直在使用 python-kaka 模块从 kafka 代理消费.我想从同一主题中并行使用 'x' 个分区.文档有这个: # 使用 0.9 kafka brokers 并行使用多个消费者# 通常你会在不同的服务器/进程/CPU 上运行每个消费者1 = KafkaConsumer('我的主题',group_id='我的组',bootstrap_servers='my.server.com')消费 ..
发布时间:2021-11-12 02:14:18 其他开发

Kafka 高级消费者 error_code=15

当尝试使用高级消费者(使用全新的消费者组)从 Kafka 消费时,消费者永远不会开始运行.当我将日志记录级别切换到调试时,我可以看到以下两行一遍又一遍地重复: DEBUG [AbstractCoordinator] 09:43:51.192:发送组 CompletelyNewConsumerGroupThatIneverUsedBefore 的协调器请求到代理 172.30.3.61:30000 ..
发布时间:2021-11-12 02:13:54 其他开发

Kafka 消费者(组)触发再平衡的条件

我正在浏览 Kafka 的消费者配置. https://kafka.apache.org/documentation/#newconsumerconfigs 触发重新平衡的参数是什么?.例如,以下参数将 ?.我们需要更改或默认的任何其他参数就足够了 connections.max.idle.ms 在此配置指定的毫秒数后关闭空闲连接.长540000中 我们还有三个不同的主题 ..
发布时间:2021-11-12 02:13:40 其他开发

Kafka 一个分区的多个消费者

我有一个将消息写入主题/分区的生产者.为了保持排序,我想使用单个分区,我希望 12 个消费者从这个单个分区读取所有消息(没有消费者组,所有消息都应该发送给所有消费者).这是可以实现的吗?我读过一些论坛,每个分区只有一个消费者可以阅读. 解决方案 您可以使用 SimpleConsumer 来实现您的要求 - 没有消费者组,所有消费者都可以读取单个分区.然而,这种方法意味着您必须自己处理偏移存 ..
发布时间:2021-11-12 02:13:27 其他开发

是否可以使用Kafka传输文件?

我每天都会生成数千个文件,我想使用 Kafka 流式传输这些文件.当我尝试读取文件时,每一行都被视为一条单独的消息. 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息,以及如何将 Kafka 主题中的每条消息写入单独的文件中. 解决方案 您可以编写自己的序列化器/反序列化器来处理文件.例如: 制作人道具: props.put(ProducerConfig.KEY ..
发布时间:2021-11-12 02:13:00 其他开发

如何从kafka服务器获取主题中的所有消息

我想从服务器的主题中获取所有消息. 例如: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning 当使用上面的控制台命令时,我希望能够从一开始就获取一个主题中的所有消息,但我无法使用 java 代码从一开始就消耗一个主题中的所有消息. 解决方案 ..
发布时间:2021-11-12 02:12:57 其他开发

kafka-python 消费者未收到消息

我在使用 KafaConsumer 时遇到问题,无法从头读取,或从任何其他显式偏移量读取. 为同一主题的消费者运行命令行工具,我确实看到带有 --from-beginning 选项的消息,否则它会挂起 $ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginnin ..
发布时间:2021-11-12 02:12:42 Python

Kafka 消费者异常和偏移提交

我一直在尝试为 Spring Kafka 做一些 POC 工作.具体来说,我想尝试在处理 Kafka 中的消息时处理错误的最佳实践是什么. 我想知道是否有人能够提供帮助: 分享围绕 Kafka 消费者应该做什么的最佳实践出现故障时 帮助我了解 AckMode Record 是如何工作的,以及如何在 listener 方法中抛出异常时防止提交到 Kafka 偏移队列. 2 的代码 ..
发布时间:2021-11-12 02:12:28 Java开发

为什么Kafka消费者性能很慢?

我有一个简单的主题,一个简单的 Kafka 消费者和生产者,使用默认配置. 程序很简单,我有两个线程. 在生产者中,它不断发送 16 个字节的数据. 在消费者端,它不断接收. 我发现生产者的吞吐量大约为 10MB/s,这很好. 但消费者的吞吐量仅为 0.2MB/s.我已经禁用了所有调试日志,但这并没有让它变得更好.测试在本地机器上运行.任何机构都知道出了什么问题?谢谢 ..

如何将主题动态传递给 kafka 侦听器?

从几天开始,我正在尝试将主题动态传递给 Kafka 侦听器的方法,而不是通过来自 Java DSL 的键使用它们.周围的任何人以前都这样做过,或者可以说明实现这一目标的最佳方法是什么? 解决方案 您不能“动态地将主题传递给 Kafka 监听器";您必须以编程方式创建一个侦听器容器. ..
发布时间:2021-11-12 02:11:46 Java开发

简单的 Kafka 消费者没有收到消息

我是 Kafka 的新手,正在运行一个简单的 kafka 消费者/生产者示例,如 KafkaConsumer 和 KafkaProducer.当我从终端运行消费者时,消费者正在接收消息,但我无法使用 Java 代码进行侦听.我也在 StackoverFlow 上搜索过类似的问题(链接:Link1、Link2) 并尝试了该解决方案,但似乎没有任何效果为我工作.Kafka版本:kafka_2.10-0 ..
发布时间:2021-11-12 02:11:28 Java开发

使用 Kafka Streams DSL 时如何处理错误和不提交

对于Kafka Streams,如果我们使用较低级别的处理器API,我们可以控制提交与否.因此,如果我们的代码中出现问题,并且我们不想提交此消息.在这种情况下,Kafka 会多次重新传递此消息,直到问题得到解决. 但是在使用其更高级别的流 DSL API 时如何控制是否提交消息? 资源: http://docs.confluent.io/2.1.0-alpha1/streams/ ..

Kafka 消费者默认组 ID

我正在使用 Apache Kafka 及其 Java 客户端,我看到消息在属于同一组的不同 Kafka 使用者之间进行负载平衡(即共享相同的组 ID). 在我的应用程序中,我需要所有消费者阅读所有消息. 所以我有几个问题: 如果我没有在 Consumer Properties 中设置任何 group id,那么 Kafka Consumer 会被赋予什么 group id? ..
发布时间:2021-11-12 02:10:26 其他开发

Kafka:多个实例中的单个消费者组

我正在为我们的应用程序实施基于 Kafka 的解决方案.根据 Kafka 文档,我的理解是消费者组中的一个消费者(这是一个线程)在内部映射到订阅主题中的一个分区. 假设我有一个包含 40 个分区的主题,并且我有一个在 4 个实例中运行的高级使用者.我不希望一个实例消耗另一个实例消耗的相同消息.但是如果一个实例宕机,其他三个实例应该能够处理所有消息. 我应该使用每个实例 10 个线程的同 ..
发布时间:2021-11-12 02:10:12 Java开发

卡夫卡消费者卡在(重新)加入组中

如果 kafka(版本 0.10)消费者尝试重新加入消费者组,它的默认行为是什么.我正在为一个消费者组使用单个消费者,但似乎重新加入时受到了打击.每 10 分钟后,它会在消费者日志中打印以下行. 2016-08-11 13:54:53,803 INFO o.a.k.c.c.i.ConsumerCoordinator [pool-5-thread-1] ****撤销先前分配的分区**** [] ..
发布时间:2021-11-12 02:10:09 其他开发