kafka-producer-api相关内容
我有一个由3个Kafka代理组成的集群,所有主题的复制因子均为3.自最近几天以来,我一直面对这个问题,即使Kafka在所有3台服务器上运行,消费者和生产者在获得响应时也会突然(一天几次)被卡住,直到我检查代理日志(“连接到0的连接已断开,"然后读取罪魁祸首节点(在本例中为第一个节点),然后在该节点上重新启动Zookeeper和代理. 根据日志,由于重新平衡,这种情况正在发生. 我将mi
..
我是Kafka的新手,正在为我的新应用程序尝试一些小用例.用例基本上是 卡夫卡制片人—>卡夫卡消费品—>槽-卡夫卡水源—>水槽-hdfs-水槽. 在消耗(步骤2)时,以下是步骤顺序. 1. Consumer.Poll(1.0) 1.a.产生多个主题(正在监听多个水槽代理) 1.b.生产.轮询() 2.每25毫秒一次Flush() 3.每隔msgs提交一次(asynchCommit = fal
..
我们将log retention hours设置为1小时,如下所示(以前的设置是72H) 使用以下Kafka命令行工具,将kafka retention.ms设置为1H.我们的目的是清除主题-test_topic中早于1H的数据,因此我们使用了以下命令: kafka-configs.sh --alter \ --zookeeper localhost:2181 \ --ent
..
我想阅读kafka中的Apache日志,然后进一步处理Spark Streaming.我是kafka的新手.据我了解,我必须编写一个生产者类来读取日志文件. 解决方案 您可以通过创建一个连接器来实现此目的,该连接器会将日志文件的每一行都提供给Kafka主题.在此处查看示例: https://docs.confluent.io /current/connect/devguide.h
..
我无法在Android应用中使用Kafka生产者.我收到以下异常. Caused by: java.lang.ClassNotFoundException: Didn't find class "javax.management.DynamicMBean" on path: DexPathList[[dex file "/data/data/com.a.kafkaproducer/files
..
我正在尝试向具有2个分区的单个主题生成一些消息.所有消息都将仅分配到2号分区. 我希望生产者流可以将消息分布在所有分区上. const kafka = require('kafka-node') const { Transform } = require('stream'); const _ = require('lodash'); const client = new kafka.Kaf
..
尽管为字段定义了默认值,但kafka-avro-console-producer会完全忽略它: $ kafka-avro-console-producer --broker-list localhost:9092 --topic test-avro \ --property schema.registry.url=http://localhost:8081 --property \ valu
..
我只是从Kafka开始,这听起来对微服务确实很好,但是我基本上在Scala工作. 我通过以下方式将kafka添加到我的sbt项目中: libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.0" 然后我这样做: import org.apache.kafka.clients.producer.{Callback,
..
我在Windows 10笔记本电脑上的Linux的Windows SubSystem下安装了带Zookeeper 3.4.12的Kafka 1.1.0.我可以在ubuntu中处理和使用消息,但是当我想从Windows(使用Java程序或使用工具kafka-console-producer.bat)生成消息时,出现以下错误: [2018-05-11 15:31:01,449]错误当将消息发送到
..
尝试将大约5万条消息加载到KAFKA主题中.在少数运行开始时,但并非总是如此. org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state at org.apache.kafka.clients.producer.internals
..
我正在尝试在Kafka Processor中实现交易,以确保不会重复处理同一则消息两次.给定一条消息(A),我需要创建将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息(A).从文档中,我发现了Producer方法sendOffsetsToTransaction,该方法似乎只有在事务成功后才能在事务中提交偏移量.这是我的Processor的process()方法中的代码:
..
我有一个json有效负载,我想作为生产者Api中的标头发送 { "type": "record_created", "version": 1, "orgId": "", "userId": "", "userName": "", "correlationId": "", "jobId": "" } 上述有效载荷应作为标头发送
..
每个人,局域网中都有一个虚拟服务器,其IP为192.168.18.230,而我的机器IP为192.168.0.175. 今天,我尝试通过Kafka控制台生产者 使用我的计算机(192.168.0.175)向虚拟服务器(192.168.18.230)发送一些消息. $ bin/kafka-console-producer.sh --broker-list 192.168.18.230:9092
..
我在10.242.44.55的专用网络中有一台带有Zookeeper和Kafka的服务器.我已经将网关上的端口从[public_ip]:39092转发到了10.242.44.55:9092.我从listeners=INTERNAL://:9092,EXTERNAL://:39092 advertised.listeners=INTERNAL://10.242.44.55:9092,EXTERNAL
..
我的水槽属性: { "name": "jdbc-oracle", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "topics": "orders", "connection.url": "jdbc:orac
..
我正在尝试用KafkaAvroSerialzer建立一个Kafka生产者以获取价值.当eve rit试图创建生产者时,我遇到了这个错误.我正在使用融合5.2.1中提供的所有jars java.lang.NoClassDefFoundError: Could not initialize class io.confluent.kafka.schemaregistry.client.rest.R
..
我有一个经典的微服务架构.因此,有不同的应用程序.每个应用程序可能具有1..N实例.系统已部署到Kubernetes.,因此,我们有许多不同的PODs,它们可以随时启动和停止. 我想实现读取过程写入模式,因此我需要卡夫卡交易. 要配置事务,我需要为每个Kafka生产者设置一些transaction id. (实际上,我需要transaction-id-prefix,因为我将Spring
..
我用pykafka写了一个简单的生产者,但似乎无法使其执行.基本的生产者和生产请求如下.当我用一条小消息调用此方法100次,并添加一些计时/配置代码时,大约需要14秒钟.我知道这是异步发送消息,因此我希望它的运行速度非常快.我缺少某些设置吗?我也尝试过使用min_queued_messages = 1进行尝试,而这花费了大约2秒的时间. from pykafka import KafkaCl
..
我正在使用Spring Kafka模板来生成消息.而且它生成消息的速度太慢.大约需要8分钟才能产生15000条消息. 以下是我创建Kafka模板的方式: @Bean public ProducerFactory highSpeedAvroProducerFactory( @Qualifier("highSpeedProdu
..
我正在研究Kafka 0.9.我想知道是否有任何方法可以通过了解分区和偏移量来从其主题中检索已处理的消息.例如,使用者当前正在使用分区1和偏移量10处的消息.我想在同一分区和偏移量5处获取消息. 我能想到的一种方法是将偏移量重置为5并消耗一条消息.但是poll()方法只能返回一批消息.因此,我必须接受第一个消息,而忽略其他消息.处理完消息后,将偏移量重置回去. 我认为这会起作用.但是仍
..