kafka-consumer-api相关内容
在我提出问题之前,我想指出一个类似的问题已经被问到 here 但尚未回答,所以我再次询问.请不要将此标记为重复,因为前面提到的问题没有任何答案. 我对 fetch.max.wait.ms 和 consumer.poll() 有疑问.这是我在研究上述配置时发现的 poll() 方法采用超时参数.这指定了 poll 返回所需的时间,有无数据 如果您将 fetch.max.wait.m
..
我正在尝试使用来自 Kafka 主题的消息.我在 confluent_kafka 消费者周围使用了一个包装器.在开始消费消息之前,我需要检查是否建立了连接. 我读到消费者很懒惰,所以我需要执行一些操作才能建立连接.但是我想在不执行consume 或poll 操作的情况下检查连接建立. 此外,我尝试给出一些错误的配置,以查看民意调查的响应.我得到的回应是: b'Broker: 没有更多
..
我一直在使用 python-kaka 模块从 kafka 代理消费.我想从同一主题中并行使用 'x' 个分区.文档有这个: # 使用 0.9 kafka brokers 并行使用多个消费者# 通常你会在不同的服务器/进程/CPU 上运行每个消费者1 = KafkaConsumer('我的主题',group_id='我的组',bootstrap_servers='my.server.com')消费
..
如何使用直接流 API 为 kafka Spark 流指定消费者组 ID. HashMapkafkaParams = new HashMap();kafkaParams.put("metadata.broker.list", brokers);kafkaParams.put("auto.offset.reset", "最大");kafkaParams.put("group.id", "app1"
..
当尝试使用高级消费者(使用全新的消费者组)从 Kafka 消费时,消费者永远不会开始运行.当我将日志记录级别切换到调试时,我可以看到以下两行一遍又一遍地重复: DEBUG [AbstractCoordinator] 09:43:51.192:发送组 CompletelyNewConsumerGroupThatIneverUsedBefore 的协调器请求到代理 172.30.3.61:30000
..
我正在浏览 Kafka 的消费者配置. https://kafka.apache.org/documentation/#newconsumerconfigs 触发重新平衡的参数是什么?.例如,以下参数将 ?.我们需要更改或默认的任何其他参数就足够了 connections.max.idle.ms 在此配置指定的毫秒数后关闭空闲连接.长540000中 我们还有三个不同的主题
..
我有一个将消息写入主题/分区的生产者.为了保持排序,我想使用单个分区,我希望 12 个消费者从这个单个分区读取所有消息(没有消费者组,所有消息都应该发送给所有消费者).这是可以实现的吗?我读过一些论坛,每个分区只有一个消费者可以阅读. 解决方案 您可以使用 SimpleConsumer 来实现您的要求 - 没有消费者组,所有消费者都可以读取单个分区.然而,这种方法意味着您必须自己处理偏移存
..
我是Kafka新手,我用kafka通过logstash收集netflow(可以),我想从kafka发送数据到elasticsearch,但是有一些问题. 我的问题是如何将 Kafka 与 Elasticsearch 连接起来?netflow 到 kafka logstash 配置: 输入{UDP{主机 =>“120.127.XXX.XX"端口 =>5556编解码器 =>网络流量}}筛选{}输出
..
我每天都会生成数千个文件,我想使用 Kafka 流式传输这些文件.当我尝试读取文件时,每一行都被视为一条单独的消息. 我想知道如何将每个文件的内容作为 Kafka 主题中的单个消息,以及如何将 Kafka 主题中的每条消息写入单独的文件中. 解决方案 您可以编写自己的序列化器/反序列化器来处理文件.例如: 制作人道具: props.put(ProducerConfig.KEY
..
我想从服务器的主题中获取所有消息. 例如: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning 当使用上面的控制台命令时,我希望能够从一开始就获取一个主题中的所有消息,但我无法使用 java 代码从一开始就消耗一个主题中的所有消息. 解决方案
..
我在使用 KafaConsumer 时遇到问题,无法从头读取,或从任何其他显式偏移量读取. 为同一主题的消费者运行命令行工具,我确实看到带有 --from-beginning 选项的消息,否则它会挂起 $ ./kafka-console-consumer.sh --zookeeper {localhost:port} --topic {topic_name} --from-beginnin
..
我一直在尝试为 Spring Kafka 做一些 POC 工作.具体来说,我想尝试在处理 Kafka 中的消息时处理错误的最佳实践是什么. 我想知道是否有人能够提供帮助: 分享围绕 Kafka 消费者应该做什么的最佳实践出现故障时 帮助我了解 AckMode Record 是如何工作的,以及如何在 listener 方法中抛出异常时防止提交到 Kafka 偏移队列. 2 的代码
..
我有一个简单的主题,一个简单的 Kafka 消费者和生产者,使用默认配置. 程序很简单,我有两个线程. 在生产者中,它不断发送 16 个字节的数据. 在消费者端,它不断接收. 我发现生产者的吞吐量大约为 10MB/s,这很好. 但消费者的吞吐量仅为 0.2MB/s.我已经禁用了所有调试日志,但这并没有让它变得更好.测试在本地机器上运行.任何机构都知道出了什么问题?谢谢
..
从几天开始,我正在尝试将主题动态传递给 Kafka 侦听器的方法,而不是通过来自 Java DSL 的键使用它们.周围的任何人以前都这样做过,或者可以说明实现这一目标的最佳方法是什么? 解决方案 您不能“动态地将主题传递给 Kafka 监听器";您必须以编程方式创建一个侦听器容器.
..
我是 Kafka 的新手,正在运行一个简单的 kafka 消费者/生产者示例,如 KafkaConsumer 和 KafkaProducer.当我从终端运行消费者时,消费者正在接收消息,但我无法使用 Java 代码进行侦听.我也在 StackoverFlow 上搜索过类似的问题(链接:Link1、Link2) 并尝试了该解决方案,但似乎没有任何效果为我工作.Kafka版本:kafka_2.10-0
..
对于Kafka Streams,如果我们使用较低级别的处理器API,我们可以控制提交与否.因此,如果我们的代码中出现问题,并且我们不想提交此消息.在这种情况下,Kafka 会多次重新传递此消息,直到问题得到解决. 但是在使用其更高级别的流 DSL API 时如何控制是否提交消息? 资源: http://docs.confluent.io/2.1.0-alpha1/streams/
..
我正在使用 Apache Kafka 及其 Java 客户端,我看到消息在属于同一组的不同 Kafka 使用者之间进行负载平衡(即共享相同的组 ID). 在我的应用程序中,我需要所有消费者阅读所有消息. 所以我有几个问题: 如果我没有在 Consumer Properties 中设置任何 group id,那么 Kafka Consumer 会被赋予什么 group id?
..
我正在为我们的应用程序实施基于 Kafka 的解决方案.根据 Kafka 文档,我的理解是消费者组中的一个消费者(这是一个线程)在内部映射到订阅主题中的一个分区. 假设我有一个包含 40 个分区的主题,并且我有一个在 4 个实例中运行的高级使用者.我不希望一个实例消耗另一个实例消耗的相同消息.但是如果一个实例宕机,其他三个实例应该能够处理所有消息. 我应该使用每个实例 10 个线程的同
..
如果 kafka(版本 0.10)消费者尝试重新加入消费者组,它的默认行为是什么.我正在为一个消费者组使用单个消费者,但似乎重新加入时受到了打击.每 10 分钟后,它会在消费者日志中打印以下行. 2016-08-11 13:54:53,803 INFO o.a.k.c.c.i.ConsumerCoordinator [pool-5-thread-1] ****撤销先前分配的分区**** []
..
我有使用 Apache Kafka 2.11-0.10.1.0 的 Java 8 应用程序.我需要使用 seek 功能来poll 来自分区的旧消息.但是,我遇到了 No current assignment for partition 的异常,每次我尝试 seekByOffset 时都会发生这种情况.这是我的班级,负责seek主题到指定的时间戳: import org.apache.kafka.
..