kafka-producer-api相关内容

我们可以在春靴中使用多个卡夫卡模板吗?

在我的Spring Boot Kafka发布应用程序中,我希望提供对以字符串(Json)或字节格式发布消息的支持,因为我希望同时支持json和avro。但是春装中的卡夫卡模板让我们只能定义其中的一个模板。有没有办法同时使用两个模板或任何其他方式来同时支持JSON和Avro? KafkaTemplate只适用于字符串,但我也想发布Avro,它应该类似于Kafka ..
发布时间:2022-07-18 17:13:45 其他开发

阿帕奇-卡夫卡有1亿个主题

我正在尝试用ApacheKafka替换兔子MQ,在规划时,我遇到了几个概念性规划问题。 首先,我们对每用户队列策略使用Rabb MQ,这意味着每个用户使用一个队列。这符合我们的需要,因为每个用户代表要与该特定用户一起完成的一些工作,并且如果该用户导致问题,则队列对于其他用户永远不会有问题,因为队列是分开的(问题意味着队列中的消息将使用http请求被分派给用户。如果用户拒绝接收消息(服务器可能会关 ..
发布时间:2022-07-16 19:53:48 其他开发

每个Http请求有多个Kafka生产者实例

我有一个REST终结点,可以由多个用户同时调用。该REST终结点调用事务性Kafka生成器。 我的理解是,如果我们使用Transaction,我不能同时使用同一个Kafka Producer实例。 如何高效地为每个HTTP请求创建新的Kafka Producer实例? //Kafka Transaction enabled producerProps.put(ProducerConfi ..

无法从Windows生成运行在WSL 2上的卡夫卡主题

我在Ubuntu WSL2上成功运行了最新的Kafka。我可以在我在WSL上运行的Ubuntu中启动ZooKeeper、Kafka服务器、创建主题、控制台生产和控制台消费。但是,当我进入Windows上的IntelliJ并创建一个简单的Java生成器时,它似乎无法连接到代理 版本和主机名 Java version: 1.8 Kafka Version: 2.6 ..

为什么骆驼卡夫卡生产者很慢?

我正在使用 apache camel kafka 作为客户端来生成消息,我观察到 kafka 生产者需要 1 毫秒来推送消息,如果我使用骆驼聚合将消息合并到批处理中,那么推送单个消息需要 100 毫秒. 安装简述3 kafka 集群 16Core 32GB RAM 示例代码 String endpoint="kafka:test?topic=test&brokers=nodekfa ..
发布时间:2022-01-19 08:52:11 其他开发

Kafka最优保留和删除策略

我对 kafka 相当陌生,所以如果这个问题微不足道,请原谅我.为了计时测试,我有一个非常简单的设置,如下所示: 机器 A -> 写入主题 1 (Broker) -> 机器 B 从主题 1 读取机器 B -> 将刚刚读取的消息写入主题 2 (Broker) -> 机器 A 从主题 2 中读取 现在我在无限循环中发送大约 1400 字节的消息,很快填满了我的小代理上的空间.我正在尝试为 ..

消费者和生产者因错误而失败:“在读取响应之前断开了与 0 的连接"

我有一个由 3 个 Kafka 代理组成的集群,所有主题的复制因子为 3.由于最近几天我面临这个问题,突然(一天几次)消费者和生产者在得到响应时卡住了,即使 Kafka 在所有 3 个服务器上运行,直到我检查代理日志(“连接到 0 已断开连接在响应被读取之前")并找出罪魁祸首节点为 0(在本例中为第一个节点)并在该节点上重新启动 zookeeper 和 broker. 根据日志,这是由于重新 ..

使用 __consumer_offsets 杀死节点导致消费者没有消息消费

我有 3 个节点(nodes0、node1、node2)Kafka 集群(broker0、broker1、broker2),复制因子为 2,Zookeeper(使用与 Kafka tar 一起打包的 zookeeper)在不同的节点(节点 4)上运行. 我在启动 zookper 和剩余节点后启动了代理 0.在 broker 0 日志中可以看到它正在读取 __consumer_offsets ..

Kafka:如何连接 kafka-console-consumer 以获取远程代理主题内容?

我在 ec2 上的一台机器上设置了一个 kafka zookeeper 和 3 个代理,端口为 9092..9094,我正在尝试使用另一台机器上的主题内容.端口 2181 (zk)、9092、9093 和 9094(服务器)对消费者机器开放.我什至可以做一个 bin/kafka-topics.sh --describe --zookeeper 172.X.X.X:2181 --topic remo ..

如果我在 Kafka 中有事务性生产者,我可以使用 Kafka Streams 读取一次消息吗?

我想拥有 Exactly-once 语义,但我不想使用 Consumer 读取消息.我宁愿使用 Kafka Streams AP 阅读消息.如果我将 processing.guarantee=exactly_once 添加到 Stream 配置中,是否会保留恰好一次语义? 解决方案 Exactly-once 处理基于读-处理-写模式.Kafka Streams 使用这种模式,因此,如果您编 ..

KafkaProducer sendOffsetsToTransaction 需要 offset+1 才能成功提交当前偏移量

我正在尝试在 Kafka Processor 中实现一个事务,以确保我不会两次重新处理相同的消息.给定一条消息 (A),我需要创建一个将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息 (A).从文档中我发现了 Producer 方法 sendOffsetsToTransaction 似乎只有在成功时才能在事务中提交偏移量.这是我的 Processor 的 process() ..

Kafka Stream 在重新平衡时重新处理旧消息

我有一个 Kafka Streams 应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题. 这是我的Kafka集群的配置: 5 个 Kafka 经纪人Kafka 主题 - 15 个分区和复制因子 3. 我的 Kafka Streams 应用程序与我的 Kafka 代理运行在同一台机器上. 每小时消耗/产生几百万条记录.每当我关闭代理时,应用程序都会进入重新平衡状态,并且在 ..

我可以在与 Kafka Broker 相同的机器上运行 Kafka Streams 应用程序吗?

我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它 ..

Kafka Streams:如何更改记录时间戳(0.11.0)?

我正在使用 FluentD(v.12 最后一个稳定版本)向 Kafka 发送消息.但是 FluentD 使用的是旧的 KafkaProducer,因此记录时间戳始终设置为 -1.因此,我必须使用 WallclockTimestampExtractor 将记录的时间戳设置为消息到达 kafka 的时间点. 我真正感兴趣的时间戳是由 fluentd 在消息中发送的: "timestamp" ..

Kafka Streams 在生成主题时不会将偏移量增加 1

我已经实现了一个简单的 Kafka 死信记录处理器. 当使用控制台生产者产生的记录时,它工作得很好. 但是,我发现我们的 Kafka Streams 应用程序并不能保证向接收器主题生成记录,即每生成一条记录,偏移量就会增加 1. 死信处理器背景: 我有一个场景,在发布处理记录所需的所有数据之前,可能会收到记录.当流应用程序处理的记录不匹配时,它们将移动到死信主题,而不是继续 ..

Kafka多分区排序

我知道无法在 Kafka 中对多个分区进行排序,并且分区排序仅对组内的单个使用者(对于单个分区)有保证.但是,使用 Kafka Streams 0.10 现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每条消息都保持顺序,那么在消费者方面,可以说使用 Kafka Streams 0.10 现在有可能吗?假设我们收到所有消息,我们是否可以不根据消费的时间戳对所有分区进行排序,然后将它 ..