kafka-consumer-api相关内容

Kafka 消费者偏移量提交检查以避免提交较小的偏移量

我们假设我们有一个消费者发送一个提交偏移量 10 的请求.如果存在通信问题并且代理没有收到请求,当然也没有响应.之后,我们让另一个消费者处理另一批并成功提交偏移量 20. 问:我想知道是否有一种方法或属性可以处理,以便在我们提交偏移量 20 之前,我们可以检查日志中的前一个偏移量是否已提交? 解决方案 你所描述的场景只有在使用异步提交时才会发生. 请记住,一个特定的 Topic ..
发布时间:2021-11-12 02:56:35 其他开发

Kafka 连接教程停止工作

我在此链接中遵循第 7 步(使用 Kafka Connect 导入/导出数据): http://kafka.apache.org/documentation.html#quickstart 它运行良好,直到我删除了“test.txt"文件.主要是因为这就是 log4j 文件的工作方式.一段时间后,文件将被旋转 - 我的意思是 - 它将被重命名 &将开始写入具有相同名称的新文件. ..

Python KafkaConsumer 从时间戳开始消费消息

我打算跳过主题的开头,只读取从某个时间戳到结尾的消息.关于如何实现这一目标的任何提示? 解决方案 我猜你在使用 kafka-python (https://github.com/dpkp/kafka-python) 正如你提到的“KafkaConsumer". 您可以使用 offsets_for_times() 方法来检索与时间戳匹配的偏移量.https://kafka-python ..

如何从多分区的 Kafka 主题中按顺序(按时间戳顺序)使用数据

我知道当一个主题有多个分区时,Kafka 将无法保证数据的排序.但我的问题是:- 我需要对一个事件主题(生成事件的用户活动)进行多个分区,因为我希望多个消费者组使用该主题中的数据.但有时我需要引导整个数据,即从头到尾读取完整的数据,并从 Kafka 中的历史消息重建我的事件图,然后我失去了造成问题的排序.一种方法可能是在 Map-Reduce 范式中处理它,我根据时间映射数据并对其进行排序和使用. ..
发布时间:2021-11-12 02:54:49 其他开发

Kafka 一遍又一遍地重播消息 - 心跳会话已过期 - 将协调器标记为死

使用 python kafka api 从一个主题中读取消息,其中只有少数消息.Kafka 不断地一遍又一遍地重放队列中的消息. 它从我的主题收到一条消息(每个消息内容都回来),然后抛出 ERROR - Heartbeat session expired - 标记协调器死了 并继续遍历其余消息并继续重播它们.更多日志: kafka.coordinator - ERROR - Heartbe ..
发布时间:2021-11-12 02:54:22 Python

Kafka 偏移管理

我们正在使用 Kafka 0.10...我在网上(和文档中)看到一些关于当 enable.auto.commit 为 TRUE 时如何在 kafka 中管理偏移量的相互矛盾的信息.检索消息的同一个 poll() 方法是否也按配置的时间间隔处理提交? 如果我在单线程应用程序中从 poll 检索消息,则在 SAME 线程中处理消息直至完成(包括处理错误),这意味着在我的处理完成之前不会再次调用 ..
发布时间:2021-11-12 02:53:31 其他开发

卡夫卡主题查看器?

我想调试一些 Kafka 主题,以便我知道这里是消费者还是生产者有错. 是否有 Kafka 的 UI,我可以在其中查看主题包含哪些消息?转储器也不错,这样我就可以自己搜索东西了. 解决方案 我将支持 Yoni Gibb 对 Landoop 产品的建议.我也在开发中使用它并发现它非常有用;尽管您可能需要围绕超时和大小调整一些设置才能查看所有消息.易于安装,只需拉取 Docker 镜像. ..
发布时间:2021-11-12 02:52:59 其他开发

Kafka Mirror Maker:同步 __consumer_offsets 主题重复项

遵循此处提到的解决方案 kafka-mirror-maker-失败复制消费者偏移主题.我能够在 DC1(实时 Kafka 集群)和 DC2(备份 Kafka 集群)集群中启动镜像制造商而没有任何错误. 看起来它也可以跨 DC2 集群从 DC1 集群同步 __consumer_offsets 主题. 问题 如果我关闭 DC1 的消费者并将相同的消费者(相同的 group_id)指向 ..
发布时间:2021-11-12 02:52:41 其他开发

无法使用 kafka-avro-console-consumer 读取 avro 消息.SerializationException:未知的魔法字节

我正在编写一个 REST 代理,比如融合的休息代理.它接受一个 JSON 负载、模式主题和 id,然后将 JSON 负载作为 Avro 对象写入流中.当我使用 kafka-avro-console-consumer 读取消息时,出现“未知幻字节"错误. 这是我的 kafka 生产者配置: properties.put("client.id", LocalHostUtils.getLoca ..

偏移量的异步自动提交失败

我有一个关于 Kafka 自动提交机制的问题.我正在使用启用了自动提交的 Spring-Kafka.作为一个实验,我在系统空闲时断开了消费者与 Kafka 的连接 30 秒(主题中没有新消息,没有消息正在处理).重新连接后,我收到了一些消息,如下所示: 偏移量异步自动提交 {cs-1915-2553221872080030-0=OffsetAndMetadata{offset=19, leade ..
发布时间:2021-11-12 02:51:31 其他开发

将协调器标记为组死亡(Kafka)

我的一项服务在几个小时前停止工作.我已经重新启动了服务器,但是这个消费者进程没有注册到 Kafka.这是消费者启动时显示的内容: 2017-04-21 10:22:54.887 INFO 18036 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator :撤销先前为组 mysql-conversions-group 分配的分区 ..
发布时间:2021-11-12 02:51:25 其他开发

Kafka如何同时实现分布式处理和高可用?

我有一个由 n 个分区组成的主题.为了进行分布式处理,我创建了两个在不同机器上运行的进程.他们订阅具有相同分组 id 的主题并分配 n/2 个线程,每个线程处理单个流(每个进程 n/2 个分区). 有了这个,我将实现负载分配,但现在如果进程 1 崩溃,那么进程 2 无法使用来自分配给进程 1 的分区的消息,因为它在开始时只侦听 n/2 个流. 否则,如果我为 HA 配置并在两个进程上启 ..

偏移量的异步自动提交失败

我有一个关于 Kafka 自动提交机制的问题.我正在使用启用了自动提交的 Spring-Kafka.作为一个实验,我在系统空闲时断开了消费者与 Kafka 的连接 30 秒(主题中没有新消息,没有消息正在处理).重新连接后,我收到了一些消息,如下所示: 偏移量异步自动提交 {cs-1915-2553221872080030-0=OffsetAndMetadata{offset=19, leade ..
发布时间:2021-11-12 02:50:46 其他开发

Kafka 节点突然从偏移量 0 开始消耗

有时,kafka-node 消费者从偏移量 0 开始消费,而其默认行为仅消费较新的消息.然后它不会切换回其默认行为.你知道如何解决这个问题吗?会发生什么,它的行为会突然改变吗?代码非常简单,无需更改代码即可实现. var kafka = require("kafka-node") ;消费者 = kafka.Consumer;客户端 = 新 kafka.KafkaClient();消费者 = 新消 ..
发布时间:2021-11-12 02:50:20 其他开发

将协调器标记为组死亡(Kafka)

我的一项服务在几个小时前停止工作.我已经重新启动了服务器,但是这个消费者进程没有注册到 Kafka.这是消费者启动时显示的内容: 2017-04-21 10:22:54.887 INFO 18036 --- [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator :撤销先前为组 mysql-conversions-group 分配的分区 ..
发布时间:2021-11-12 02:50:02 其他开发

在什么情况下endOffset >lastMsg.offset + 1?

Kafka 为一个分区返回 endOffset 15,但可以从中消费的最后一条消息的偏移量为 13,而不是我期望的 14.我想知道为什么. Kafka 文档 阅读 在默认的read_uncommitted隔离级别下,结束偏移量是高水印(即最后成功复制的消息的偏移量加一).对于 read_committed 消费者,结束偏移量是最后一个稳定偏移量 (LSO),它是高水印中的最小值和任何打 ..
发布时间:2021-11-12 02:49:59 其他开发