kafka-producer-api相关内容

消费者和生产者因错误而失败:“在读取响应之前断开了与 0 的连接"

我有一个由 3 个 Kafka 代理组成的集群,所有主题的复制因子为 3.由于最近几天我面临这个问题,突然(一天几次)消费者和生产者在得到响应时卡住了,即使 Kafka 在所有 3 个服务器上运行,直到我检查代理日志(“连接到 0 已断开连接在响应被读取之前")并找出罪魁祸首节点为 0(在本例中为第一个节点)并在该节点上重新启动 zookeeper 和 broker. 根据日志,这是由于重新 ..
发布时间:2021-11-15 00:03:59 其他开发

使用 __consumer_offsets 杀死节点导致消费者没有消息消费

我有 3 个节点(nodes0、node1、node2)Kafka 集群(broker0、broker1、broker2),复制因子为 2,Zookeeper(使用与 Kafka tar 一起打包的 zookeeper)在不同的节点(节点 4)上运行. 我在启动 zookper 和剩余节点后启动了代理 0.在 broker 0 日志中可以看到它正在读取 __consumer_offsets ..
发布时间:2021-11-15 00:00:18 其他开发

Kafka:如何连接 kafka-console-consumer 以获取远程代理主题内容?

我在 ec2 上的一台机器上设置了一个 kafka zookeeper 和 3 个代理,端口为 9092..9094,我正在尝试使用另一台机器上的主题内容.端口 2181 (zk)、9092、9093 和 9094(服务器)对消费者机器开放.我什至可以做一个 bin/kafka-topics.sh --describe --zookeeper 172.X.X.X:2181 --topic remo ..
发布时间:2021-11-14 23:55:35 其他开发

KafkaProducer sendOffsetsToTransaction 需要 offset+1 才能成功提交当前偏移量

我正在尝试在 Kafka Processor 中实现一个事务,以确保我不会两次重新处理相同的消息.给定一条消息 (A),我需要创建一个将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息 (A).从文档中我发现了 Producer 方法 sendOffsetsToTransaction 似乎只有在成功时才能在事务中提交偏移量.这是我的 Processor 的 process() ..
发布时间:2021-11-12 03:43:04 其他开发

Kafkastreams.allMetadata() 方法返回空列表

所以我正在尝试使用 Kafka 流进行交互式查询.我有 Zookeeper 和 Kafka 在本地运行(在 Windows 上).我使用 C:\temp 作为存储文件夹,用于 Zookeeper 和 Kafka. 我已经设置了这样的主题 kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 - ..
发布时间:2021-11-12 03:40:47 其他开发

Kafka Stream 在重新平衡时重新处理旧消息

我有一个 Kafka Streams 应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题. 这是我的Kafka集群的配置: 5 个 Kafka 经纪人Kafka 主题 - 15 个分区和复制因子 3. 我的 Kafka Streams 应用程序与我的 Kafka 代理运行在同一台机器上. 每小时消耗/产生几百万条记录.每当我关闭代理时,应用程序都会进入重新平衡状态,并且在 ..
发布时间:2021-11-12 03:40:13 其他开发

我可以在与 Kafka Broker 相同的机器上运行 Kafka Streams 应用程序吗?

我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它 ..
发布时间:2021-11-12 03:39:30 其他开发

Kafka Streams:如何更改记录时间戳(0.11.0)?

我正在使用 FluentD(v.12 最后一个稳定版本)向 Kafka 发送消息.但是 FluentD 使用的是旧的 KafkaProducer,因此记录时间戳始终设置为 -1.因此,我必须使用 WallclockTimestampExtractor 将记录的时间戳设置为消息到达 kafka 的时间点. 我真正感兴趣的时间戳是由 fluentd 在消息中发送的: "timestamp" ..
发布时间:2021-11-12 03:37:54 其他开发

Kafka Streams 在生成主题时不会将偏移量增加 1

我已经实现了一个简单的 Kafka 死信记录处理器. 当使用控制台生产者产生的记录时,它工作得很好. 但是,我发现我们的 Kafka Streams 应用程序并不能保证向接收器主题生成记录,即每生成一条记录,偏移量就会增加 1. 死信处理器背景: 我有一个场景,在发布处理记录所需的所有数据之前,可能会收到记录.当流应用程序处理的记录不匹配时,它们将移动到死信主题,而不是继续 ..
发布时间:2021-11-12 03:35:28 其他开发

Kafka多分区排序

我知道无法在 Kafka 中对多个分区进行排序,并且分区排序仅对组内的单个使用者(对于单个分区)有保证.但是,使用 Kafka Streams 0.10 现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每条消息都保持顺序,那么在消费者方面,可以说使用 Kafka Streams 0.10 现在有可能吗?假设我们收到所有消息,我们是否可以不根据消费的时间戳对所有分区进行排序,然后将它 ..
发布时间:2021-11-12 03:34:39 其他开发

Kafka 连接教程停止工作

我在此链接中遵循第 7 步(使用 Kafka Connect 导入/导出数据): http://kafka.apache.org/documentation.html#quickstart 它运行良好,直到我删除了“test.txt"文件.主要是因为这就是 log4j 文件的工作方式.一段时间后,文件将被旋转 - 我的意思是 - 它将被重命名 &将开始写入具有相同名称的新文件. ..
发布时间:2021-11-12 03:28:57 其他开发

Kafka Leader选举什么时候举行?

Kafka High Level Producer 何时以及多久选举一次领导人?是在发送每条消息之前执行还是仅在创建连接时执行一次? 解决方案 每个 Broker 都有一个关于主题列表(和分区)及其领导者的信息,每当新领导者出现时,动物园管理员都会更新这些信息选择或分区数量发生变化时. 因此,当生产者调用其中一个代理时,它会使用此信息列表进行响应.一旦生产者收到此信息,它就会缓存它并 ..
发布时间:2021-11-12 03:26:33 其他开发

如何从kafka中的文件读取日志?

我想在 kafka 中读取 Apache 日志,然后进一步处理到 Spark Streaming.我是 kafka 的新手.据我所知,我必须编写一个生产者类来读取日志文件. 解决方案 您可以通过创建一个连接器来实​​现此目的,该连接器将日志文件的每一行都导入到 Kafka 主题中.在此处查看示例: https://docs.confluent.io/current/connect/d ..
发布时间:2021-11-12 03:23:32 其他开发

主线程睡眠少于 1000 时无法产生消息

当我使用Kafka的Java API时,如果我让我的主线程睡眠少于2000ns,它无法产生任何消息.我真的很想知道为什么会发生这种情况? 这是我的制作人: 公共类生产者{私人最终 KafkaProducer生产者;私人最终字符串主题;公共生产者(字符串主题,字符串 [] args){//......//......生产者 = 新的 KafkaProducer(props);this.t ..
发布时间:2021-11-12 03:21:31 其他开发

在模式注册中,消费者的模式可能与生产者的不同,这实际上意味着什么

在向 Kafka 生成 AVRO 数据时,Avro 序列化程序在写入数据时使用的字节数组中写入相同的模式 ID. Kafka 消费者根据收到的字节数组中的架构 ID 从架构注册表中获取架构.所以相同的架构 ID 用于生产者和消费者等架构中. 但是为什么很多文章包括这个one说消费者的模式可能与生产者的不同. 请帮助我理解这一点. 解决方案 Kafka 消费者根据架构 ..
发布时间:2021-11-12 03:21:09 其他开发

使用 Windows Subsystem For Linux 运行的 Kafka 的连接超时

我在我的 Windows 10 笔记本电脑上的 Windows SubSystem for Linux 下安装了 Kafka 1.1.0 和 Zookeeper 3.4.12.我可以在 ubuntu 中生成和使用消息,但是当我想从 Windows 生成消息(使用 java 程序或工具 kafka-console-producer.bat)时,出现以下错误: [2018-05-11 15:31 ..
发布时间:2021-11-12 03:20:00 其他开发

使用主题启用 ACL 的 Kafka java 生产者和消费者

我对 kafka ACL 配置有点困惑,我们为生产者和消费者配置授权.有各种示例显示使用命令行生成/使用消息.我们是否需要任何额外的配置来使用 JAVA api 向/从安全的 kafka 主题生成/使用消息. 解决方案 如果您想了解安全 Kafka 服务器的配置细节,Confluent docs 有很好的描述.您可以在文档中注意到您通过文件 client.properties 设置某些属性 ..
发布时间:2021-11-12 03:19:04 其他开发