kafka-consumer-api相关内容

Spring Kafka Consumer 将消息作为 LinkedHashMap 消费,因此自动将 BigDecimal 转换为 double

我正在使用基于注解的 spring kafka 监听器来消费 kafka 消息,代码如下 使用员工对象 Class Employee{私人字符串名称;私有字符串地址;私人对象帐户;//获取者//二传手} Account 对象在运行时决定它是 Saving Account 还是 Current Account 等. Class SavingAcc{私人 BigDecimal 余额 ..

配置 ReplyingKafkaTemplate 以获取来自多个主题的响应

我有一个要求,我必须对 Kafka 使用同步请求-回复模式,因此我使用 ReplyingKafkaTemplate. 作为实现的一部分,有一个生产者正在推送关于一个主题的请求消息(input-message-topic1),但作为回报,我期待来自两个主题的响应(output-message-topic1 和 output-message-topic2),我必须进一步汇总和处理. 问题: ..
发布时间:2021-11-12 03:13:45 其他开发

如果在读取消息过程中发生任何异常,如何管理弹簧批处理作业中使用的 KafkaItemReader 中的偏移量

我第一次在开发基于 Kafka 的 Spring Boot 应用程序.我的要求是使用 spring 批处理创建一个包含所有记录的输出文件.我创建了一个 spring 批处理作业,其中集成了一个扩展 KafkaItemReader 的自定义类.我现在不想提交偏移量,因为我可能需要返回从已经消耗的偏移量中读取一些记录.我的消费者配置有这些属性; enable.auto.commit: 假自动偏移 ..
发布时间:2021-11-12 03:13:42 其他开发

Kafka 消费者组再平衡

我正在使用 kafka 消费者组管理来处理我的消息. 我的消息的处理时间各不相同.所以我将最大轮询间隔设置为 20 分钟,最大记录数为 20.并且我使用了 5 个分区和 5 个消费者实例,除了上述两个之外,还有默认配置值. 但我仍然间歇性地收到以下错误: [Consumer clientId=consumer-3, groupId=amc_dashboard_analytics] 尝 ..
发布时间:2021-11-12 03:13:01 其他开发

如果在读取消息过程中发生任何异常,如何管理弹簧批处理作业中使用的 KafkaItemReader 中的偏移量

我第一次在开发基于 Kafka 的 Spring Boot 应用程序.我的要求是使用 spring 批处理创建一个包含所有记录的输出文件.我创建了一个 spring 批处理作业,其中集成了一个扩展 KafkaItemReader 的自定义类.我现在不想提交偏移量,因为我可能需要返回从已经消耗的偏移量中读取一些记录.我的消费者配置有这些属性; enable.auto.commit: 假自动偏移 ..
发布时间:2021-11-12 03:12:58 其他开发

卡夫卡消费者没有返回任何记录

我正在尝试使用 Kafka 制作一个小型 PoC.但是,在 Java 中创建消费者时,此消费者不会收到任何消息.即使当我使用相同的 url/topic 启动 kafka-console-consumer.sh 时,我也会收到消息.有谁知道我可能做错了什么?此代码由 GET API 调用. public List接收消息(){log.info("从 kafka 中检索消息");val props ..
发布时间:2021-11-12 03:11:39 Java开发

为什么我的 Java 使用者不会读取我创建的数据?

我正在尝试从我制作的一个简单的生产者中读取数据.出于某种原因,每当我运行消费者时,它都看不到/生成我生成的任何数据.任何人都可以就下一步做什么给我任何指导吗? 我在下面包含了我的生产者和消费者的代码: 制作人: 公共类AvroProducer {公共静态无效主(字符串 [] args){String bootstrapServers = "localhost:9092";String ..
发布时间:2021-11-12 03:11:36 Java开发

带有 kafka 的 InfluxDB 2.0

我正在使用 Influxdb 2.0 版,并希望将 kafka 消费者遗留插件与其连接.我想知道我应该如何配置kafka consumer legacy的这个配置文件以及将它放在服务器/本地机器上的哪个路径? InfluxDB Sink Connector for Confluent Platform 不支持此版本从以下链接得知: Confluent 的 InfluxDB Sink C ..
发布时间:2021-11-12 03:11:25 其他开发

在kafka集群节点间分配数据socket

我想从socket中获取数据并放到kafka topic中,这样我的flink程序就可以从topic中读取数据并进行处理.我可以在一个节点上做到这一点.但是我想要一个 kafka 集群,它至少有三个不同的节点(不同的 IP 地址)并从套接字轮询数据以在节点之间分配它.我不知道如何做到这一点并更改此代码.我的简单程序如下: 公共类 WordCount {public static void mai ..

配置 ReplyingKafkaTemplate 以获取来自多个主题的响应

我有一个要求,我必须对 Kafka 使用同步请求-回复模式,因此我使用 ReplyingKafkaTemplate. 作为实现的一部分,有一个生产者正在推送关于一个主题的请求消息(input-message-topic1),但作为回报,我期待来自两个主题的响应(output-message-topic1 和 output-message-topic2),我必须进一步汇总和处理. 问题: ..
发布时间:2021-11-12 03:10:22 其他开发

没有来自 Kafka Consumer Python 的数据 - 消费者一直在听,但什么也没有出来

我正在寻找一种使用 kafka 向我的 docker 显示我的 API (localhost) 的方法. 我的制作人(下图)工作得很有魅力.我知道是因为当我打印 res.text 时,我有一个输出. 导入json进口请求从 kafka 导入 KafkaProducer导入时间# 获取数据res = requests.get('http://127.0.0.1:5000/twitter' ..

如何将消息从单个生产者发布到两个主题并在 Kafka 中的单个侦听器中消费

我正在尝试从单个生产者发布两个不同主题的消息. 这里我创建了两个主题: @Bean公共新话题 multi1() {返回 TopicBuilder.name("multi1").partitions(1).build();}@豆角,扁豆公共新话题 multi2() {返回 TopicBuilder.name("multi2").partitions(1).build();} 这是我向两个 ..

如何转换/分叉 Kafka 流并将其发送到特定主题?

我正在尝试使用函数“mapValues"将在我的原始流“textlines"中获得的字符串值转换为 newStream.然后将我在 newStream 中获得的任何内容流式传输到名为“testoutput"的主题上.但是每次消息实际上通过转换块时,我都会收到一个 NullPointerException,错误仅指向 kafka 流库.不知道发生了什么:(( 附言当我从原始流分叉/创建新的 k ..

Zookeeper 如何从 __consumer_offsets 主题中检索消费者偏移量?

这是“Zookeeper 在哪里做的后续问题存储Kafka集群和相关信息?"基于 Armando Ballaci 提供的答案. 现在很明显,消费者偏移量存储在 Kafka 集群中一个名为 __consumer_offsets 的特殊主题中.没关系,我只是想知道这些偏移量的检索是如何工作的. Topics 不像 RDBS,我们可以基于某个谓词查询任意数据.例如 - 如果数据存储在 RDB ..

Kafka 轮询无记录的正确方法

为了让我的消费者保持活跃(非常长的可变长度处理),我在后台线程中实现了一个空的 poll() 调用,如果我在 polls() 之间花费太多时间,它将阻止代理重新平衡.我已将轮询间隔设置得很长,但我不想一直增加它以进行越来越长的处理. 轮询无记录的正确方法是什么?目前我正在调用 poll(),然后重新寻找在 poll call() 中返回的每个分区的最早偏移量,以便主线程在处理完之前的消息后可 ..
发布时间:2021-11-12 03:08:33 其他开发

在处理来自 Kafka 的消息时避免数据丢失

寻找设计我的 Kafka 消费者的最佳方法.基本上我想看看什么是避免数据丢失的最佳方法,以防万一处理消息期间的异常/错误. 我的用例如下. a) 我使用 SERVICE 来处理消息的原因是 - 将来我计划编写一个 ERROR PROCESSOR 应用程序,它将在一天结束时运行,它将尝试处理失败的消息(不是所有消息,但由于缺少父级等任何依赖项而失败的消息再次出现. b) 我想确保消 ..
发布时间:2021-11-12 03:08:30 其他开发

同一主题的 2 个 kafka 消费者的分区结构

如果我创建 2 个 kafka 消费者实例 传递相同的属性 订阅同一主题 这 2 个消费者实例(在 diff group Id 中)是否具有相似的分区结构,或者可能不同? 即,如果我执行 .assignment() 我会得到相同的结果 我的实际问题陈述,我将在其中使用此验证 在我的应用程序中,我在特定状态下获得了代理的偏移量(这是通过我的第一个 kafka 消费者 ..
发布时间:2021-11-12 03:08:27 Java开发

如何让消费者从 Kafka 请求超过 1MB 的记录

每当我的消费者从 Kafka 请求一个新批次时,它总是请求 1MB 的数据,然后它似乎请求下一个 1MB,依此类推.有没有人知道接收20MB的批次需要什么配置和编程步骤? 解决方案 您可以将消费者属性中的属性 max.partition.fetch.bytes 设置为您想要的值(默认为 1MB). 此外,此值必须等于或大于代理配置中的 max.message.size 属性,以确保您 ..
发布时间:2021-11-12 03:07:12 其他开发