spring-kafka相关内容
我正在尝试生成一些消息到Kafka主题,我想自定义Jackson对象映射器以将我的LocalDateTime序列化为这样的字符串2021-07-08T16:43:02Z 但这两个都不 @Configuration public class WebConfiguration { ... @Bean public ObjectMapper objectMappe
..
我正在尝试用Spring Kafka实现一个非阻塞的Retries。根据文档here,在完成在@KafkaListnener中设置的所有尝试之后,我们可以设置一个处理程序方法来处理来自DLT主题的消息。我打算捕获DLT处理程序方法上的一些标头,如以下代码所示: @DltHandler fun processaDlt( @Payload mensagem: String
..
(在开始提问之前,我的英语可能不足以清楚地描述所有的情况。如果您不明白,请告诉我。) 我正在尝试通过Kafka将数据对象从A Spring项目(生产者)发送到B Spring项目(消费者)。 问题是A和B中的数据对象具有不同的类路径。因此,B项目数据类无法映射A项目的字段。 但两个对象具有相同的字段。因此,我希望从A项目获取Object作为B项目的参数。 错误消息 Lis
..
我有一个模板,如下所示: @Autowired private ReplyingKafkaTemplate xxx2ReplyingKafkaTemplate; 我的发送包装方法如下所示: public RequestReplyFuture sen
..
我正在使用Spring-Kafka消费来自Confluent Kafka的消息,我正在使用RetryTopicConfiguration Bean来配置主题和回退策略。我的应用程序运行正常,但我在日志中看到许多类似下面的警告日志,我想知道我的配置是否不正确。 DeadLetterPublishingRecovererFactory$1 : Destination resolver retur
..
我正在尝试为我的卡夫卡消费者写一份健康检查。当应用程序启动并运行时,我关闭了Kafka,我看到了很多 Connection to node 1001 (/127.0.0.1:9092) could not be established. Broker may not be available. 在日志中,但没有ApplicationEvent、调用了错误处理程序,没有任何内容。似乎无法
..
我想看看能否通过Docker容器中的docker-compose连接Spring Cloud Stream Kafka,但是我被卡住了,还没有找到解决方案,请帮帮我。 我正在从Spring Microservices In Action开始工作;我现在找不到任何帮助。 Docker-与卡夫卡和ZooKeeper作曲: version: '2' services: zookee
..
我有一个REST终结点,可以由多个用户同时调用。该REST终结点调用事务性Kafka生成器。 我的理解是,如果我们使用Transaction,我不能同时使用同一个Kafka Producer实例。 如何高效地为每个HTTP请求创建新的Kafka Producer实例? //Kafka Transaction enabled producerProps.put(ProducerConfi
..
背景:我们一直在仅生产者事务中获取ProducerFencedException,并希望为我们的前缀引入唯一性以防止此问题。 在此讨论中,Gary提到,在读-处理-写的情况下,前缀在所有实例中以及在每次重新启动后都必须相同。 How to choose Kafka transaction id for several applications, hosted in Kubernetes?
..
我想知道kafkaTemplate.end(Theme,Key,Message)方法会不会调用提供的自定义分区分区()方法? 推荐答案 好的,KafkaTemplate完全基于ApacheKafka客户端Producer。最终代码如下所示: producer.send(producerRecord, buildCallback(producerRecord, producer,
..
我正在使用spring-kafka并使用Kafka主题中的批量记录,并提交偏移量AbstractMessageListenerContainer.AckMode.BATCH。 在我的例子中,处理批处理记录需要时间(大约20秒),而使用者线程等待批处理过程完成,然后再次执行轮询(在这次轮询时提交偏移量)。在本例中,我将List记录分配给一个线程(名称:ProcessThread),该线程将处理所有
..
在一个Spring Boot应用程序中,我使用一个用@KafkaListener注释的类作为消息监听器。我想向我的应用程序中添加一个Consumer RebalanceLister,以便在重新平衡时管理缓存数据。 如何将Consumer RebalanceListener添加到ConCurentKafkaListenerContainerFactory。documentation表示应该在C
..
我有两个集群设置:站点1的集群1(3个代理)和站点2的集群2(3个代理)。 使用Spring Kafka(1.3.6)Consumer(一台机器),并通过@KafkaListener注释监听消息。 如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory并同时侦听这两个集群的数据。 我的侦听器应该同时使用来自两个群集的消息。 推荐答案 卡夫
..
我正在尝试寻找时间戳功能,但由于某些原因,它不适合我。 在我的Producer中,我有下一个代码: ProducerRecord producer = new ProducerRecord("topic", 0, System.currentTimeMillis() - 10000, "key", obj); kafkaTemplate.send(prod
..
我正在使用Springboot2.3.5.RELEASE和SpringKafka 2.6.3。我正在尝试做一个简单的卡夫卡生产者重试POC,这应该会导致生产者重试时,经纪人关闭或如果有一个异常抛出之前,消息被发送到经纪人。 以下生产者配置用于启用重试的幂等生产者。 // Producer configuration @Bean public Map prod
..
所以我正在将一个微服务从Spring框架迁移到Spring Boot 2.3.10.RELEASE。但当我尝试创建集成测试并启动它时,出现了一个隐晦的错误: Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.kafka.core
..
我正在使用Spring-Kafka和Boot进行一个项目,我想在应用程序中获取spring.kafka.Consumer er.Client-ID属性的主机名,以便在服务器端日志中区分我的每个消费者,如果出现问题。 我有办法做到这一点吗?我查看了Spring Boot参考指南和java.lang.System类,但没有找到有用的指针。 推荐答案 有一种方法--将Environme
..
我正在尝试Reactive-Kafka消费消息。其他一切都很好,但我想为失败的消息添加一个重试(2)。默认情况下,Spring-Kafka已经重试失败记录3次,我想使用Reactive-Kafka实现相同的记录。 我正在使用Spring-Kafka作为Active-Kafka的包装器。以下是我的消费者模板: reactiveKafkaConsumerTemplate
..
我正在使用Kafka_2.11-2.1.1 以及使用Spring2.1.0.RELEASE的制片人。 我在向Kafka主题发送消息时使用了Spring,我的制作人生成了很多TimeoutExceptions org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND-
..
我只是想检查这是已知行为还是我做错了什么。 I使用JsonDeserializer使用自定义类型映射配置生产者和消费者。 消费者失败, org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition ticket-1 at offset 1. If
..