kafka-producer-api相关内容
我有一个主题,该主题包含10个分区,1个消费者组和4个消费者,并且工作人员人数为3. 我可以看到分区中的消息分布不均,一个分区中的数据太多,而另一个分区是免费的. 如何使我的生产者将负载平均分配到所有分区中,以便正确使用所有分区? 解决方案 根据DefaultPartitioner类本身的JavaDoc注释,默认分区策略为: 如果在记录中指定了分区,请使用它. 如果未指
..
当我尝试发送超过1 Mb大小的消息时,出现消息大小太大异常.当我尝试生成一条消息时,该错误出现在我的客户端应用程序中.经过一番谷歌搜索后,我发现应该更改设置以增加最大邮件大小.好吧,我是在/kafka/config/server.properties 文件中完成的.我添加了下2个设置: message.max.bytes = 15728640copy.fetch.max.bytes = 157
..
我正在发布到远程kafka服务器,并尝试使用来自该远程服务器的消息.(卡夫卡v 0.90.1)发布工作正常,但也不费劲. 发布者 package org.test;导入java.io.IOException;导入java.util.Properties;导入org.apache.kafka.clients.producer.KafkaProducer;导入org.apache.kafka
..
我是Kafka的新手,正在为我的新应用程序尝试一些小用例.用例基本上是卡夫卡制片人—>卡夫卡消费品—>槽-卡夫卡水源—> flume-hdfs-sink. 在消耗(步骤2)时,以下是步骤顺序.1. Consumer.Poll(1.0)1.a.产生多个主题(正在监听多个水槽代理)1.b.生产.轮询()2.每25毫秒一次Flush()3.每隔msgs提交一次(asynchCommit = fal
..
关于订购,我遇到了两个短语 生产者发送到特定主题分区的消息将是按发送顺序附加.即,如果发送了记录M1由同一生产者作为记录M2,并且先发送M1,然后发送M1偏移量将小于M2,并在日志中更早显示. 另一个 (config param)max.in.flight.requests.per.connection-最大数量客户端将在单个连接上发送的未确认请求阻止之前.请注意,如果此设置设置
..
我在应用程序中使用的是Kafka 1.0.1,并且已经开始使用0.11中引入的Idempotent Producer功能,并且在使用Idempontent功能时难以理解订购保证. 我的生产者的配置是: enable.idempotence = true max.in.flight.requests.per.connection = 5 重试= 50 acks =全部
..
我知道,当一个主题具有多个分区时,Kafka将无法保证数据的排序.但是我的问题是:-我需要对事件主题进行多个分区(用户活动生成事件),因为我希望多个使用者组使用该主题中的数据.但是有时候我需要重新引导整个数据,即从头到尾读取完整的数据并根据Kafka中的历史消息重建事件图,然后我失去了创建问题的顺序.一种方法可能是在Map-Reduce范式中对其进行处理,在该范式中,我根据时间映射数据并对其进行排
..
我正在使用FluentD(第12版的最新稳定版本)向Kafka发送消息.但是FluentD使用的是旧的KafkaProducer,因此记录时间戳始终设置为-1.因此,我必须使用WallclockTimestampExtractor将记录的时间戳设置为消息到达kafka时的时间点. 我真正感兴趣的时间戳是通过消息中的fluentd发送的: “时间戳":"1507885936",“主机":
..
我在为我的python代码使用kafka时遇到麻烦.我使用python 2.7.5和软件包kafka-python. 我想通过kafka主题发送csv(300000行,每行20个字段).在此之前,我将每个序列化排成一个json文件,直到这里,一切正常.我的生产者发送文件的每一行,然后关闭.但另一方面,我的消费者什么也没消费... 就kafka而言,我有一个带有单个分区的主题.我的kaf
..
需要更改Kafka Producer配置的哪些参数,以便生产者应:1)重试n次2)在n个间隔之后如果代理关闭,则显示相同的消息. 我需要处理与此有关的情况: https://github.com/rsyslog/rsyslog/issues/1052 解决方案 您可以将"重试"设置为n(次数).但这还不够,您还需要研究其他配置,否则可能会因此而受到影响或使配置无效. 1)如果生
..
尝试将Avro格式的消息发送到Kafka并使用它.直到一些研究添加了Thread.sleep(16000)以便生产者等待消息后,它才发送消息.但是,它再次停止工作.这是 org.apache.kafka.common.protocol.Errors-意外错误代码:87.无法生成主题消息. 有什么建议吗?我的下面的代码 公共类AvroAutomationTest3IT {私有静态最终Logg
..
我正在读取一个csv文件,并将此输入的行提供给我的Kafka Producer.现在我希望我的Kafka Producer以每秒100条消息的速度产生消息. 解决方案 看看Kafka Producer的 linger.ms 和 batch.size 属性.您必须相应地调整这些属性才能获得所需的速率. 生产者将在请求传输之间到达的所有记录归为一个批处理的请求.通常,只有在记录到达速度快
..
尝试将大约5万条消息加载到KAFKA主题中.在少数运行开始时,但并非总是如此. org.apache.kafka.common.KafkaException:无法执行事务方法,因为我们处于错误状态在org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManage
..
在向Kafka生成AVRO数据时,Avro串行器将相同的模式ID写入用于写入数据的字节数组中. Kafka Consumer根据接收到的字节数组中的架构ID从架构注册表中获取架构.因此,在生产者和使用者中都使用相同的架构ID,因此该架构也是如此. 但是为什么有很多文章包括此一个说消费者的模式可能与生产者的模式不同. 请帮助我理解这一点. 解决方案 Kafka Cons
..
在我的Scala(2.11)流应用程序中,我正在使用IBM MQ中一个队列中的数据,并将其写入具有一个分区的Kafka主题.在使用了来自MQ的数据之后,消息有效负载被拆分为3000个较小的消息,这些消息存储在字符串序列中.然后,使用KafkaProducer将这3000封邮件中的每封邮件发送到Kafka(2.x版). 您将如何发送这3000条消息? 我既不能增加IBM MQ中的队列数量
..
我在最近的c#项目中使用Confluent kafka软件包.我通过以下方式创建了生产者: prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"}; foreach(msg in msglist){ using(var producer = new ProducerBuilder
..
我有3个节点kafka群集,每个群集都有zookeeper和kafka.如果我明确杀死了Zookeeper和kafka的领导节点,则整个集群将不接受任何传入的数据并等待该节点返回. kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 min.insync.replicas=2 --
..
ItemReader正在从DB2读取数据,并提供了Java对象ClaimDto.现在,ClaimProcessor接受ClaimDto的对象,并返回CompositeClaimRecord对象,该对象由claimRecord1和claimRecord2组成,该对象将被发送到两个不同的Kafka主题.如何分别将claimRecord1和claimRecord2写入topic1和topic2.
..
下面是etl.xml中job的配置
..
我有ListenAbleFuture列表.我要等待这个列表 ListenableFuture>最多15分钟(如果它们尚未完成). 我该怎么做到. 当前我正在执行此操作,但是我不希望每个ListenAbleFuture等待15分钟. for (ListenableFuture
..