kafka-consumer-api相关内容

消费者组如何在 kafka 中工作?

您好,我正在研究 kafka CLI,以清楚了解 kafka 的工作原理.我对消费者群体感到困惑.我创建了三个分区的主题.我将创建生产者以向主题提供一些数据.我第一次添加了一些数据,如下所示. kafka-console-producer --broker-list 127.0.0.1:9092 --topic users>用户1kafka-console-producer --broker-l ..
发布时间:2021-11-12 03:06:39 其他开发

Confluent Kafka Consumer Configuration - session.timeout.ms 和 max.poll.interval.ms 有什么关系?

我试图了解以下两个融合的消费者配置的默认值如何协同工作. max.poll.interval.ms - 根据融合文档,默认值为 300,000 毫秒 session.timeout.ms - 根据融合文档,默认值为 10,000 毫秒 heartbeat.interval.ms - 根据融合文档,默认值为 3,000 毫秒 假设我在配置中使用这些默认值.现在我有一个问题. ..
发布时间:2021-11-12 03:06:33 其他开发

Kafka 消费者 offsetForTimes 方法仅返回少数分区偏移位置而不是全部

我有一个有 8 个分区的 kafka 主题,从单个消费者订阅该主题,并且我为消费者提供了唯一的消费者组.现在我尝试只使用来自所有分区的最近消息(在我的情况下是从当前时间开始 3 分钟前).我使用了如下所示的 offsetForTimes 方法. ListpartitionInfos = consumer.partitionsFor(topic);列表topicPartions = partiti ..
发布时间:2021-11-12 03:06:05 其他开发

Kafka 配置 min.insync.replicas 不起作用

这是我学习 kafka 的早期阶段.我正在检查本地机器中的每个 kafka 属性/概念. 所以我遇到了这个属性min.insync.replicas,这是我的理解.如果我误解了什么,请纠正我. 将消息发送到主题后,必须将该消息写入至少 min.insync.replicas 数量的关注者. min.insync.replicas 还包括领导者. 如果可用的 live brokers ..
发布时间:2021-11-12 03:05:59 其他开发

Kafka - 知道消费者是否是最新的

我将 Kafka 0.9.0 与原生 Java 消费者客户端一起使用.如果我有 1 个主题和 1 个分区有人可以告诉我,如果我这样做:寻求结束(我的主题);民意调查(x); 我只会得到最后一个记录,所以我会知道我在最后一个位置? 解决方案 是的,你只会得到最后(最新)的记录,因为 seekToEnd() 方法“延迟评估",因此直到 poll() 被调用时才会计算结束.当然,当 pol ..
发布时间:2021-11-12 03:05:11 其他开发

如何让消费者从 Kafka 请求超过 1MB 的记录

每当我的消费者从 Kafka 请求一个新批次时,它总是请求 1MB 的数据,然后它似乎请求下一个 1MB,依此类推.有没有人知道接收20MB的批次需要什么配置和编程步骤? 解决方案 您可以将消费者属性中的属性 max.partition.fetch.bytes 设置为您想要的值(默认为 1MB). 此外,此值必须等于或大于代理配置中的 max.message.size 属性,以确保您 ..
发布时间:2021-11-12 03:04:47 其他开发

Apache kafka 选项中的高、中、低重要性级别有什么区别?

我想知道关于 kafka 选项 Importnace 级别的区别.Apache kafka 有 3 个级别. org.apache.kafka.commong.config.ConfigDef.Importance 公共枚举重要性{高、中、低} 这三个有什么区别? 解决方案 此枚举用作 Kafka 社区的指示器,为用户在许多配置上提供一些指导.您将在配置说明中看到它们. 没 ..

在处理来自 Kafka 的消息时避免数据丢失

寻找设计我的 Kafka 消费者的最佳方法.基本上我想看看什么是避免数据丢失的最佳方法,以防万一处理消息期间的异常/错误. 我的用例如下. a) 我使用 SERVICE 来处理消息的原因是 - 将来我计划编写一个 ERROR PROCESSOR 应用程序,它将在一天结束时运行,它将尝试处理失败的消息(不是所有消息,但由于缺少父级等任何依赖项而失败的消息再次出现. b) 我想确保消 ..
发布时间:2021-11-12 03:03:07 其他开发

