kafka-consumer-api相关内容
我正在尝试使用 Spark Direct Stream 获取并存储 Kafka 中特定消息的偏移量.查看 Spark 文档很容易获取每个分区的范围偏移量,但我需要的是在完全扫描队列后存储主题的每条消息的起始偏移量. 解决方案 是的,你可以使用 MessageAndMetadata 版本的 createDirectStream 允许您访问消息元数据. 您可以在此处找到返回 tuple3
..
在创建多个消费者(使用 Kafka 0.9 java API)并启动每个线程后,我收到以下异常 Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: 由于组重新平衡,提交无法完成类 com.messagehub.consumer.Consumer 正在关闭.org.
..
我正在与 Kafka 合作并尝试通过遵循此文章.唯一的区别是我创建了自己的抽象类处理程序以简化设计. 下面是我的抽象类: public 抽象类 Consumer 实现 Runnable {私有最终属性 consumerProps;私有最终字符串消费者名称;公共消费者(字符串消费者名称,属性消费者道具){this.consumerName = 消费者名称;this.consumerProps
..
根据此处的讨论,我使用以下步骤启用外部客户端(基于 kafkajs) 在 OpenShift 上连接到 Strimzi.这些步骤来自这里. 启用外部路由 kafka-persistent-single.yaml 修改为如下所示. apiVersion: kafka.strimzi.io/v1beta1种类:卡夫卡元数据:名称:我的集群规格:卡夫卡:版本:2.3.0复制品:1听众:清楚
..
当我尝试运行它时,我已经尝试根据时间戳从 Kafka 主题获取偏移量,它抛出空指针错误, MaptimestampsToSearch = new HashMap();对于(主题分区分区:分区){timestampsToSearch.put(partition, startTimestamp);}MapoutOffsets = consumer.offsetsForTimes(timestamps
..
我在我的框架中使用了 Kafka 生产者 - 消费者模型.在消费者端消费的记录稍后会被索引到 elasticsearch 上.这里我有一个用例,如果 ES 关闭,我将不得不暂停 kafka 消费者直到 ES 启动,一旦它启动,我需要恢复消费者并从我上次离开的地方消费记录.我认为@KafkaListener 无法实现这一点.任何人都可以请给我一个解决方案吗?我发现我需要为此编写自己的 KafkaLi
..
我们的 kafka 系统崩溃了,因为没有可用的磁盘空间.消费者是使用 Kafka Streams API 的 Spring 启动应用程序.现在每个消费者应用程序都显示以下错误: java.io.FileNotFoundException:/tmp/kafka-streams/908a79bc-92e7-4f9c-a63a-5030cf4d3555/streams.device-identific
..
为了实现 Kafka 消费者对消息的一次处理,我一次提交一条消息,如下所示 public void commitOneRecordConsumer(long seconds) {KafkaConsumer消费者 = consumerConfigFactory.getConsumerConfig();尝试 {在跑步的时候) {ConsumerRecords记录 = 消费者.poll(1000);尝
..
我正在设置新的 Kafka 集群,出于测试目的,我创建了具有 1 个分区和 3 个副本的主题. 现在,我通过生产者并行地发送消息,比如每秒 50K 条消息.我在一个组内创建了一个消费者,它每秒只能获取 30K 条消息. 我可以更改主题级别、分区级别、消费者级别的配置. 我正在通过 grafana + prometheus 监控一切. 知道哪些配置或其他东西可以帮助我消费更多
..
当我使用 kafka 0.9.0.1 运行以下命令时,我收到此警告 [1].你能告诉我我的主题有什么问题吗?(我正在与在 ec2 中运行的 kafka 代理交谈) #./kafka-console-consumer.sh --new-consumer --bootstrap-server kafka.xx.com:9092 --topic MY_TOPIC? [1] [2016-04-06
..
我有一个 kafka 设置,其中包括一个到 prometheus 的 jmx 导出器.我正在寻找一个指标,它根据主题和 groupid 给出偏移滞后.我正在运行 kafka 2.2.0. 一些在线资源指向一个名为 kafka.consumer 的指标,但我的设置中没有这样的指标. 来自我的 jmx 终端: $>域#以下域名可用JMI实现com.sun.management语言程序ja
..
起初集群中的所有broker都可以正常启动和工作,但有时其中一个broker会遇到问题.并且会出现一些现象: 整个集群挂了,生产者和消费者也不工作,因此网络流量从监视器降为零; 使用kafka-topic.sh描述主题消息,每个副本都可以,即使是异常brokerid,zk中的信息也正常; 从/proc/sys/fs/file-nr读取的异常broker上的文件描述号逐渐增加 netst
..
我正在开始使用最新的 Kafka 文档 http://kafka.apache.org/documentation.html.但是当我尝试使用新的消费者 API 时遇到了一些问题.我已经通过以下步骤完成了这项工作: 1.添加新的依赖 org.apache.kafkakafka-clients
..
我希望能够通过属性读取主题,而无需在 Kafka 侦听器注释上指定任何内容.不使用 Spring Boot. 我尝试通过“主题"键直接从属性对象读取主题.这会产生错误:IllegalStateException:topics, topicPattern, or topicPartitions must be provided. //某个类@KafkaListener公共无效侦听器(列表消息
..
我有一个案例,我有包含 Element 对象的 Information 对象.如果我存储一个 Information 对象,它将尝试根据唯一值字段查找预先存在的 Element 对象,否则插入它们.Information 对象和 Element 对象暂时无法删除.添加父项需要两个预先存在的 Element 对象.我计划使用三个主题:CreateElement、CreateInformation、A
..
这是我要安装的扩展:https://github.com/EVODelavega/phpkafka 传递给队列的消息应该是 JSON 格式. 目前,我收到安装错误:1. 说明要求我安装 librdkafka.2.上一步的安装链接是this.我无法使用第 1 和第 4 种方法进行安装.这是错误: 在默认路径中检查 librdkafka/rdkafka.h"... 未找到配置:错误:请重
..
我遇到了高级 kafka 消费者 (0.8.2.0) 的问题 - 在消耗了一些数据后,我们的一个消费者停止了.重新启动后,它会消耗一些消息并再次停止,没有错误/异常或警告. 经过一番调查,我发现消费者的问题是这个异常: ERROR c.u.u.e.impl.kafka.KafkaConsumer - 使用消息流时出错:kafka.message.InvalidMessageExceptio
..
我有 3 个节点(nodes0、node1、node2)Kafka 集群(broker0、broker1、broker2),复制因子为 2,Zookeeper(使用与 Kafka tar 一起打包的 zookeeper)在不同的节点(节点 4)上运行. 我在启动 zookper 和剩余节点后启动了代理 0.在 broker 0 日志中可以看到它正在读取 __consumer_offsets
..
我们是否有任何选择,例如从 Kafka 主题中获取最近的 10/20/等消息.我可以看到 --from-beginning 选项以从主题中获取所有消息,但如果我只想获取第一条、最后一条、中间条或最新条 10 条消息.我们有一些选择吗? 解决方案 前 N 个消息 您可以使用 --max-messages N 来获取主题的前 N 条消息. 例如,要获取前 10 条消息,请运行 b
..
我更改了监听 Kafka 主题的 Web 服务的消费者组 ID.现在,旧的组 ID 仍然注册到主题,但没有具有该组 ID 的消费者.因此,它是滞后的.如何从特定主题中删除特定的消费者群体? 我试过了: kafka-consumer-groups --bootstrap-server kafka01.myserver.com:9092 --topic notification-topic
..