apache-kafka相关内容
我有一个问题,我让一个阿帕奇Kafka消费者在Spring Boot中消费了3个不同的主题。但我需要先使用第一个主题中的所有数据,然后再使用以下主题中的数据,有什么方法可以做到这一点吗?或者你会一直以同样的方式阅读它们吗? @Component public class KafkaTestListener { @KafkaListener(topics = "${message.topic
..
我刚接触Kafka,并且使用@KafkaListener(Spring)来定义Kafka消费者。 我想检查是否可以在运行时手动将分区分配给使用者。 例如,当应用程序启动时,我不想使用任何数据。我目前正在使用@KafkaListener(autoStartup=false ... )用于该目的。 在某个时刻,我应该(从应用程序的另一部分)收到包含要处理的分区ID的通知,因此我希望跳过该分区的
..
我正在使用Spring Kafka实现同步请求回复模式。 堆栈: org.springframework.cloud:spring-cloud-dependencies:2020.0.2 org.springfrawork.kafka:Spring-kafka io.confluent:kafka-avro-serializer:6.2.0 Java 11 我有一个包
..
(在开始提问之前,我的英语可能不足以清楚地描述所有的情况。如果您不明白,请告诉我。) 我正在尝试通过Kafka将数据对象从A Spring项目(生产者)发送到B Spring项目(消费者)。 问题是A和B中的数据对象具有不同的类路径。因此,B项目数据类无法映射A项目的字段。 但两个对象具有相同的字段。因此,我希望从A项目获取Object作为B项目的参数。 错误消息 Lis
..
我有一个模板,如下所示: @Autowired private ReplyingKafkaTemplate xxx2ReplyingKafkaTemplate; 我的发送包装方法如下所示: public RequestReplyFuture sen
..
我想看看能否通过Docker容器中的docker-compose连接Spring Cloud Stream Kafka,但是我被卡住了,还没有找到解决方案,请帮帮我。 我正在从Spring Microservices In Action开始工作;我现在找不到任何帮助。 Docker-与卡夫卡和ZooKeeper作曲: version: '2' services: zookee
..
我有一个REST终结点,可以由多个用户同时调用。该REST终结点调用事务性Kafka生成器。 我的理解是,如果我们使用Transaction,我不能同时使用同一个Kafka Producer实例。 如何高效地为每个HTTP请求创建新的Kafka Producer实例? //Kafka Transaction enabled producerProps.put(ProducerConfi
..
我正在尝试寻找时间戳功能,但由于某些原因,它不适合我。 在我的Producer中,我有下一个代码: ProducerRecord producer = new ProducerRecord("topic", 0, System.currentTimeMillis() - 10000, "key", obj); kafkaTemplate.send(prod
..
我正在使用Kafka_2.11-2.1.1 以及使用Spring2.1.0.RELEASE的制片人。 我在向Kafka主题发送消息时使用了Spring,我的制作人生成了很多TimeoutExceptions org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND-
..
我正在使用spring-Kafka-2.1.5和spring-boot-2.0.5处理微服务 第一个服务将向Kafka生成一些消息,第二个服务将消费这些消息,而消费时我遇到了问题 Caused by: java.lang.IllegalArgumentException: The class 'com.service1.model.TopicMessage' is not in the
..
我有一个新的Spring Boot 2.6.3 Java 11应用程序,它具有Spring Kafka依赖项(使用start.spring.io生成)。 默认使用Kafka 3.0.0。我想将卡夫卡版本改为3.1.0并添加 3.1.0 到属性部分。大多数Kafka库现在在版本3.1.0中都已解决,但不是所有(最重要的是,
..
在我工作的公司,我们在没有身份验证的情况下使用Spring for Kafka,最近我们做了一些实验来设置Kafka中的安全性,并且我们在短时间内启用了身份验证,这导致了我们微服务中所有消费者/生产者的崩溃!(微服务保持正常) 异常: Authorization Exception and no authorizationExceptionRetryInterval set org.
..
我正在尝试使用Kafka-avro-控制台-Producer 5.4.0-ccs,而不自动注册模式。我尝试使用: --producer-property auto.register.schemas=false 和 --property auto.register.schemas=false 但无论如何它都在注册架构。该属性似乎正确:https://github.com/co
..
我正在尝试使用Go中的/linkedIn/goavro包将Avro编码的数据生成Kafka主题。目标是能够使用不同的客户端使用主题。 首先,我按如下方式注册架构: curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{"name":"test_topic2"
..
Kafka模式注册中心提供了一种很好的方式来使用公共数据契约序列化和反序列化Kafka中的数据。但是,数据约定(.avsc文件)是生产者和消费者之间的粘合剂。 一旦生产者生成了.avsc文件,就可以将其签入到生产者端的版本控制中。根据语言的不同,它还会自动生成类。 但是, 使用者获取架构定义以供参考的最佳机制是什么?有没有类似swaggerHub或Avro的典型API文档门户? 如
..
我使用的是Spring Cloud Stream和Kafka Stream。假设我有一个处理器,它是一个将字符串的KStream转换为CityProgrammes的KStream的函数。它调用一个API来按名称查找City,并调用另一个转换来查找该城市附近的任何事件。 现在的问题是转换过程中发生任何错误,整个应用程序都会停止。我想把这一条特别的信息发送给DLQ,然后继续前进。我已经读了几天了
..
我们有一个Streams应用程序,它使用源主题中的消息,执行一些处理并将结果转发到目标主题。 消息的结构由某些Avro架构控制。 当开始使用消息时,如果架构尚未缓存,应用程序将尝试从架构注册表中检索它。如果由于任何原因架构注册表不可用(例如网络故障),则当前正在处理的消息将丢失,因为默认处理程序是名为LogAndContinueExceptionHandler的处理程序。 o.a.k
..
我们可以在不聚合的情况下应用Kafka窗口操作吗?我需要过去10分钟内的所有数据(而不是计数)?我所看到的是,大多数示例使用了窗口滑动的聚合。 推荐答案 您可以使用KafkaConsumer#offsetsForTimes方法获取分区到偏移量映射中10分钟前的偏移量。 使用该信息,循环遍历映射并seek您的使用者到这些分区偏移量。 然后开始轮询,直到记录时间戳超出您请求的时
..
我正在寻找使用模式注册中心的kafka-stream。我有谷歌,但找不到合适的教程。 推荐答案 文档在此处 https://docs.confluent.io/current/streams/developer-guide/datatypes.html#avro 这是依赖项 io.confluent
..
我正在尝试了解Kafka Streams API的体系结构,在documentation: 中遇到了这一点 应用程序的处理器拓扑通过将其分解为多个任务来进行扩展 将处理器拓扑分解为任务的所有标准是什么?只是流/主题中的分区数还是更多。 然后任务可以根据分配的分区实例化它们自己的处理器拓扑 有没有人能举个例子解释一下上面的意思?如果创建任务的目的只是为了扩展,它们不应该都具有相
..