将 CometD 客户端与 Kafka 生产者连接起来

是否可以将 CometD 客户端与 Kafka 生产者连接起来?有什么建议吗? 目前我在 python 中有一个 CometD 客户端,它从 Salesforce 对象中实时提取数据. 现在我想将该数据推送到 Kafka 生产者.有可能这样做吗?以及如何? 解决方案 已解决. 通过使用https://github.com/dkmadigan/python-bayeux-c ..

Kafka 轮询无记录的正确方法

为了让我的消费者保持活跃(非常长的可变长度处理),我在后台线程中实现了一个空的 poll() 调用,如果我在 polls() 之间花费太多时间,它将阻止代理重新平衡.我已将轮询间隔设置得很长,但我不想一直增加它以进行越来越长的处理. 轮询无记录的正确方法是什么?目前我正在调用 poll(),然后重新寻找在 poll call() 中返回的每个分区的最早偏移量,以便主线程在处理完之前的消息后可 ..
发布时间:2021-11-12 03:00:06 其他开发

在 kafka 中读取特定时间戳的消息

我想在 kafka 中读取从特定时间开始的所有消息.假设我想阅读 0600 到 0800 之间的所有消息 从 Kafka 请求两个时间戳之间的消息建议使用 offsetsForTimes 作为解决方案. 该解决方案的问题是:如果说我的消费者每天在 1300 开启.消费者当天不会阅读任何消息,这实际上意味着在 0600 时/之后没有提交偏移量,这意味着 offsetsForTimes( ..
发布时间:2021-11-12 02:59:22 其他开发

zookeeper 在哪里存放 kafka 集群和相关信息?

通过说集群信息,我指的是像 这样的信息 订阅的消费者/消费者群体 读取和提交的偏移量 分区的领导者和追随者 服务器上的主题等 zookeeper 是将这些信息保存在它自己的数据库中(尽管我从来没有听说过 Zookeeper 有任何自己的数据库)还是将这些信息存储在 Kafka 集群中的某些主题等? 编辑:以及后续问题:Zookeeper 如何从 __consumer_offs ..

Kafka Mirror Maker:同步 __consumer_offsets 主题重复项

遵循此处提到的解决方案 kafka-mirror-maker-失败复制消费者偏移主题.我能够在 DC1(实时 Kafka 集群)和 DC2(备份 Kafka 集群)集群中启动镜像制造商而没有任何错误. 看起来它也可以跨 DC2 集群从 DC1 集群同步 __consumer_offsets 主题. 问题 如果我关闭 DC1 的消费者并将相同的消费者(相同的 group_id)指向 ..
发布时间:2021-11-12 02:58:02 其他开发

Kafka 一遍又一遍地重播消息 - 心跳会话已过期 - 将协调器标记为死

使用 python kafka api 从一个主题中读取消息,其中只有少数消息.Kafka 不断地一遍又一遍地重放队列中的消息. 它从我的主题收到一条消息(每个消息内容都回来),然后抛出 ERROR - Heartbeat session expired - 标记协调器死了 并继续遍历其余消息并继续重播它们.更多日志: kafka.coordinator - ERROR - Heartbe ..
发布时间:2021-11-12 02:57:36 Python

如何在kafka中定义多个序列化器?

比如说,我发布和使用不同类型的 java 对象.对于每个对象,我必须定义自己的序列化器实现.我们如何在“serializer.class"属性下提供 kafka 消费者/生产者属性文件中的所有实现? 解决方案 我们对不同主题中的不同对象有类似的设置,但在一个主题中始终使用相同的对象类型.我们使用 ByteArrayDeserializer 随 Java API 0.9.0.1 一起提供,这 ..
发布时间:2021-11-12 02:56:53 其他开发

Kafka简单消费者间歇性丢失消息

我有一个 Kafka 应用程序,我在其中使用 kafka-console-consumer.sh 消费消息,如下所示: $./kafka-console-consumer.sh --zookeeper zookeeperhost:2181 --topic myTopic 它提供了我通过 Kafka 消费者写给 Kafka 代理的所有消息,没有任何遗漏. 最近我在一个不同的环境中部署了该应 ..
发布时间:2021-11-12 02:56:38 Java开发