kafka-consumer-api相关内容
我正在使用基于注解的 spring kafka 监听器来消费 kafka 消息,代码如下 使用员工对象 Class Employee{私人字符串名称;私有字符串地址;私人对象帐户;//获取者//二传手} Account 对象在运行时决定它是 Saving Account 还是 Current Account 等. Class SavingAcc{私人 BigDecimal 余额
..
我有一个要求,我必须对 Kafka 使用同步请求-回复模式,因此我使用 ReplyingKafkaTemplate. 作为实现的一部分,有一个生产者正在推送关于一个主题的请求消息(input-message-topic1),但作为回报,我期待来自两个主题的响应(output-message-topic1 和 output-message-topic2),我必须进一步汇总和处理. 问题:
..
我第一次在开发基于 Kafka 的 Spring Boot 应用程序.我的要求是使用 spring 批处理创建一个包含所有记录的输出文件.我创建了一个 spring 批处理作业,其中集成了一个扩展 KafkaItemReader 的自定义类.我现在不想提交偏移量,因为我可能需要返回从已经消耗的偏移量中读取一些记录.我的消费者配置有这些属性; enable.auto.commit: 假自动偏移
..
我正在使用 kafka 消费者组管理来处理我的消息. 我的消息的处理时间各不相同.所以我将最大轮询间隔设置为 20 分钟,最大记录数为 20.并且我使用了 5 个分区和 5 个消费者实例,除了上述两个之外,还有默认配置值. 但我仍然间歇性地收到以下错误: [Consumer clientId=consumer-3, groupId=amc_dashboard_analytics] 尝
..
我第一次在开发基于 Kafka 的 Spring Boot 应用程序.我的要求是使用 spring 批处理创建一个包含所有记录的输出文件.我创建了一个 spring 批处理作业,其中集成了一个扩展 KafkaItemReader 的自定义类.我现在不想提交偏移量,因为我可能需要返回从已经消耗的偏移量中读取一些记录.我的消费者配置有这些属性; enable.auto.commit: 假自动偏移
..
我正在尝试使用 Kafka 制作一个小型 PoC.但是,在 Java 中创建消费者时,此消费者不会收到任何消息.即使当我使用相同的 url/topic 启动 kafka-console-consumer.sh 时,我也会收到消息.有谁知道我可能做错了什么?此代码由 GET API 调用. public List接收消息(){log.info("从 kafka 中检索消息");val props
..
我正在尝试从我制作的一个简单的生产者中读取数据.出于某种原因,每当我运行消费者时,它都看不到/生成我生成的任何数据.任何人都可以就下一步做什么给我任何指导吗? 我在下面包含了我的生产者和消费者的代码: 制作人: 公共类AvroProducer {公共静态无效主(字符串 [] args){String bootstrapServers = "localhost:9092";String
..
我正在使用 Influxdb 2.0 版,并希望将 kafka 消费者遗留插件与其连接.我想知道我应该如何配置kafka consumer legacy的这个配置文件以及将它放在服务器/本地机器上的哪个路径? InfluxDB Sink Connector for Confluent Platform 不支持此版本从以下链接得知: Confluent 的 InfluxDB Sink C
..
我想从socket中获取数据并放到kafka topic中,这样我的flink程序就可以从topic中读取数据并进行处理.我可以在一个节点上做到这一点.但是我想要一个 kafka 集群,它至少有三个不同的节点(不同的 IP 地址)并从套接字轮询数据以在节点之间分配它.我不知道如何做到这一点并更改此代码.我的简单程序如下: 公共类 WordCount {public static void mai
..
我有一个要求,我必须对 Kafka 使用同步请求-回复模式,因此我使用 ReplyingKafkaTemplate. 作为实现的一部分,有一个生产者正在推送关于一个主题的请求消息(input-message-topic1),但作为回报,我期待来自两个主题的响应(output-message-topic1 和 output-message-topic2),我必须进一步汇总和处理. 问题:
..
我正在寻找一种使用 kafka 向我的 docker 显示我的 API (localhost) 的方法. 我的制作人(下图)工作得很有魅力.我知道是因为当我打印 res.text 时,我有一个输出. 导入json进口请求从 kafka 导入 KafkaProducer导入时间# 获取数据res = requests.get('http://127.0.0.1:5000/twitter'
..
我正在尝试从单个生产者发布两个不同主题的消息. 这里我创建了两个主题: @Bean公共新话题 multi1() {返回 TopicBuilder.name("multi1").partitions(1).build();}@豆角,扁豆公共新话题 multi2() {返回 TopicBuilder.name("multi2").partitions(1).build();} 这是我向两个
..
我已经用这个命令手动创建了主题 test: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 并使用此命令: bin/kafka-console-producer.sh --broker-list localhost:9092 -
..
我正在尝试使用函数“mapValues"将在我的原始流“textlines"中获得的字符串值转换为 newStream.然后将我在 newStream 中获得的任何内容流式传输到名为“testoutput"的主题上.但是每次消息实际上通过转换块时,我都会收到一个 NullPointerException,错误仅指向 kafka 流库.不知道发生了什么:(( 附言当我从原始流分叉/创建新的 k
..
这是“Zookeeper 在哪里做的后续问题存储Kafka集群和相关信息?"基于 Armando Ballaci 提供的答案. 现在很明显,消费者偏移量存储在 Kafka 集群中一个名为 __consumer_offsets 的特殊主题中.没关系,我只是想知道这些偏移量的检索是如何工作的. Topics 不像 RDBS,我们可以基于某个谓词查询任意数据.例如 - 如果数据存储在 RDB
..
为了让我的消费者保持活跃(非常长的可变长度处理),我在后台线程中实现了一个空的 poll() 调用,如果我在 polls() 之间花费太多时间,它将阻止代理重新平衡.我已将轮询间隔设置得很长,但我不想一直增加它以进行越来越长的处理. 轮询无记录的正确方法是什么?目前我正在调用 poll(),然后重新寻找在 poll call() 中返回的每个分区的最早偏移量,以便主线程在处理完之前的消息后可
..
寻找设计我的 Kafka 消费者的最佳方法.基本上我想看看什么是避免数据丢失的最佳方法,以防万一处理消息期间的异常/错误. 我的用例如下. a) 我使用 SERVICE 来处理消息的原因是 - 将来我计划编写一个 ERROR PROCESSOR 应用程序,它将在一天结束时运行,它将尝试处理失败的消息(不是所有消息,但由于缺少父级等任何依赖项而失败的消息再次出现. b) 我想确保消
..
如果我创建 2 个 kafka 消费者实例 传递相同的属性 订阅同一主题 这 2 个消费者实例(在 diff group Id 中)是否具有相似的分区结构,或者可能不同? 即,如果我执行 .assignment() 我会得到相同的结果 我的实际问题陈述,我将在其中使用此验证 在我的应用程序中,我在特定状态下获得了代理的偏移量(这是通过我的第一个 kafka 消费者
..
我在 Rest 控制器中使用 ReplyingKafkaTemplate 来返回同步响应.我也在设置标题 REPLY_TOPIC.对于监听器微服务部分, @KafkaListener(topics = "${kafka.topic.request-topic}")@发给公共模型监听(模型 请求)抛出 InterruptedException {SumModel 模型 = request.getR
..
每当我的消费者从 Kafka 请求一个新批次时,它总是请求 1MB 的数据,然后它似乎请求下一个 1MB,依此类推.有没有人知道接收20MB的批次需要什么配置和编程步骤? 解决方案 您可以将消费者属性中的属性 max.partition.fetch.bytes 设置为您想要的值(默认为 1MB). 此外,此值必须等于或大于代理配置中的 max.message.size 属性,以确保您
..