kafka-consumer-api相关内容
您好,我正在研究 kafka CLI,以清楚了解 kafka 的工作原理.我对消费者群体感到困惑.我创建了三个分区的主题.我将创建生产者以向主题提供一些数据.我第一次添加了一些数据,如下所示. kafka-console-producer --broker-list 127.0.0.1:9092 --topic users>用户1kafka-console-producer --broker-l
..
我试图了解以下两个融合的消费者配置的默认值如何协同工作. max.poll.interval.ms - 根据融合文档,默认值为 300,000 毫秒 session.timeout.ms - 根据融合文档,默认值为 10,000 毫秒 heartbeat.interval.ms - 根据融合文档,默认值为 3,000 毫秒 假设我在配置中使用这些默认值.现在我有一个问题.
..
我有一个有 8 个分区的 kafka 主题,从单个消费者订阅该主题,并且我为消费者提供了唯一的消费者组.现在我尝试只使用来自所有分区的最近消息(在我的情况下是从当前时间开始 3 分钟前).我使用了如下所示的 offsetForTimes 方法. ListpartitionInfos = consumer.partitionsFor(topic);列表topicPartions = partiti
..
这是我学习 kafka 的早期阶段.我正在检查本地机器中的每个 kafka 属性/概念. 所以我遇到了这个属性min.insync.replicas,这是我的理解.如果我误解了什么,请纠正我. 将消息发送到主题后,必须将该消息写入至少 min.insync.replicas 数量的关注者. min.insync.replicas 还包括领导者. 如果可用的 live brokers
..
我正在实现 Kafka 消费者类来接收消息.我只想每次都收到新消息.因此,我将 enable.auto.commit 设置为 true.然而,偏移似乎根本没有改变.尽管主题、消费者组和分区一直是相同的. 这是我的消费者代码: consumerConfig.put("bootstrap.servers", bootstrap);consumerConfig.put("group.id",
..
我将 Kafka 0.9.0 与原生 Java 消费者客户端一起使用.如果我有 1 个主题和 1 个分区有人可以告诉我,如果我这样做:寻求结束(我的主题);民意调查(x); 我只会得到最后一个记录,所以我会知道我在最后一个位置? 解决方案 是的,你只会得到最后(最新)的记录,因为 seekToEnd() 方法“延迟评估",因此直到 poll() 被调用时才会计算结束.当然,当 pol
..
每当我的消费者从 Kafka 请求一个新批次时,它总是请求 1MB 的数据,然后它似乎请求下一个 1MB,依此类推.有没有人知道接收20MB的批次需要什么配置和编程步骤? 解决方案 您可以将消费者属性中的属性 max.partition.fetch.bytes 设置为您想要的值(默认为 1MB). 此外,此值必须等于或大于代理配置中的 max.message.size 属性,以确保您
..
我想知道关于 kafka 选项 Importnace 级别的区别.Apache kafka 有 3 个级别. org.apache.kafka.commong.config.ConfigDef.Importance 公共枚举重要性{高、中、低} 这三个有什么区别? 解决方案 此枚举用作 Kafka 社区的指示器,为用户在许多配置上提供一些指导.您将在配置说明中看到它们. 没
..
寻找设计我的 Kafka 消费者的最佳方法.基本上我想看看什么是避免数据丢失的最佳方法,以防万一处理消息期间的异常/错误. 我的用例如下. a) 我使用 SERVICE 来处理消息的原因是 - 将来我计划编写一个 ERROR PROCESSOR 应用程序,它将在一天结束时运行,它将尝试处理失败的消息(不是所有消息,但由于缺少父级等任何依赖项而失败的消息再次出现. b) 我想确保消
..
我一直在尝试使用来自 Kafka-exporter 的 Metrics 来实现 Kubernetes HPA.Hpa 支持 Prometheus,因此我们尝试将指标写入 prometheus 实例.从那里开始,我们不清楚要执行的步骤.有没有文章详细解释一下? 我关注了 https://medium.com/google-cloud/kubernetes-hpa-autoscaling-wit
..
是否可以将 CometD 客户端与 Kafka 生产者连接起来?有什么建议吗? 目前我在 python 中有一个 CometD 客户端,它从 Salesforce 对象中实时提取数据. 现在我想将该数据推送到 Kafka 生产者.有可能这样做吗?以及如何? 解决方案 已解决. 通过使用https://github.com/dkmadigan/python-bayeux-c
..
为了让我的消费者保持活跃(非常长的可变长度处理),我在后台线程中实现了一个空的 poll() 调用,如果我在 polls() 之间花费太多时间,它将阻止代理重新平衡.我已将轮询间隔设置得很长,但我不想一直增加它以进行越来越长的处理. 轮询无记录的正确方法是什么?目前我正在调用 poll(),然后重新寻找在 poll call() 中返回的每个分区的最早偏移量,以便主线程在处理完之前的消息后可
..
我在 Rest 控制器中使用 ReplyingKafkaTemplate 来返回同步响应.我也在设置标题 REPLY_TOPIC.对于监听器微服务部分, @KafkaListener(topics = "${kafka.topic.request-topic}")@发给公共模型监听(模型 请求)抛出 InterruptedException {SumModel 模型 = request.getR
..
我想在 kafka 中读取从特定时间开始的所有消息.假设我想阅读 0600 到 0800 之间的所有消息 从 Kafka 请求两个时间戳之间的消息建议使用 offsetsForTimes 作为解决方案. 该解决方案的问题是:如果说我的消费者每天在 1300 开启.消费者当天不会阅读任何消息,这实际上意味着在 0600 时/之后没有提交偏移量,这意味着 offsetsForTimes(
..
尝试使用 confluent_kafka.AvroConsumer 来消费来自给定时间戳的消息. if 标志:# 创建一个列表topic_partitions_to_search = 列表(map(lambda p: TopicPartition('my_topic2', p, int(time.time())), range(0, 1)))print("用 %s 搜索偏移量" % topic_
..
通过说集群信息,我指的是像 这样的信息 订阅的消费者/消费者群体 读取和提交的偏移量 分区的领导者和追随者 服务器上的主题等 zookeeper 是将这些信息保存在它自己的数据库中(尽管我从来没有听说过 Zookeeper 有任何自己的数据库)还是将这些信息存储在 Kafka 集群中的某些主题等? 编辑:以及后续问题:Zookeeper 如何从 __consumer_offs
..
遵循此处提到的解决方案 kafka-mirror-maker-失败复制消费者偏移主题.我能够在 DC1(实时 Kafka 集群)和 DC2(备份 Kafka 集群)集群中启动镜像制造商而没有任何错误. 看起来它也可以跨 DC2 集群从 DC1 集群同步 __consumer_offsets 主题. 问题 如果我关闭 DC1 的消费者并将相同的消费者(相同的 group_id)指向
..
使用 python kafka api 从一个主题中读取消息,其中只有少数消息.Kafka 不断地一遍又一遍地重放队列中的消息. 它从我的主题收到一条消息(每个消息内容都回来),然后抛出 ERROR - Heartbeat session expired - 标记协调器死了 并继续遍历其余消息并继续重播它们.更多日志: kafka.coordinator - ERROR - Heartbe
..
比如说,我发布和使用不同类型的 java 对象.对于每个对象,我必须定义自己的序列化器实现.我们如何在“serializer.class"属性下提供 kafka 消费者/生产者属性文件中的所有实现? 解决方案 我们对不同主题中的不同对象有类似的设置,但在一个主题中始终使用相同的对象类型.我们使用 ByteArrayDeserializer 随 Java API 0.9.0.1 一起提供,这
..
我有一个 Kafka 应用程序,我在其中使用 kafka-console-consumer.sh 消费消息,如下所示: $./kafka-console-consumer.sh --zookeeper zookeeperhost:2181 --topic myTopic 它提供了我通过 Kafka 消费者写给 Kafka 代理的所有消息,没有任何遗漏. 最近我在一个不同的环境中部署了该应
..