kafka-producer-api相关内容

Kafka Stream 在重新平衡时重新处理旧消息

我有一个 Kafka Streams 应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题. 这是我的Kafka集群的配置: 5 个 Kafka 经纪人Kafka 主题 - 15 个分区和复制因子 3. 我的 Kafka Streams 应用程序与我的 Kafka 代理运行在同一台机器上. 每小时消耗/产生几百万条记录.每当我关闭代理时,应用程序都会进入重新平衡状态,并且在 ..

在 kafka 中处理经纪人

我在异步模式下使用 kafka 生产者,但是当所有代理都关闭时,它的行为就像同步一样,它会一直等到 metadata.fetch.timeout.ms 过期,这对我来说是 60 秒.我的第一个问题,这是正常行为还是我做错了什么? 因为我的逻辑中的事务应该在最多 100 毫秒内完成,所以这个超时值对我来说是一个非常大的延迟.也许将 metadata.fetch.timeout.ms 设置为 1 ..
发布时间:2021-11-12 02:38:28 其他开发

我可以在与 Kafka Broker 相同的机器上运行 Kafka Streams 应用程序吗?

我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它 ..

如何在 Kafka 0.10 中找到主题分区的偏移范围?

我使用的是 Kafka 0.10.0.在处理之前,我想知道一个分区中记录的大小. 在 0.9.0.1 版本中,我曾经通过使用以下代码来查找分区的 latest 和 earliest 偏移量之间的差异.新版本在检索consumer#position方法时卡住了. package org.apache.kafka.example.utils;导入 java.util.ArrayList;导入 ..
发布时间:2021-11-12 02:37:09 其他开发

Kafka Producer 配置重试策略

Kafka Producer 配置的哪些参数需要更改,以便生产者应该:1)重试n次2) n间隔后用于相同的消息,以防代理宕机. 我需要处理与此相关的情况:https://github.com/rsyslog/rsyslog/issues/1052 解决方案 您可以将“重试"设置为 n(次数).但这还不够,您还需要研究其他配置,这些配置也可能因此受到影响或使其无效. 1) 如果您 ..

当引导服务器关闭时,带有 transactionIdPrefix 的 DefaultKafkaProducerFactory 无限等待

嗨, 我正在使用 spring-kafka 1.3.0.RELEASE 创建一个事务性生产者.当引导服务器关闭时,DefaultKafkaProducerFactory 无休止地等待,直到引导服务器启动. 我做错了什么?我可以设置超时和/或其他类似的属性吗? 这是我重现场景的代码示例: public static void main(String[] args) {final ..
发布时间:2021-11-12 02:31:04 Java开发

我可以在与 Kafka Broker 相同的机器上运行 Kafka Streams 应用程序吗?

我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它 ..

基于时间的桶记录(kafka-hdfs-connector)

我正在尝试使用 Confluent 平台提供的 kafka-hdfs-connector 将数据从 Kafka 复制到 Hive 表中.虽然我能够成功做到这一点,但我想知道如何根据时间间隔对传入的数据进行存储.例如,我想每 5 分钟创建一个新分区. 我尝试了 io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner 和 partit ..

使用 Java 更新 kafka 中特定主题的 TTL

更新主题的 TTL,以便记录在主题中保留 10 天.我必须通过让所有其他主题 TTL 保持相同的当前配置来为特定主题执行此操作,我必须使用 java 执行此操作,因为我将主题推送到 kafka 通过 Java.我正在设置以下属性以将主题推送到 kafka Properties props = new Properties();props.put("bootstrap.servers", KAFK ..
发布时间:2021-11-12 02:29:14 Java开发

来自 Kafka Producer 的过多控制台消息

您如何控制 Kafka 生产者或消费者的控制台日志记录级别?我在 Scala 中使用 Kafka 0.9 API. 每次调用 KafkaProducer 上的 send 时,控制台都会给出如下输出.这是否表明我没有正确设置 KafkaProducer,而不仅仅是日志记录过多的问题? 17:52:21.236 [pool-10-thread-7] INFO o.a.k.c.producer. ..
发布时间:2021-11-12 02:27:07 其他开发

卡夫卡有重复的消息

我在生成或使用数据时没有看到任何失败,但是在生产中存在大量重复消息.对于一个收到大约 100k 条消息的小主题,有大约 4k 重复,尽管就像我说的没有失败,最重要的是没有实现重试逻辑或设置配置值. 我还检查了那些重复消息的偏移值,每个消息都有不同的值,告诉我问题出在生产者身上. 任何帮助将不胜感激 解决方案 阅读有关 kafka 中消息传递的更多信息: https://k ..
发布时间:2021-11-12 02:26:20 其他开发

Kafka Leader选举什么时候举行?

Kafka High Level Producer 何时以及多久选举一次领导人?是在发送每条消息之前执行还是仅在创建连接时执行一次? 解决方案 每个 Broker 都有一个关于主题列表(和分区)及其领导者的信息,每当新领导者出现时,动物园管理员都会更新这些信息选择或分区数量发生变化时. 因此,当生产者调用其中一个代理时,它会使用此信息列表进行响应.一旦生产者收到此信息,它就会缓存它并 ..

Kafka + Spark Streaming:1 秒的恒定延迟

EDIT2:最后,我使用 Java 制作了自己的生产者,并且运行良好,所以问题出在 Kafka-console-producer.kafka-console-consumer 运行良好. 编辑:我已经尝试过 0.9.0.1 版本并且具有相同的行为. 我正在完成我的学士期末项目,即 Spark Streaming 和 Flink 之间的比较.在这两个框架之前,我使用 Kafka 和脚本来 ..

保证 Kafka Producers 唯一的全局事务

在最新版本的 Kafka 0.11.0.0 中,Apache 团队引入了幂等生产者和事务.是否可以保证我们要记录的一整套消息(例如 100 万条)仅在最后提交?我希望,例如,如果生产者与经纪人失去连接并且无法恢复它,消费者将看不到任何消息.是否可以? 解决方案 是的,可以在生产者中使用 Transactions.您启动一个事务,发布所有消息,然后提交该事务.所有消息一次写入 Kafka,但 ..
发布时间:2021-11-12 02:23:15 其他开发