kafka-consumer-api相关内容

是否可以在 Kafka+SparkStreaming 中获取特定的消息偏移量?

我正在尝试使用 Spark Direct Stream 获取并存储 Kafka 中特定消息的偏移量.查看 Spark 文档很容易获取每个分区的范围偏移量,但我需要的是在完全扫描队列后存储主题的每条消息的起始偏移量. 解决方案 是的,你可以使用 MessageAndMetadata 版本的 createDirectStream 允许您访问消息元数据. 您可以在此处找到返回 tuple3 ..

来自 kafka 消费者的 InstanceAlreadyExistsException

我正在与 Kafka 合作并尝试通过遵循此文章.唯一的区别是我创建了自己的抽象类处理程序以简化设计. 下面是我的抽象类: public 抽象类 Consumer 实现 Runnable {私有最终属性 consumerProps;私有最终字符串消费者名称;公共消费者(字符串消费者名称,属性消费者道具){this.consumerName = 消费者名称;this.consumerProps ..
发布时间:2021-11-12 02:24:30 Java开发

Strimzi - 连接外部客户端

根据此处的讨论,我使用以下步骤启用外部客户端(基于 kafkajs) 在 OpenShift 上连接到 Strimzi.这些步骤来自这里. 启用外部路由 kafka-persistent-single.yaml 修改为如下所示. apiVersion: kafka.strimzi.io/v1beta1种类:卡夫卡元数据:名称:我的集群规格:卡夫卡:版本:2.3.0复制品:1听众:清楚 ..

如何暂停 kafka 消费者?

我在我的框架中使用了 Kafka 生产者 - 消费者模型.在消费者端消费的记录稍后会被索引到 elasticsearch 上.这里我有一个用例,如果 ES 关闭,我将不得不暂停 kafka 消费者直到 ES 启动,一旦它启动,我需要恢复消费者并从我上次离开的地方消费记录.我认为@KafkaListener 无法实现这一点.任何人都可以请给我一个解决方案吗?我发现我需要为此编写自己的 KafkaLi ..
发布时间:2021-11-12 02:21:51 Java开发

Kafka 减少消费者的延迟

我正在设置新的 Kafka 集群,出于测试目的,我创建了具有 1 个分区和 3 个副本的主题. 现在,我通过生产者并行地发送消息,比如每秒 50K 条消息.我在一个组内创建了一个消费者,它每秒只能获取 30K 条消息. 我可以更改主题级别、分区级别、消费者级别的配置. 我正在通过 grafana + prometheus 监控一切. 知道哪些配置或其他东西可以帮助我消费更多 ..
发布时间:2021-11-12 02:21:10 其他开发

获取关联 ID 为 1 的元数据时出现警告错误:{MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)

当我使用 kafka 0.9.0.1 运行以下命令时,我收到此警告 [1].你能告诉我我的主题有什么问题吗?(我正在与在 ec2 中运行的 kafka 代理交谈) #./kafka-console-consumer.sh --new-consumer --bootstrap-server kafka.xx.com:9092 --topic MY_TOPIC? [1] [2016-04-06 ..

如何通过jmx监控kafka中的消费者滞后?

我有一个 kafka 设置,其中包括一个到 prometheus 的 jmx 导出器.我正在寻找一个指标,它根据主题和 groupid 给出偏移滞后.我正在运行 kafka 2.2.0. 一些在线资源指向一个名为 kafka.consumer 的指标,但我的设置中没有这样的指标. 来自我的 jmx 终端: $>域#以下域名可用JMI实现com.sun.management语言程序ja ..
发布时间:2021-11-12 02:19:50 其他开发

kafka 新版本 2.1.0 broker 无故挂起

起初集群中的所有broker都可以正常启动和工作,但有时其中一个broker会遇到问题.并且会出现一些现象: 整个集群挂了,生产者和消费者也不工作,因此网络流量从监视器降为零; 使用kafka-topic.sh描述主题消息,每个副本都可以,即使是异常brokerid,zk中的信息也正常; 从/proc/sys/fs/file-nr读取的异常broker上的文件描述号逐渐增加 netst ..
发布时间:2021-11-12 02:18:56 其他开发

如何使用工厂为特定主题配置 Spring Kafka Listener?

我希望能够通过属性读取主题,而无需在 Kafka 侦听器注释上指定任何内容.不使用 Spring Boot. 我尝试通过“主题"键直接从属性对象读取主题.这会产生错误:IllegalStateException:topics, topicPattern, or topicPartitions must be provided. //某个类@KafkaListener公共无效侦听器(列表消息 ..
发布时间:2021-11-12 02:16:09 Java开发

如何配置 Kafka 主题,以便 RDMS 和图等数据库可以以事件的形式使用互连的实体模式

我有一个案例,我有包含 Element 对象的 Information 对象.如果我存储一个 Information 对象,它将尝试根据唯一值字段查找预先存在的 Element 对象,否则插入它们.Information 对象和 Element 对象暂时无法删除.添加父项需要两个预先存在的 Element 对象.我计划使用三个主题:CreateElement、CreateInformation、A ..
发布时间:2021-11-12 02:16:00 其他开发

如何为 PHP 安装 Kafka 的扩展?

这是我要安装的扩展:https://github.com/EVODelavega/phpkafka 传递给队列的消息应该是 JSON 格式. 目前,我收到安装错误:1. 说明要求我安装 librdkafka.2.上一步的安装链接是this.我无法使用第 1 和第 4 种方法进行安装.这是错误: 在默认路径中检查 librdkafka/rdkafka.h"... 未找到配置:错误:请重 ..
发布时间:2021-11-12 02:15:48 PHP

具有高级消费者的 Apache Kafka:跳过损坏的消息

我遇到了高级 kafka 消费者 (0.8.2.0) 的问题 - 在消耗了一些数据后,我们的一个消费者停止了.重新启动后,它会消耗一些消息并再次停止,没有错误/异常或警告. 经过一番调查,我发现消费者的问题是这个异常: ERROR c.u.u.e.impl.kafka.KafkaConsumer - 使用消息流时出错:kafka.message.InvalidMessageExceptio ..
发布时间:2021-11-12 02:15:46 其他开发

使用 __consumer_offsets 杀死节点导致消费者没有消息消费

我有 3 个节点(nodes0、node1、node2)Kafka 集群(broker0、broker1、broker2),复制因子为 2,Zookeeper(使用与 Kafka tar 一起打包的 zookeeper)在不同的节点(节点 4)上运行. 我在启动 zookper 和剩余节点后启动了代理 0.在 broker 0 日志中可以看到它正在读取 __consumer_offsets ..

如何从 Kafka 主题中获取最近的消息

我们是否有任何选择,例如从 Kafka 主题中获取最近的 10/20/等消息.我可以看到 --from-beginning 选项以从主题中获取所有消息,但如果我只想获取第一条、最后一条、中间条或最新条 10 条消息.我们有一些选择吗? 解决方案 前 N 个消息 您可以使用 --max-messages N 来获取主题的前 N 条消息. 例如,要获取前 10 条消息,请运行 b ..
发布时间:2021-11-12 02:15:19 其他开发