kafka-producer-api相关内容
使用Kafka-Python将CSV数据发送到Kafka主题.消费者成功发送和接收数据.现在,我试图连续流式处理csv文件,添加到文件中的任何新条目都应自动发送到Kafka主题.任何建议对于连续流CSV文件都是有帮助的 下面是我现有的代码, from kafka import KafkaProducer import logging from json import
..
这是我的配置: props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, "1"); props.put(ProducerConfig.LINGER_MS_CONFIG, "1"); 和 try { producer.send(record); } catch
..
我是kafka的新手,我使用Kafka Producer Java api. 面对卡夫卡这个问题, Kafka: Invalid transition attempted from state COMMITTING_TRANSACTION to state ABORTING_TRANSACTION. 人们写道,只有在没有交易进行中时才应调用producer.abortTransaction(
..
在 ProducerRecord 中尝试时间戳记;我发现了一些奇怪的东西.从生产者发送少量消息后,我运行了kafka-console-consumer.sh并验证了这些消息是否在主题中.我停下了制片人,等了一分钟.当我重新运行kafka-console-consumer.sh时,它没有显示我先前生成的消息.我还添加了producer.flush()和producer.close(),但结果仍然相同
..
逻辑是,从自定义源获取数据的流作业必须同时写入Kafka和HDFS. 我写了一个(非常)基本的Kafka生产者来做这个,但是整个流工作都挂在send方法上. class KafkaProducer(val kafkaBootstrapServers: String, val kafkaTopic: String, val sslCertificatePath: String, val
..
我在本地计算机上运行的Kafka集群的minikube设置以外的默认设置.我已经在一个Web服务中创建了一个生产者,并将其部署在minikube上. 为使生产者连接到Kafka,我使用的是10.0.2.2 IP,我还将其用于在minikube外部连接Cassandra和DGraph,因为它们工作正常. 但是,Kafka生产者无法正常工作,甚至在发送数据时也没有抛出错误,提示Broker
..
我该如何知道何时必须扩大消费群体中的消费群体.如果有快速的生产者,消费者扩大规模的诱因是什么? 解决方案 一种简单的方法是获取使用者滞后(可以将其计算为提交的偏移量与Beginning_offset之差),并且如果滞后是在最后一次计算的n倍增加,您可以扩大规模,反之亦然.您可能需要考虑一些极端情况,例如,如果消费者减少并且滞后会增加,并且自动缩放功能可能会产生更多线程/机器,那么这种情况会
..
我正在使用apache camel kafka作为客户端来生成消息,我观察到的是kafka生产者花费1 ms来推送消息,如果我通过使用骆驼聚合将消息合并为批处理,则要花费100ms来推送一条消息. 安装简要说明 3 kafka clusther 16Core 32GB RAM 示例代码 String endpoint="kafka:test?topic=test&brok
..
我将spring-kafka用于幂等生产者配置: 这些是我的配置道具: Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(",").join(appProps.getBrokers())); //configure
..
我在单个节点上运行kafka,我想查看我的kafka代理关闭时的kafka生产者行为,然后在几秒钟内重新启动我的代理,因此我创建了spring boot projet,在其中我可能会发送1000个客户JSON对象,并打印每次发送的JSON对象的偏移量. 我的应用程序运行正常,但是当我关闭我的kafka代理并且几秒钟后重新启动我的代理时,我的生产者通常会从最新偏移量发送对象. 对于我的示例,当控制台
..
我遇到了错误: org.apache.kafka.common.errors.TimeoutException: Topic testtopic2 not present in metadata after 60000 ms. 当尝试使用Java在Windows上的本地kafka实例中生成主题时.请注意,主题testtopic2存在,并且我可以使用Windows控制台生产器向它生成消息
..
有什么方法可以在kafka-console-producer中产生一个带有空值的消息(即,将其标记为供压缩程序使用逻辑删除的消息)? 我尝试生成"mykey"和"mykey |".前者产生错误,而后者使值成为空字符串.像这样运行生产者: $KAFKA_HOME/bin/kafka-console-producer --broker-list localhost:9092 --topic
..
如何在kafka中发送同步消息? 一种实现方法是设置属性参数 max.in.flight.requests.per.connection = 1. 但是我想知道在kafka中是否有直接或替代的方式发送同步消息. (类似于producer.syncSend(...)等). 解决方案 生产者API从send返回Future.您可以调用Future#get进行阻止,直到发送完成为止.
..
我正在我的应用程序中使用Kafka 1.0.1,并且已经开始使用0.11中引入的Idempotent Producer功能,并且在使用Idempontent功能时难以理解订购保证. 我的生产者的配置是: enable.idempotence = true max.in.flight.requests.per.connection = 5 retries = 50 ac
..
我们正在使用标头向Kafka发送带有标头的消息 org.apache.kafka.clients.producer.ProducerRecord public ProducerRecord(String topic, Integer partition, K key, V value, Iterable headers) { this(topic, partition
..
我创建了3个Kafka经纪人设置,其经纪人ID为20、21、22.然后,我创建了这个主题: bin/kafka-topics.sh --zookeeper localhost:2181 \ --create --topic zeta --partitions 4 --replication-factor 3 导致: 当生产者发送消息"hello world"时,到主题zeta
..
当我们生成消息时,我们可以定义一个回调,该回调可能会发生异常: kafkaProducer.send(producerRecord, new Callback() { public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { // OK } e
..
我希望具有完全一次的语义,但是我不想与Consumer一起阅读消息.我宁愿使用Kafka Streams AP阅读消息.如果我在Stream配置中添加 processing.guarantee = exactly_once ,会保留一次精确的语义吗? 解决方案 一次处理是基于读写过程的模式. Kafka Streams使用此模式,因此,如果您编写一个常规的Kafka Streams应用程序
..
我正在编写一个REST代理,如融合的rest代理.它需要一个JSON负载,模式主题和ID,然后将JSON负载作为Avro对象写入流中.当我使用kafka-avro-console-consumer读取消息时,出现“未知魔术字节"错误. 这是我的kafka生产者配置: properties.put("client.id", LocalHostUtils.getLocalH
..
试图了解(对于kafka来说是新的)kafka中的轮询事件循环是如何工作的. 用例:有关该主题的25条记录,最大民意调查大小设置为5. max.poll.interval.ms = 5000 //5 seconds by default max.poll.records = 5 任务顺序 从主题中轮询记录. 在for循环中处理记录. 某些处理登录,逻辑将通过或失败.
..