kafka-consumer-api相关内容

Kafka最优保留和删除策略

我对 kafka 相当陌生,所以如果这个问题微不足道,请原谅我.为了计时测试,我有一个非常简单的设置,如下所示: 机器 A -> 写入主题 1 (Broker) -> 机器 B 从主题 1 读取机器 B -> 将刚刚读取的消息写入主题 2 (Broker) -> 机器 A 从主题 2 中读取 现在我在无限循环中发送大约 1400 字节的消息,很快填满了我的小代理上的空间.我正在尝试为 ..

Zookeeper 如何从 __consumer_offsets 主题中检索消费者偏移量?

这是“Zookeeper 在哪里做的后续问题存储Kafka集群和相关信息?"基于 Armando Ballaci 提供的答案. 现在很明显,消费者偏移量存储在 Kafka 集群中一个名为 __consumer_offsets 的特殊主题中.没关系,我只是想知道这些偏移量的检索是如何工作的. Topic 不像 RDBS,我们可以基于某个谓词查询任意数据.例如 - 如果数据存储在 RDBM ..

zookeeper 在哪里存放 kafka 集群和相关信息?

通过说集群信息,我指的是像 这样的信息 订阅的消费者/消费者群体 读取和提交的偏移量 分区的领导者和追随者 服务器上的主题等 zookeeper 是将这些信息保存在它自己的数据库中(尽管我从来没有听说过 Zookeeper 有任何自己的数据库)还是将这些信息存储在 Kafka 集群中的某些主题等? 编辑:以及后续问题:Zookeeper 如何从 __consumer_offs ..

Kafka 如何使用所需的偏移量提交消费者偏移量

我有 Kafka Stream 应用程序.我的应用程序正在成功处理事件. 如何使用所需的偏移量更改 Kafka 提交的消费者偏移量以重新处理/跳过事件.我尝试了如何更改主题的起始偏移量?.但我收到“节点不存在:"错误.请帮帮我. 解决方案 您所指的问题/答案基于较旧的 Kafka 版本.从 Kafka 0.9 开始,偏移量不再提交给 ZooKeeper,而是存储在一个名为 offse ..

消费者和生产者因错误而失败:“在读取响应之前断开了与 0 的连接"

我有一个由 3 个 Kafka 代理组成的集群,所有主题的复制因子为 3.由于最近几天我面临这个问题,突然(一天几次)消费者和生产者在得到响应时卡住了,即使 Kafka 在所有 3 个服务器上运行,直到我检查代理日志(“连接到 0 已断开连接在响应被读取之前")并找出罪魁祸首节点为 0(在本例中为第一个节点)并在该节点上重新启动 zookeeper 和 broker. 根据日志,这是由于重新 ..

使用 __consumer_offsets 杀死节点导致消费者没有消息消费

我有 3 个节点(nodes0、node1、node2)Kafka 集群(broker0、broker1、broker2),复制因子为 2,Zookeeper(使用与 Kafka tar 一起打包的 zookeeper)在不同的节点(节点 4)上运行. 我在启动 zookper 和剩余节点后启动了代理 0.在 broker 0 日志中可以看到它正在读取 __consumer_offsets ..

kafka消费机器需要运行zookeeper吗?

所以我的问题是:如果我有一台运行 Kafka(和 zookeeper)的服务器,而另一台机器只消费消息,那么消费者机器是否也需要运行 zookeeper?还是服务器负责所有? 解决方案 没有 Zookeeper 在 Kafka 中的作用是: 代理注册:(集群成员资格)具有心跳机制以保持列表最新 存储主题配置:存在哪些主题,每个主题有多少个分区有,副本在哪里,谁是首选领导者,I ..
发布时间:2021-11-14 23:58:24 其他开发

Kafka:如何连接 kafka-console-consumer 以获取远程代理主题内容?

我在 ec2 上的一台机器上设置了一个 kafka zookeeper 和 3 个代理,端口为 9092..9094,我正在尝试使用另一台机器上的主题内容.端口 2181 (zk)、9092、9093 和 9094(服务器)对消费者机器开放.我什至可以做一个 bin/kafka-topics.sh --describe --zookeeper 172.X.X.X:2181 --topic remo ..

如何在Nifi中查看Kafka的消费消息?

我已经启动了一个 Nifi 进程(消费 Kafka)并将其连接到一个主题.它正在运行,但我无法(不知道)在哪里可以查看消息? 解决方案 ConsumeKafka 处理器运行并为每条消息生成流文件.只有当您将处理器连接到其他组件(如另一个处理器或输出端口)时,您才能可视化正在传输的数据. 对于初学者,你可以试试这个: 将 ConsumeKafka 与 LogAttribute 或 ..
发布时间:2021-11-12 03:50:48 其他开发

如果我在 Kafka 中有事务性生产者,我可以使用 Kafka Streams 读取一次消息吗?

我想拥有 Exactly-once 语义,但我不想使用 Consumer 读取消息.我宁愿使用 Kafka Streams AP 阅读消息.如果我将 processing.guarantee=exactly_once 添加到 Stream 配置中,是否会保留恰好一次语义? 解决方案 Exactly-once 处理基于读-处理-写模式.Kafka Streams 使用这种模式,因此,如果您编 ..

如何转换/分叉 Kafka 流并将其发送到特定主题?

我正在尝试使用函数“mapValues"将在我的原始流“textlines"中获得的字符串值转换为 newStream.然后将我在 newStream 中获得的任何内容流式传输到名为“testoutput"的主题上.但是每次消息实际上通过转换块时,我都会收到一个 NullPointerException,错误仅指向 kafka 流库.不知道发生了什么:(( 附言当我从原始流分叉/创建新的 k ..

Apache kafka 选项中的高、中、低重要性级别有什么区别?

我想知道关于 kafka 选项 Importnace 级别的区别.Apache kafka 有 3 个级别. org.apache.kafka.commong.config.ConfigDef.Importance 公共枚举重要性{高、中、低} 这三个有什么区别? 解决方案 此枚举用作 Kafka 社区的指示器,为用户在许多配置上提供一些指导.您将在配置说明中看到它们. 没 ..

Kafka Stream 在重新平衡时重新处理旧消息

我有一个 Kafka Streams 应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题. 这是我的Kafka集群的配置: 5 个 Kafka 经纪人Kafka 主题 - 15 个分区和复制因子 3. 我的 Kafka Streams 应用程序与我的 Kafka 代理运行在同一台机器上. 每小时消耗/产生几百万条记录.每当我关闭代理时,应用程序都会进入重新平衡状态,并且在 ..

我可以在与 Kafka Broker 相同的机器上运行 Kafka Streams 应用程序吗?

我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它 ..