kafka-producer-api相关内容
在我的Spring Boot Kafka发布应用程序中,我希望提供对以字符串(Json)或字节格式发布消息的支持,因为我希望同时支持json和avro。但是春装中的卡夫卡模板让我们只能定义其中的一个模板。有没有办法同时使用两个模板或任何其他方式来同时支持JSON和Avro? KafkaTemplate只适用于字符串,但我也想发布Avro,它应该类似于Kafka
..
我正在尝试用ApacheKafka替换兔子MQ,在规划时,我遇到了几个概念性规划问题。 首先,我们对每用户队列策略使用Rabb MQ,这意味着每个用户使用一个队列。这符合我们的需要,因为每个用户代表要与该特定用户一起完成的一些工作,并且如果该用户导致问题,则队列对于其他用户永远不会有问题,因为队列是分开的(问题意味着队列中的消息将使用http请求被分派给用户。如果用户拒绝接收消息(服务器可能会关
..
我正尝试在批量模式下使用具有以下属性的Kafka Connect JDBC源连接器。 connector.class=io.confluent.connect.jdbc.JdbcSourceConnector timestamp.column.name=timestamp connection.password=XXXXX validate.non.null=false tasks.max=
..
我有一个REST终结点,可以由多个用户同时调用。该REST终结点调用事务性Kafka生成器。 我的理解是,如果我们使用Transaction,我不能同时使用同一个Kafka Producer实例。 如何高效地为每个HTTP请求创建新的Kafka Producer实例? //Kafka Transaction enabled producerProps.put(ProducerConfi
..
我正在使用Kafka_2.11-2.1.1 以及使用Spring2.1.0.RELEASE的制片人。 我在向Kafka主题发送消息时使用了Spring,我的制作人生成了很多TimeoutExceptions org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND-
..
我在Ubuntu WSL2上成功运行了最新的Kafka。我可以在我在WSL上运行的Ubuntu中启动ZooKeeper、Kafka服务器、创建主题、控制台生产和控制台消费。但是,当我进入Windows上的IntelliJ并创建一个简单的Java生成器时,它似乎无法连接到代理 版本和主机名 Java version: 1.8 Kafka Version: 2.6
..
我正在使用 apache camel kafka 作为客户端来生成消息,我观察到 kafka 生产者需要 1 毫秒来推送消息,如果我使用骆驼聚合将消息合并到批处理中,那么推送单个消息需要 100 毫秒. 安装简述3 kafka 集群 16Core 32GB RAM 示例代码 String endpoint="kafka:test?topic=test&brokers=nodekfa
..
我对 kafka 相当陌生,所以如果这个问题微不足道,请原谅我.为了计时测试,我有一个非常简单的设置,如下所示: 机器 A -> 写入主题 1 (Broker) -> 机器 B 从主题 1 读取机器 B -> 将刚刚读取的消息写入主题 2 (Broker) -> 机器 A 从主题 2 中读取 现在我在无限循环中发送大约 1400 字节的消息,很快填满了我的小代理上的空间.我正在尝试为
..
我有以下 kafka 制作人 Api 程序,我是 kafka 本身的新手.下面的代码从 API 之一获取数据并将消息发送到 kafka 主题. package kafka_Demo;导入 java.util.Properties;导入 java.io.BufferedReader;导入 java.io.InputStream;导入 java.io.InputStreamReader;导入 org
..
我有一个由 3 个 Kafka 代理组成的集群,所有主题的复制因子为 3.由于最近几天我面临这个问题,突然(一天几次)消费者和生产者在得到响应时卡住了,即使 Kafka 在所有 3 个服务器上运行,直到我检查代理日志(“连接到 0 已断开连接在响应被读取之前")并找出罪魁祸首节点为 0(在本例中为第一个节点)并在该节点上重新启动 zookeeper 和 broker. 根据日志,这是由于重新
..
我有 3 个节点(nodes0、node1、node2)Kafka 集群(broker0、broker1、broker2),复制因子为 2,Zookeeper(使用与 Kafka tar 一起打包的 zookeeper)在不同的节点(节点 4)上运行. 我在启动 zookper 和剩余节点后启动了代理 0.在 broker 0 日志中可以看到它正在读取 __consumer_offsets
..
我在 ec2 上的一台机器上设置了一个 kafka zookeeper 和 3 个代理,端口为 9092..9094,我正在尝试使用另一台机器上的主题内容.端口 2181 (zk)、9092、9093 和 9094(服务器)对消费者机器开放.我什至可以做一个 bin/kafka-topics.sh --describe --zookeeper 172.X.X.X:2181 --topic remo
..
我想拥有 Exactly-once 语义,但我不想使用 Consumer 读取消息.我宁愿使用 Kafka Streams AP 阅读消息.如果我将 processing.guarantee=exactly_once 添加到 Stream 配置中,是否会保留恰好一次语义? 解决方案 Exactly-once 处理基于读-处理-写模式.Kafka Streams 使用这种模式,因此,如果您编
..
我正在尝试在 Kafka Processor 中实现一个事务,以确保我不会两次重新处理相同的消息.给定一条消息 (A),我需要创建一个将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息 (A).从文档中我发现了 Producer 方法 sendOffsetsToTransaction 似乎只有在成功时才能在事务中提交偏移量.这是我的 Processor 的 process()
..
所以我正在尝试使用 Kafka 流进行交互式查询.我有 Zookeeper 和 Kafka 在本地运行(在 Windows 上).我使用 C:\temp 作为存储文件夹,用于 Zookeeper 和 Kafka. 我已经设置了这样的主题 kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 -
..
我有一个 Kafka Streams 应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题. 这是我的Kafka集群的配置: 5 个 Kafka 经纪人Kafka 主题 - 15 个分区和复制因子 3. 我的 Kafka Streams 应用程序与我的 Kafka 代理运行在同一台机器上. 每小时消耗/产生几百万条记录.每当我关闭代理时,应用程序都会进入重新平衡状态,并且在
..
我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它
..
我正在使用 FluentD(v.12 最后一个稳定版本)向 Kafka 发送消息.但是 FluentD 使用的是旧的 KafkaProducer,因此记录时间戳始终设置为 -1.因此,我必须使用 WallclockTimestampExtractor 将记录的时间戳设置为消息到达 kafka 的时间点. 我真正感兴趣的时间戳是由 fluentd 在消息中发送的: "timestamp"
..
我已经实现了一个简单的 Kafka 死信记录处理器. 当使用控制台生产者产生的记录时,它工作得很好. 但是,我发现我们的 Kafka Streams 应用程序并不能保证向接收器主题生成记录,即每生成一条记录,偏移量就会增加 1. 死信处理器背景: 我有一个场景,在发布处理记录所需的所有数据之前,可能会收到记录.当流应用程序处理的记录不匹配时,它们将移动到死信主题,而不是继续
..
我知道无法在 Kafka 中对多个分区进行排序,并且分区排序仅对组内的单个使用者(对于单个分区)有保证.但是,使用 Kafka Streams 0.10 现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每条消息都保持顺序,那么在消费者方面,可以说使用 Kafka Streams 0.10 现在有可能吗?假设我们收到所有消息,我们是否可以不根据消费的时间戳对所有分区进行排序,然后将它
..