kafka-producer-api相关内容
虽然为字段定义了默认值,kafka-avro-console-producer 完全忽略它: $ kafka-avro-console-producer --broker-list localhost:9092 --topic test-avro \--property schema.registry.url=http://localhost:8081 --property \value.sch
..
我有一个 Kafka Streams 应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题. 这是我的Kafka集群的配置: 5 个 Kafka 经纪人Kafka 主题 - 15 个分区和复制因子 3. 我的 Kafka Streams 应用程序与我的 Kafka 代理运行在同一台机器上. 每小时消耗/产生几百万条记录.每当我关闭代理时,应用程序都会进入重新平衡状态,并且在
..
逻辑是,从自定义源获取数据的流式作业必须同时写入 Kafka 和 HDFS. 我编写了一个(非常)基本的 Kafka 生产者来执行此操作,但是整个流工作都挂在 send 方法上. class KafkaProducer(val kafkaBootstrapServers: String, val kafkaTopic: String, val sslCertificatePath: Str
..
我在 10.242.44.55 上的私有网络中有一个带有 Zookeeper 和 Kafka 的服务器.我已将网关上的端口从 [public_ip]:39092 转发到 10.242.44.55:9092.我从 另一个问题: listeners=INTERNAL://:9092,EXTERNAL://:39092Adverted.listeners=INTERNAL://10.242.44.55
..
我正在按照官方文档实施kakf-connect 从文件中读取数据. 我让 kafka 完美运行.生产者和消费者发送和接收消息. 但是,当我运行以下命令时: sudo ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties 我收到以下错
..
我在异步模式下使用 kafka 生产者,但是当所有代理都关闭时,它的行为就像同步一样,它会一直等到 metadata.fetch.timeout.ms 过期,这对我来说是 60 秒.我的第一个问题,这是正常行为还是我做错了什么? 因为我的逻辑中的事务应该在最多 100 毫秒内完成,所以这个超时值对我来说是一个非常大的延迟.也许将 metadata.fetch.timeout.ms 设置为 1
..
我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它
..
我使用的是 Kafka 0.10.0.在处理之前,我想知道一个分区中记录的大小. 在 0.9.0.1 版本中,我曾经通过使用以下代码来查找分区的 latest 和 earliest 偏移量之间的差异.新版本在检索consumer#position方法时卡住了. package org.apache.kafka.example.utils;导入 java.util.ArrayList;导入
..
Kafka Producer 配置的哪些参数需要更改,以便生产者应该:1)重试n次2) n间隔后用于相同的消息,以防代理宕机. 我需要处理与此相关的情况:https://github.com/rsyslog/rsyslog/issues/1052 解决方案 您可以将“重试"设置为 n(次数).但这还不够,您还需要研究其他配置,这些配置也可能因此受到影响或使其无效. 1) 如果您
..
嗨, 我正在使用 spring-kafka 1.3.0.RELEASE 创建一个事务性生产者.当引导服务器关闭时,DefaultKafkaProducerFactory 无休止地等待,直到引导服务器启动. 我做错了什么?我可以设置超时和/或其他类似的属性吗? 这是我重现场景的代码示例: public static void main(String[] args) {final
..
我有一个 Kafka Streams 应用程序,它从几个主题中获取数据并将数据连接起来并将其放入另一个主题中. Kafka 配置: 5 个 kafka 代理Kafka 主题 - 15 个分区和 3 个复制因子. 注意:我在运行 Kafka Broker 的同一台机器上运行 Kafka Streams 应用程序. 每小时消耗/产生数百万条记录.每当我关闭任何 kafka 经纪人时,它
..
我正在尝试使用 Confluent 平台提供的 kafka-hdfs-connector 将数据从 Kafka 复制到 Hive 表中.虽然我能够成功做到这一点,但我想知道如何根据时间间隔对传入的数据进行存储.例如,我想每 5 分钟创建一个新分区. 我尝试了 io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner 和 partit
..
更新主题的 TTL,以便记录在主题中保留 10 天.我必须通过让所有其他主题 TTL 保持相同的当前配置来为特定主题执行此操作,我必须使用 java 执行此操作,因为我将主题推送到 kafka 通过 Java.我正在设置以下属性以将主题推送到 kafka Properties props = new Properties();props.put("bootstrap.servers", KAFK
..
我有一个单节点 Kafka 代理,它在单节点 kubernetes 环境中的 pod 内运行.我将此图像用于 kafka:https://hub.docker.com/r/wurstmeister/卡夫卡 kafka 版本 = 1.1.0 Kubernetes 集群在服务器上的虚拟机内运行.VM 在活动接口 ens32 上具有以下 IP - 192.168.3.102 Kafka
..
我有以下 kafka 制作人 Api 程序,我是 kafka 本身的新手.下面的代码从 API 之一获取数据并将消息发送到 kafka 主题. package kafka_Demo;导入 java.util.Properties;导入 java.io.BufferedReader;导入 java.io.InputStream;导入 java.io.InputStreamReader;导入 org
..
您如何控制 Kafka 生产者或消费者的控制台日志记录级别?我在 Scala 中使用 Kafka 0.9 API. 每次调用 KafkaProducer 上的 send 时,控制台都会给出如下输出.这是否表明我没有正确设置 KafkaProducer,而不仅仅是日志记录过多的问题? 17:52:21.236 [pool-10-thread-7] INFO o.a.k.c.producer.
..
我在生成或使用数据时没有看到任何失败,但是在生产中存在大量重复消息.对于一个收到大约 100k 条消息的小主题,有大约 4k 重复,尽管就像我说的没有失败,最重要的是没有实现重试逻辑或设置配置值. 我还检查了那些重复消息的偏移值,每个消息都有不同的值,告诉我问题出在生产者身上. 任何帮助将不胜感激 解决方案 阅读有关 kafka 中消息传递的更多信息: https://k
..
Kafka High Level Producer 何时以及多久选举一次领导人?是在发送每条消息之前执行还是仅在创建连接时执行一次? 解决方案 每个 Broker 都有一个关于主题列表(和分区)及其领导者的信息,每当新领导者出现时,动物园管理员都会更新这些信息选择或分区数量发生变化时. 因此,当生产者调用其中一个代理时,它会使用此信息列表进行响应.一旦生产者收到此信息,它就会缓存它并
..
EDIT2:最后,我使用 Java 制作了自己的生产者,并且运行良好,所以问题出在 Kafka-console-producer.kafka-console-consumer 运行良好. 编辑:我已经尝试过 0.9.0.1 版本并且具有相同的行为. 我正在完成我的学士期末项目,即 Spark Streaming 和 Flink 之间的比较.在这两个框架之前,我使用 Kafka 和脚本来
..
在最新版本的 Kafka 0.11.0.0 中,Apache 团队引入了幂等生产者和事务.是否可以保证我们要记录的一整套消息(例如 100 万条)仅在最后提交?我希望,例如,如果生产者与经纪人失去连接并且无法恢复它,消费者将看不到任何消息.是否可以? 解决方案 是的,可以在生产者中使用 Transactions.您启动一个事务,发布所有消息,然后提交该事务.所有消息一次写入 Kafka,但
..