kafka-producer-api相关内容

消费者和生产者失败,并出现错误:“在读取响应之前,连接到0的连接已断开".

我有一个由3个Kafka代理组成的集群,所有主题的复制因子均为3.自最近几天以来,我一直面对这个问题,即使Kafka在所有3台服务器上运行,消费者和生产者在获得响应时也会突然(一天几次)被卡住,直到我检查代理日志(“连接到0的连接已断开,"然后读取罪魁祸首节点(在本例中为第一个节点),然后在该节点上重新启动Zookeeper和代理. 根据日志,由于重新平衡,这种情况正在发生. 我将mi ..

了解Kafka poll(),flush()&犯罪()

我是Kafka的新手,正在为我的新应用程序尝试一些小用例.用例基本上是 卡夫卡制片人—>卡夫卡消费品—>槽-卡夫卡水源—>水槽-hdfs-水槽. 在消耗(步骤2)时,以下是步骤顺序. 1. Consumer.Poll(1.0) 1.a.产生多个主题(正在监听多个水槽代理) 1.b.生产.轮询() 2.每25毫秒一次Flush() 3.每隔msgs提交一次(asynchCommit = fal ..

即使保留时间/大小仍然保留,数据仍然保留在Kafka主题中

我们将log retention hours设置为1小时,如下所示(以前的设置是72H) 使用以下Kafka命令行工具,将kafka retention.ms设置为1H.我们的目的是清除主题-test_topic中早于1H的数据,因此我们使用了以下命令: kafka-configs.sh --alter \ --zookeeper localhost:2181 \ --ent ..

如何从kafka中的文件中读取日志?

我想阅读kafka中的Apache日志,然后进一步处理Spark Streaming.我是kafka的新手.据我了解,我必须编写一个生产者类来读取日志文件. 解决方案 您可以通过创建一个连接器来实​​现此目的,该连接器会将日志文件的每一行都提供给Kafka主题.在此处查看示例: https://docs.confluent.io /current/connect/devguide.h ..

ProducerStream仅产生单个分区

我正在尝试向具有2个分区的单个主题生成一些消息.所有消息都将仅分配到2号分区. 我希望生产者流可以将消息分布在所有分区上. const kafka = require('kafka-node') const { Transform } = require('stream'); const _ = require('lodash'); const client = new kafka.Kaf ..
发布时间:2020-04-25 08:39:11 其他开发

KafkaProducer sendOffsetsToTransaction需要offset + 1才能成功提交当前偏移量

我正在尝试在Kafka Processor中实现交易,以确保不会重复处理同一则消息两次.给定一条消息(A),我需要创建将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息(A).从文档中,我发现了Producer方法sendOffsetsToTransaction,该方法似乎只有在事务成功后才能在事务中提交偏移量.这是我的Processor的process()方法中的代码: ..

如何为Kubernetes中托管的多个应用程序选择Kafka交易ID?

我有一个经典的微服务架构.因此,有不同的应用程序.每个应用程序可能具有1..N实例.系统已部署到Kubernetes.,因此,我们有许多不同的PODs,它们可以随时启动和停止. 我想实现读取过程写入模式,因此我需要卡夫卡交易. 要配置事务,我需要为每个Kafka生产者设置一些transaction id. (实际上,我需要transaction-id-prefix,因为我将Spring ..

为什么pykafka中的生产者这么慢?

我用pykafka写了一个简单的生产者,但似乎无法使其执行.基本的生产者和生产请求如下.当我用一条小消息调用此方法100次,并添加一些计时/配置代码时,大约需要14秒钟.我知道这是异步发送消息,因此我希望它的运行速度非常快.我缺少某些设置吗?我也尝试过使用min_queued_messages = 1进行尝试,而这花费了大约2秒的时间. from pykafka import KafkaCl ..
发布时间:2020-04-25 08:38:35 Python

通过了解消息在Kafka中的分区和偏移量来检索消息

我正在研究Kafka 0.9.我想知道是否有任何方法可以通过了解分区和偏移量来从其主题中检索已处理的消息.例如,使用者当前正在使用分区1和偏移量10处的消息.我想在同一分区和偏移量5处获取消息. 我能想到的一种方法是将偏移量重置为5并消耗一条消息.但是poll()方法只能返回一批消息.因此,我必须接受第一个消息,而忽略其他消息.处理完消息后,将偏移量重置回去. 我认为这会起作用.但是仍 ..
发布时间:2020-04-25 08:38:29 其他开发