kafka-consumer-api相关内容

在 Kafka 消费者配置中将 max.poll.interval.ms 设置为大于 request.timeout.ms 的负面影响是什么

根据 Kafka 文档; 新的 Java Consumer 现在支持来自后台的心跳线.有一个新的配置 max.poll.interval.ms控制消费者之前轮询调用之间的最长时间将主动离开组(默认为 5 分钟).的价值配置 request.timeout.ms 必须始终大于max.poll.interval.ms 因为这是 JoinGroup 的最长时间当消费者重新平衡时,请求可以在服务器上 ..
发布时间:2021-11-12 02:49:27 Java开发

Kafka以相反的顺序消费消息

我使用 Kafka 0.10,我有一个主题 logs,我的 IoT 设备将它们的日志发布到其中,我的消息的关键是 device-id ,所以所有的同一台设备的日志在同一个分区. 我有一个 api /devices/{id}/tail-logs 需要在调用时显示一个设备的最后 N 个日志. 目前我以一种非常低效的方式(但有效)实现了它,因为我从包含设备日志的分区的开头(即最旧的日志)开始 ..
发布时间:2021-11-12 02:49:14 其他开发

Kafka 如何保证消费者跨分区处理的消息排序?

来源:https://kafka.apache.org/intro “通过在主题内具有并行性(分区)的概念,Kafka 能够提供排序保证和负载平衡在消费者进程池上.这是通过分配将主题中的分区分配给消费者组中的消费者,以便每个分区由组中的一个消费者使用.经过这样做我们确保消费者是唯一的读者分区并按顺序消费数据." 这只是意味着每个消费者都会按顺序处理消息,但是在同一个消费者组中的消费者之 ..
发布时间:2021-11-12 02:49:06 其他开发

Spark Structured Streaming with Secure Kafka throwing : 无权访问组异常

为了在我的项目中使用结构化流,我正在 hortonworks 2.6.3 环境中测试 spark 2.2.0 和 Kafka 0.10.1 与 Kerberos 的集成,我在示例代码下运行以检查集成.我能够在 Spark 本地模式下在 IntelliJ 上运行以下程序而没有任何问题,但是当在 Hadoop 集群上移动到纱线集群/客户端模式时,相同的程序会抛出以下异常. 我知道我可以为 gro ..

Kafka Java Consumer 已经关闭

我刚刚开始使用 Kafka.我正面临着消费者的一个小问题.我用Java编写了一个消费者. 我收到此异常 - IllegalStateException 此使用者已关闭. 我在以下行中遇到异常: ConsumerRecords消费者记录 = 消费者.poll(1000); 这是在我的消费者因某些异常而崩溃后开始发生的,当我再次尝试运行它时,它给了我这个异常. 完整代码如下: ..
发布时间:2021-11-12 02:48:04 Java开发

从 Kafka 0.11.0.1 中的 _transaction_state 主题读取数据

我想读取事务的元数据(Kafka 0.11.0.1 支持),以便我可以确定特定事务 ID 的事务是否已提交.目前我正在从 _transactional_state 主题获取键和值,但它采用某种编码格式.以下是我在轮询 __transaction_state 主题时收到的一些相同的键/值:键 = 10000000mmm,值 = +' ) 解决方案 您可以在 kafka/tools/DumpL ..

如何在特定偏移量到特定偏移量中使用来自 kafka 主题的数据?

我需要将特定的偏移量消耗到特定的结束偏移量!!consumer.seek() 从特定偏移量读取数据,但我需要将数据从偏移量检索到 tooffset !!任何帮助将不胜感激,提前致谢. ConsumerRecords记录 = 消费者.poll(100);如果(标志){consumer.seek(new TopicPartition("topic-1", 0), 90);标志 = 假;} 解决方 ..
发布时间:2021-11-12 02:47:04 Java开发

消费者和生产者失败并出现错误:“在读取响应之前断开了与 0 的连接"

我有一个由 3 个 Kafka 代理组成的集群,所有主题的复制因子为 3.由于最近几天我面临这个问题,突然(一天中几次)消费者和生产者在获得响应时卡住了,即使 Kafka 正在所有 3 个服务器上运行,直到我检查代理日志(“连接到 0 已断开连接)在响应被读取之前")并找出罪魁祸首节点为 0(在本例中为第一个节点)并在该节点上重新启动 zookeeper 和 broker. 根据日志,这是由 ..

max.poll.records 如何影响消费者投票

max.poll.records 最近在 kafka 消费者配置中更改为 500,但我想知道这如何影响消费者投票.是否只是可以获取的最大记录数的上限,还是消费者等待获取 500 条记录. 解决方案 ma​​x.poll.records : Yes from new consumer 这个属性默认改为 500,这意味着消费者可以轮询最少 1 到每次轮询最多 500 条记录,这意味着消费者不会 ..
发布时间:2021-11-12 02:46:43 其他开发

