kafka-consumer-api相关内容
根据 Kafka 文档; 新的 Java Consumer 现在支持来自后台的心跳线.有一个新的配置 max.poll.interval.ms控制消费者之前轮询调用之间的最长时间将主动离开组(默认为 5 分钟).的价值配置 request.timeout.ms 必须始终大于max.poll.interval.ms 因为这是 JoinGroup 的最长时间当消费者重新平衡时,请求可以在服务器上
..
我使用 Kafka 0.10,我有一个主题 logs,我的 IoT 设备将它们的日志发布到其中,我的消息的关键是 device-id ,所以所有的同一台设备的日志在同一个分区. 我有一个 api /devices/{id}/tail-logs 需要在调用时显示一个设备的最后 N 个日志. 目前我以一种非常低效的方式(但有效)实现了它,因为我从包含设备日志的分区的开头(即最旧的日志)开始
..
来源:https://kafka.apache.org/intro “通过在主题内具有并行性(分区)的概念,Kafka 能够提供排序保证和负载平衡在消费者进程池上.这是通过分配将主题中的分区分配给消费者组中的消费者,以便每个分区由组中的一个消费者使用.经过这样做我们确保消费者是唯一的读者分区并按顺序消费数据." 这只是意味着每个消费者都会按顺序处理消息,但是在同一个消费者组中的消费者之
..
为了在我的项目中使用结构化流,我正在 hortonworks 2.6.3 环境中测试 spark 2.2.0 和 Kafka 0.10.1 与 Kerberos 的集成,我在示例代码下运行以检查集成.我能够在 Spark 本地模式下在 IntelliJ 上运行以下程序而没有任何问题,但是当在 Hadoop 集群上移动到纱线集群/客户端模式时,相同的程序会抛出以下异常. 我知道我可以为 gro
..
我刚刚开始使用 Kafka.我正面临着消费者的一个小问题.我用Java编写了一个消费者. 我收到此异常 - IllegalStateException 此使用者已关闭. 我在以下行中遇到异常: ConsumerRecords消费者记录 = 消费者.poll(1000); 这是在我的消费者因某些异常而崩溃后开始发生的,当我再次尝试运行它时,它给了我这个异常. 完整代码如下:
..
我想读取事务的元数据(Kafka 0.11.0.1 支持),以便我可以确定特定事务 ID 的事务是否已提交.目前我正在从 _transactional_state 主题获取键和值,但它采用某种编码格式.以下是我在轮询 __transaction_state 主题时收到的一些相同的键/值:键 = 10000000mmm,值 = +' ) 解决方案 您可以在 kafka/tools/DumpL
..
我需要将特定的偏移量消耗到特定的结束偏移量!!consumer.seek() 从特定偏移量读取数据,但我需要将数据从偏移量检索到 tooffset !!任何帮助将不胜感激,提前致谢. ConsumerRecords记录 = 消费者.poll(100);如果(标志){consumer.seek(new TopicPartition("topic-1", 0), 90);标志 = 假;} 解决方
..
我有一个由 3 个 Kafka 代理组成的集群,所有主题的复制因子为 3.由于最近几天我面临这个问题,突然(一天中几次)消费者和生产者在获得响应时卡住了,即使 Kafka 正在所有 3 个服务器上运行,直到我检查代理日志(“连接到 0 已断开连接)在响应被读取之前")并找出罪魁祸首节点为 0(在本例中为第一个节点)并在该节点上重新启动 zookeeper 和 broker. 根据日志,这是由
..
max.poll.records 最近在 kafka 消费者配置中更改为 500,但我想知道这如何影响消费者投票.是否只是可以获取的最大记录数的上限,还是消费者等待获取 500 条记录. 解决方案 max.poll.records : Yes from new consumer 这个属性默认改为 500,这意味着消费者可以轮询最少 1 到每次轮询最多 500 条记录,这意味着消费者不会
..
max.poll.records 最近在 kafka 消费者配置中更改为 500,但我想知道这如何影响消费者投票.是否只是可以获取的最大记录数的上限,还是消费者等待获取 500 条记录. 解决方案 max.poll.records : Yes from new consumer 这个属性默认改为 500,这意味着消费者可以轮询最少 1 到每次轮询最多 500 条记录,这意味着消费者不会
..
假设我有一个无限期运行的计时器任务,它遍历 kafka 集群中的所有消费者组,并为每个组的所有分区输出滞后、提交偏移和结束偏移.类似于 Kafka 控制台消费者组脚本的工作方式,但它适用于所有组. 类似的东西 单个消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量) 消费者消费者;静止的 {消费者 = createConsumer()
..
我的项目将 Apache Kafka 视为一种潜在的替代基于 JMS 的陈旧消息传递方法.为了让这个过渡尽可能顺利,如果替换队列系统(Kafka)有一个异步订阅机制,类似于我们当前项目的使用 MessageListener 和 MessageConsumer 订阅主题并接收异步通知.如果 Kafka 不严格遵守 JMS API,我不太在意,但相反,如果我不需要,我宁愿不重新设计我们的整个发布-订阅
..
我们的用例是从 kafka 中删除陈旧/未使用的主题,即如果一个主题(在所有分区上)在过去 7 天内没有任何新消息,那么我们会将其视为陈旧/未使用并删除它. > 许多谷歌结果建议在消息中添加时间戳,然后对其进行解析.对于新主题 &消息,soultion 会起作用,但我们现有的主题 &消息中没有任何时间戳. 我怎样才能让它工作?. 解决方案 kafka.api.OffsetRequ
..
在该组中的所有消费者都失败后,kafka 将存储该消费者组的偏移量多长时间?这个有配置变量吗? 解决方案 正确的属性名称是: offsets.retention.minutes 来自 https://kafka.apache.org/documentation/#brokerconfigs
..
所以我正在尝试使用 Kafka 流进行交互式查询.我有 Zookeeper 和 Kafka 在本地运行(在 Windows 上).我使用 C:\temp 作为存储文件夹,用于 Zookeeper 和 Kafka. 我已经设置了这样的主题 kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 -
..
我没有使用分区发布到 Kafka 主题.ProducerRecord(String topic, K key, V value) 在消费者方面,我想从头说起.seekToBeginning(集合分区) 是否可以在不使用分区的情况下寻求开始?Kafka 是否分配了默认分区? https://kafka.apache.org/0102/javadoc/org/apache/kafka
..
我正在做 Python Kafka 消费者(尝试在 http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html).当我运行以下代码时,它会一直运行,即使所有消息都被消耗了.我希望消费者在消费完所有消息后停止.怎么做?我也不知道如何使用 stop() 函数(它在基类 kafka.consumer.base.Cons
..
我有几个关于 Kafka 的问题. 1) Kafka 是否有默认的 Web UI? 2) 我们如何优雅地关闭独立的 kafka 服务器,kafka 控制台-消费者/控制台生产者. 任何解决方案将不胜感激. 谢谢. 解决方案 1) 没有 Kafka 没有默认 UI. 然而,有许多第三方工具可以以图形方式显示 Kafka 资源.只需在 Google 上搜索 kaf
..
Kafka 将主题的每个分区复制到指定的复制因子. 据我所知,所有写入和读取请求都路由到分区的领导者.有没有办法从追随者而不是领导者那里消费? Kafka 中的复制是否仅用于故障转移? 解决方案 在 Kafka 2.3 及更早版本中,您只能从领导者那里消费——这是设计使然.复制仅用于容错. 如果leader失败,其中一个follower会被选为新leader. 查看
..
我有一个带有 1 个分区的 kafka 主题.如果其中有 100 条消息,则偏移量将从 0.99 开始. 根据 kafka 保留政策,所有消息将在指定时间段后被清除. 一旦所有消息都被清除(保留期后),我将向该主题发送 100 条新消息.现在,消息的新偏移量从哪里开始?是从 100 开始还是从 0 开始?? 我想了解新的偏移量是 100-199 还是 0-99? 解决方案
..