spring-kafka相关内容

自定义对象映射器的问题

我正在尝试生成一些消息到Kafka主题,我想自定义Jackson对象映射器以将我的LocalDateTime序列化为这样的字符串2021-07-08T16:43:02Z 但这两个都不 @Configuration public class WebConfiguration { ... @Bean public ObjectMapper objectMappe ..
发布时间:2022-05-06 18:20:01 Java开发

Spring Kafka@DltHandler注释方法在非阻塞重试实现中未正确接收标头

我正在尝试用Spring Kafka实现一个非阻塞的Retries。根据文档here,在完成在@KafkaListnener中设置的所有尝试之后,我们可以设置一个处理程序方法来处理来自DLT主题的消息。我打算捕获DLT处理程序方法上的一些标头,如以下代码所示: @DltHandler fun processaDlt( @Payload mensagem: String ..
发布时间:2022-05-06 18:08:11 其他开发

Kafka消费者ClassNotFoundException异常

(在开始提问之前,我的英语可能不足以清楚地描述所有的情况。如果您不明白,请告诉我。) 我正在尝试通过Kafka将数据对象从A Spring项目(生产者)发送到B Spring项目(消费者)。 问题是A和B中的数据对象具有不同的类路径。因此,B项目数据类无法映射A项目的字段。 但两个对象具有相同的字段。因此,我希望从A项目获取Object作为B项目的参数。 错误消息 Lis ..
发布时间:2022-05-06 17:57:43 Java开发

目标解析程序返回不存在的分区

我正在使用Spring-Kafka消费来自Confluent Kafka的消息,我正在使用RetryTopicConfiguration Bean来配置主题和回退策略。我的应用程序运行正常,但我在日志中看到许多类似下面的警告日志,我想知道我的配置是否不正确。 DeadLetterPublishingRecovererFactory$1 : Destination resolver retur ..
发布时间:2022-05-06 17:36:11 其他开发

在Spring Kafka中检测到Broker断开连接

我正在尝试为我的卡夫卡消费者写一份健康检查。当应用程序启动并运行时,我关闭了Kafka,我看到了很多 Connection to node 1001 (/127.0.0.1:9092) could not be established. Broker may not be available. 在日志中,但没有ApplicationEvent、调用了错误处理程序,没有任何内容。似乎无法 ..
发布时间:2022-05-06 17:19:09 其他开发

每个Http请求有多个Kafka生产者实例

我有一个REST终结点,可以由多个用户同时调用。该REST终结点调用事务性Kafka生成器。 我的理解是,如果我们使用Transaction,我不能同时使用同一个Kafka Producer实例。 如何高效地为每个HTTP请求创建新的Kafka Producer实例? //Kafka Transaction enabled producerProps.put(ProducerConfi ..

Producer-Only和Read-Process-Write-ProducerFencedException的TransactionID前缀

背景:我们一直在仅生产者事务中获取ProducerFencedException,并希望为我们的前缀引入唯一性以防止此问题。 在此讨论中,Gary提到,在读-处理-写的情况下,前缀在所有实例中以及在每次重新启动后都必须相同。 How to choose Kafka transaction id for several applications, hosted in Kubernetes? ..
发布时间:2022-05-06 16:52:33 其他开发

批量记录处理后如何提交卡夫卡抵销

我正在使用spring-kafka并使用Kafka主题中的批量记录,并提交偏移量AbstractMessageListenerContainer.AckMode.BATCH。 在我的例子中,处理批处理记录需要时间(大约20秒),而使用者线程等待批处理过程完成,然后再次执行轮询(在这次轮询时提交偏移量)。在本例中,我将List记录分配给一个线程(名称:ProcessThread),该线程将处理所有 ..
发布时间:2022-05-06 16:29:59 Java开发

春季卡夫卡消费者2个卡夫卡集群

我有两个集群设置:站点1的集群1(3个代理)和站点2的集群2(3个代理)。 使用Spring Kafka(1.3.6)Consumer(一台机器),并通过@KafkaListener注释监听消息。 如何为每个集群(c1和c2)实例化多个KafkaListenerContainerFactory并同时侦听这两个集群的数据。 我的侦听器应该同时使用来自两个群集的消息。 推荐答案 卡夫 ..
发布时间:2022-05-06 16:14:54 其他开发

寻找春天的时间戳-卡夫卡

我正在尝试寻找时间戳功能,但由于某些原因,它不适合我。 在我的Producer中,我有下一个代码: ProducerRecord producer = new ProducerRecord("topic", 0, System.currentTimeMillis() - 10000, "key", obj); kafkaTemplate.send(prod ..
发布时间:2022-05-06 16:03:44 Java开发

春季-卡夫卡制片人在所有经纪人停机时重试

我正在使用Springboot2.3.5.RELEASE和SpringKafka 2.6.3。我正在尝试做一个简单的卡夫卡生产者重试POC,这应该会导致生产者重试时,经纪人关闭或如果有一个异常抛出之前,消息被发送到经纪人。 以下生产者配置用于启用重试的幂等生产者。 // Producer configuration @Bean public Map prod ..
发布时间:2022-05-06 15:55:35 其他开发

Spring Boot-在应用程序.Properties中获取Spring的主机名-Kafka客户端ID

我正在使用Spring-Kafka和Boot进行一个项目,我想在应用程序中获取spring.kafka.Consumer er.Client-ID属性的主机名,以便在服务器端日志中区分我的每个消费者,如果出现问题。 我有办法做到这一点吗?我查看了Spring Boot参考指南和java.lang.System类,但没有找到有用的指针。 推荐答案 有一种方法--将Environme ..
发布时间:2022-05-06 15:39:19 其他开发

如何重试反应堆中出现故障的耗材记录-卡夫卡

我正在尝试Reactive-Kafka消费消息。其他一切都很好,但我想为失败的消息添加一个重试(2)。默认情况下,Spring-Kafka已经重试失败记录3次,我想使用Reactive-Kafka实现相同的记录。 我正在使用Spring-Kafka作为Active-Kafka的包装器。以下是我的消费者模板: reactiveKafkaConsumerTemplate ..
发布时间:2022-05-06 15:27:38 其他开发