max.poll.records 如何影响消费者投票

max.poll.records 最近在 kafka 消费者配置中更改为 500,但我想知道这如何影响消费者投票.是否只是可以获取的最大记录数的上限,还是消费者等待获取 500 条记录. 解决方案 ma​​x.poll.records : Yes from new consumer 这个属性默认改为 500,这意味着消费者可以轮询最少 1 到每次轮询最多 500 条记录,这意味着消费者不会 ..
发布时间:2021-11-12 02:46:15 其他开发

Consumer.endOffsets 在 Kafka 中是如何工作的?

假设我有一个无限期运行的计时器任务,它遍历 kafka 集群中的所有消费者组,并为每个组的所有分区输出滞后、提交偏移和结束偏移.类似于 Kafka 控制台消费者组脚本的工作方式,但它适用于所有组. 类似的东西 单个消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量) 消费者消费者;静止的 {消费者 = createConsumer() ..
发布时间:2021-11-12 02:46:12 Java开发

Apache Kafka 是否提供异步订阅回调 API?

我的项目将 Apache Kafka 视为一种潜在的替代基于 JMS 的陈旧消息传递方法.为了让这个过渡尽可能顺利,如果替换队列系统(Kafka)有一个异步订阅机制,类似于我们当前项目的使用 MessageListener 和 MessageConsumer 订阅主题并接收异步通知.如果 Kafka 不严格遵守 JMS API,我不太在意,但相反,如果我不需要,我宁愿不重新设计我们的整个发布-订阅 ..

Kafka:如何获取主题的上次修改时间,即添加到主题任何分区的最后一条消息

我们的用例是从 kafka 中删除陈旧/未使用的主题,即如果一个主题(在所有分区上)在过去 7 天内没有任何新消息,那么我们会将其视为陈旧/未使用并删除它. > 许多谷歌结果建议在消息中添加时间戳,然后对其进行解析.对于新主题 &消息,soultion 会起作用,但我们现有的主题 &消息中没有任何时间戳. 我怎样才能让它工作?. 解决方案 kafka.api.OffsetRequ ..
发布时间:2021-11-12 02:45:25 其他开发

Kafka 消费者组抵消保留

在该组中的所有消费者都失败后,kafka 将存储该消费者组的偏移量多长时间?这个有配置变量吗? 解决方案 正确的属性名称是: offsets.retention.minutes 来自 https://kafka.apache.org/documentation/#brokerconfigs ..
发布时间:2021-11-12 02:45:08 其他开发

卡夫卡消费者寻求开始

我没有使用分区发布到 Kafka 主题.ProducerRecord(String topic, K key, V value) 在消费者方面,我想从头说起.seekToBeginning(集合分区) 是否可以在不使用分区的情况下寻求开始?Kafka 是否分配了默认分区? https://kafka.apache.org/0102/javadoc/org/apache/kafka ..
发布时间:2021-11-12 02:44:52 其他开发

kafka 是否有任何默认的 Web UI

我有几个关于 Kafka 的问题. 1) Kafka 是否有默认的 Web UI? 2) 我们如何优雅地关闭独立的 kafka 服务器,kafka 控制台-消费者/控制台生产者. 任何解决方案将不胜感激. 谢谢. 解决方案 1) 没有 Kafka 没有默认 UI. 然而,有许多第三方工具可以以图形方式显示 Kafka 资源.只需在 Google 上搜索 kaf ..
发布时间:2021-11-12 02:44:40 其他开发

从副本消费

Kafka 将主题的每个分区复制到指定的复制因子. 据我所知,所有写入和读取请求都路由到分区的领导者.有没有办法从追随者而不是领导者那里消费? Kafka 中的复制是否仅用于故障转移? 解决方案 在 Kafka 2.3 及更早版本中,您只能从领导者那里消费——这是设计使然.复制仅用于容错. 如果leader失败,其中一个follower会被选为新leader. 查看 ..
发布时间:2021-11-12 02:44:37 其他开发

保留期后的卡夫卡偏移

我有一个带有 1 个分区的 kafka 主题.如果其中有 100 条消息,则偏移量将从 0.99 开始. 根据 kafka 保留政策,所有消息将在指定时间段后被清除. 一旦所有消息都被清除(保留期后),我将向该主题发送 100 条新消息.现在,消息的新偏移量从哪里开始?是从 100 开始还是从 0 开始?? 我想了解新的偏移量是 100-199 还是 0-99? 解决方案 ..
发布时间:2021-11-12 02:44:34 其他开发