spring-kafka相关内容
我想从一条消息(来自同一主题)中反序列化不同的对象,并根据对象类型将其保存/更新到数据库中的相应表.正如加里提到的这里我可以使用标题来提供我的对象必须反序列化为哪种类型的信息.您能否提供有关如何实现这一目标的示例? 解决方案 参见 sample-02
..
在我的 spring kafka 应用程序中,我想根据某个调度程序的输入在运行时触发消费者.调度程序将告诉侦听器它可以从哪个主题开始消费消息.有带有自定义 ConcurrentKafkaListenerContainerFactory 类的 springboot 应用程序.我需要执行三项任务: 关闭容器,在成功读取主题上的所有可用消息后. 它将在数据库或文件系统中存储当前偏移量. 下次消
..
我只是好奇 Spring Kafka 中的批处理侦听器模式是否比非批处理侦听器模式提供更好的性能?如果我们正在处理异常,那么我们仍然需要在 Batch-listener 模式下处理每条记录.非批处理似乎不易出错、稳定且可定制. 请分享您对此的看法,因为我没有找到任何好的比较. 解决方案 这完全取决于你的听众对数据做了什么. 如果它循环处理每条记录,则没有任何好处;您也可以让容器
..
我们有一个问题,即当 Kafka 代理必须离线时,没有任何消费者服务对此有任何了解并继续运行. 我们尝试在新的 Kafka 实例中列出消费者,但没有看到那里列出现有消费者.列出的所有消费者都是新创建的消费者. 每次遇到此问题时,我们都必须手动终止所有现有的消费者服务,这很不方便. 问题 - 消费者如何知道它不再列在 Kafka 集群中,因此它应该自行终止? 附言我们使用 S
..
我有一个要求,我必须对 Kafka 使用同步请求-回复模式,因此我使用 ReplyingKafkaTemplate. 作为实现的一部分,有一个生产者正在推送关于一个主题的请求消息(input-message-topic1),但作为回报,我期待来自两个主题的响应(output-message-topic1 和 output-message-topic2),我必须进一步汇总和处理. 问题:
..
我正在尝试从单个生产者发布两个不同主题的消息. 这里我创建了两个主题: @Bean公共新话题 multi1() {返回 TopicBuilder.name("multi1").partitions(1).build();}@豆角,扁豆公共新话题 multi2() {返回 TopicBuilder.name("multi2").partitions(1).build();} 这是我向两个
..
在我的 spirng 启动项目中,我有一个模型类 (FunctionModel): public class FunctionModel {受保护的整数 ID;受保护的字符串名称;@ApiModelProperty(value = "函数标识符")公共整数 getId() {返回标识;}公共无效setId(整数ID){this.id = id;}@ApiModelProperty(value =
..
我有一个 springboot 应用程序,它可以监听 kafka 消息并将它们转换为对象 @KafkaListener(topics = "test", groupId = "group_id")公共无效消费(字符串消息)抛出 IOException {ObjectMapper objectMapper = new ObjectMapper();Hostel hostel = objectMap
..
我在 Rest 控制器中使用 ReplyingKafkaTemplate 来返回同步响应.我也在设置标题 REPLY_TOPIC.对于监听器微服务部分, @KafkaListener(topics = "${kafka.topic.request-topic}")@发给公共模型监听(模型 请求)抛出 InterruptedException {SumModel 模型 = request.getR
..
我正在尝试从单个分区主题安排我的消费过程.我可以使用 endpointlistenerregistry.start() 启动它,但我想在消耗了当前分区中的所有消息后停止它,即当我到达当前分区中的最后一个偏移量时.在我完成消费并关闭它之后,生产进入主题.我应该如何确保我已经阅读了所有消息,直到我启动调度程序并停止我的消费者?我正在为消费者使用 @Kafkalistener. 解决方案 设置
..
我正在 Spring Kafka 中的 Listener 上进行反序列化.但这假设类型信息是由 Spring Kafka 生产者包含或发送的.在我的例子中,Json 是由 Debezium MySQLConnector 发送的,它没有添加这个元数据.所以我想把它添加到请求中.我了解它放置在 JsonSerializer 中某个位置的请求中,并且我查看了源代码,但无法弄清楚如何在序列化期间将元数据类
..
我正在使用 websocket 协议和 STOMP 作为消息协议为网络平台开发聊天模块. 这是我第一次使用任何消息代理,而 Kafka 是在公司(我正在为其工作的)网络平台上使用的,我猜是用于其他模块.之前我刚开始使用 RabbitMQ,现在我必须切换到 Kafka.我在RabbitMQ的网站上看到一整篇关于如何使用STOMP的文章,但是在Kafka的官网上却没有. 但我已经探索了其他
..
我有一个应用程序,它使用 Kafka 在实例之间同步数据,因此它从 Kafka 生成和使用数据,此外,该应用程序正在使用 Kafka 主题并将该数据转换并流式传输到另一个主题供客户端使用. 我的应用程序有两个用于故障转移的集群.通过 Kafka 文档,我发现了这个https://docs.spring.io/spring-kafka/docs/current/reference/html/#
..
我正在尝试构建一个简单的 Spring Boot Kafka Consumer 来使用来自 kafka 主题的消息,但是没有消息被使用,因为 KafkaListener 方法没有被触发. 我在其他答案中看到,以确保 AUTO_OFFSET_RESET_CONFIG 设置为“最早",并且 GROUP_ID_CONFIG 是唯一的,我这样做了,但是仍然没有触发 KafkaListenerMeth
..
我们希望以这样的方式配置 Spring Kafka 侦听器,如果任何外部服务关闭,我们不希望丢失从 Kafka 消费的消息.我们希望将其还原,直到它被成功处理. 能否请您帮助我进行配置以实现相同的目的. 批量消费消息如何处理. 我们使用的是 Kafka 0.9 解决方案 我认为 重试最符合您的要求: 为了重试传递,提供了方便的侦听器适配器 - RetryingMes
..
我正在使用 spring-kafka 2.3.6.RELEASE 库来消费许多 kafka 主题.在界面允许的情况下,我正在使用单个侦听器来使用所有主题.我已经注意到,如果单个主题无法授权,KafkaConsumer 实例将挂起其线程,停止所有主题的消费,直到重试间隔结束. 这对我来说似乎很脆弱,特别是对于专注于弹性和并行性的技术. 有没有其他人观察到这种行为,如果有,除了编写我们自己
..
我正在尝试从单个分区主题安排我的消费过程.我可以使用 endpointlistenerregistry.start() 启动它,但我想在消耗了当前分区中的所有消息后停止它,即当我到达当前分区中的最后一个偏移量时.在我完成消费并关闭它之后,生产进入主题.我应该如何确保我已经阅读了所有消息,直到我启动调度程序并停止我的消费者?我正在为消费者使用 @Kafkalistener. 解决方案 设置
..
我有一个具有函数式编程风格的 Spring Cloud Kafka Streams 的工作设置.有两个用例,通过 application.properties 进行配置.它们都单独工作,但是一旦我同时激活它们,我就会收到第二个用例的输出流的序列化错误: 线程异常“ActivitiesAppId-05296224-5ea1-412a-aee4-1165870b5c75-StreamThread-1
..
正如我所见,Kafka 模板在内部使用了 Kafka 生产者.我只想知道确切的区别是什么.此外,与 Kafka 生产者相比,我发现 Kafka 模板中有许多可用的 send() 方法. 请帮我解决.如果有人知道更多. 解决方案 生产者是模式,而 KafkaTemplate 包装了一个生产者实例,并提供了向 Kafka 主题发送消息的便捷方法.(来源) Kafka Producer
..
我见过一些例子,我们有一个 java 配置类,我们定义了多个 KafkaListenerContainer 并将所需的 containerType 传递给 @kafkaListener.但我正在探索是否有任何方法可以通过 appication.yml/properties 使用 Spring Boot 自动 Kafka 配置来实现相同的目标. 解决方案 否;Boot 只会自动配置一组基础设
..