apache-kafka相关内容

如何在春靴中按顺序消费卡夫卡话题

我有一个问题,我让一个阿帕奇Kafka消费者在Spring Boot中消费了3个不同的主题。但我需要先使用第一个主题中的所有数据,然后再使用以下主题中的数据,有什么方法可以做到这一点吗?或者你会一直以同样的方式阅读它们吗? @Component public class KafkaTestListener { @KafkaListener(topics = "${message.topic ..
发布时间:2022-05-06 19:10:38 其他开发

春季的卡夫卡消费者我可以通过编程重新分配分区吗?

我刚接触Kafka,并且使用@KafkaListener(Spring)来定义Kafka消费者。 我想检查是否可以在运行时手动将分区分配给使用者。 例如,当应用程序启动时,我不想使用任何数据。我目前正在使用@KafkaListener(autoStartup=false ... )用于该目的。 在某个时刻,我应该(从应用程序的另一部分)收到包含要处理的分区ID的通知,因此我希望跳过该分区的 ..
发布时间:2022-05-06 18:43:20 Java开发

Kafka消费者ClassNotFoundException异常

(在开始提问之前,我的英语可能不足以清楚地描述所有的情况。如果您不明白,请告诉我。) 我正在尝试通过Kafka将数据对象从A Spring项目(生产者)发送到B Spring项目(消费者)。 问题是A和B中的数据对象具有不同的类路径。因此,B项目数据类无法映射A项目的字段。 但两个对象具有相同的字段。因此,我希望从A项目获取Object作为B项目的参数。 错误消息 Lis ..
发布时间:2022-05-06 17:57:43 Java开发

每个Http请求有多个Kafka生产者实例

我有一个REST终结点,可以由多个用户同时调用。该REST终结点调用事务性Kafka生成器。 我的理解是,如果我们使用Transaction,我不能同时使用同一个Kafka Producer实例。 如何高效地为每个HTTP请求创建新的Kafka Producer实例? //Kafka Transaction enabled producerProps.put(ProducerConfi ..

寻找春天的时间戳-卡夫卡

我正在尝试寻找时间戳功能,但由于某些原因,它不适合我。 在我的Producer中,我有下一个代码: ProducerRecord producer = new ProducerRecord("topic", 0, System.currentTimeMillis() - 10000, "key", obj); kafkaTemplate.send(prod ..
发布时间:2022-05-06 16:03:44 Java开发

卡夫卡客户(消费者/生产商)在崩溃后恢复

在我工作的公司,我们在没有身份验证的情况下使用Spring for Kafka,最近我们做了一些实验来设置Kafka中的安全性,并且我们在短时间内启用了身份验证,这导致了我们微服务中所有消费者/生产者的崩溃!(微服务保持正常) 异常: Authorization Exception and no authorizationExceptionRetryInterval set org. ..
发布时间:2022-05-06 13:42:41 其他开发

如何在团队之间共享Avro架构定义

Kafka模式注册中心提供了一种很好的方式来使用公共数据契约序列化和反序列化Kafka中的数据。但是,数据约定(.avsc文件)是生产者和消费者之间的粘合剂。 一旦生产者生成了.avsc文件,就可以将其签入到生产者端的版本控制中。根据语言的不同,它还会自动生成类。 但是, 使用者获取架构定义以供参考的最佳机制是什么?有没有类似swaggerHub或Avro的典型API文档门户? 如 ..

Spring Cloud Kafka Streams中的错误处理

我使用的是Spring Cloud Stream和Kafka Stream。假设我有一个处理器,它是一个将字符串的KStream转换为CityProgrammes的KStream的函数。它调用一个API来按名称查找City,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是转换过程中发生任何错误,整个应用程序都会停止。我想把这一条特别的信息发送给DLQ,然后继续前进。我已经读了几天了 ..

如何避免使用Kafka流丢失消息

我们有一个Streams应用程序,它使用源主题中的消息,执行一些处理并将结果转发到目标主题。 消息的结构由某些Avro架构控制。 当开始使用消息时,如果架构尚未缓存,应用程序将尝试从架构注册表中检索它。如果由于任何原因架构注册表不可用(例如网络故障),则当前正在处理的消息将丢失,因为默认处理程序是名为LogAndContinueExceptionHandler的处理程序。 o.a.k ..
发布时间:2022-04-18 17:03:15 Java开发

无聚合滑动窗口操作在卡夫卡中的应用

我们可以在不聚合的情况下应用Kafka窗口操作吗?我需要过去10分钟内的所有数据(而不是计数)?我所看到的是,大多数示例使用了窗口滑动的聚合。 推荐答案 您可以使用KafkaConsumer#offsetsForTimes方法获取分区到偏移量映射中10分钟前的偏移量。 使用该信息,循环遍历映射并seek您的使用者到这些分区偏移量。 然后开始轮询,直到记录时间戳超出您请求的时 ..
发布时间:2022-04-18 17:00:24 其他开发

Kafka Streams API中的任务有什么用途

我正在尝试了解Kafka Streams API的体系结构,在documentation: 中遇到了这一点 应用程序的处理器拓扑通过将其分解为多个任务来进行扩展 将处理器拓扑分解为任务的所有标准是什么?只是流/主题中的分区数还是更多。 然后任务可以根据分配的分区实例化它们自己的处理器拓扑 有没有人能举个例子解释一下上面的意思?如果创建任务的目的只是为了扩展,它们不应该都具有相 ..
发布时间:2022-04-18 16:53:42 其他开发