kafka-producer-api相关内容

在Kafka-Python中流式传输CSV数据

使用Kafka-Python将CSV数据发送到Kafka主题.消费者成功发送和接收数据.现在,我试图连续流式处理csv文件,添加到文件中的任何新条目都应自动发送到Kafka主题.任何建议对于连续流CSV文件都是有帮助的 下面是我现有的代码, from kafka import KafkaProducer import logging from json import ..
发布时间:2021-02-14 19:59:40 其他开发

Kafka主题中丢失的消息

在 ProducerRecord 中尝试时间戳记;我发现了一些奇怪的东西.从生产者发送少量消息后,我运行了kafka-console-consumer.sh并验证了这些消息是否在主题中.我停下了制片人,等了一分钟.当我重新运行kafka-console-consumer.sh时,它没有显示我先前生成的消息.我还添加了producer.flush()和producer.close(),但结果仍然相同 ..
发布时间:2021-02-14 19:59:29 Java开发

部署在Kubernetes上的Kafka Producer无法生产在本地计算机上运行的Kafka集群

我在本地计算机上运行的Kafka集群的minikube设置以外的默认设置.我已经在一个Web服务中创建了一个生产者,并将其部署在minikube上. 为使生产者连接到Kafka,我使用的是10.0.2.2 IP,我还将其用于在minikube外部连接Cassandra和DGraph,因为它们工作正常. 但是,Kafka生产者无法正常工作,甚至在发送数据时也没有抛出错误,提示Broker ..
发布时间:2021-02-14 19:59:18 其他开发

如何在消费者组kafka中动态添加消费者

我该如何知道何时必须扩大消费群体中的消费群体.如果有快速的生产者,消费者扩大规模的诱因是什么? 解决方案 一种简单的方法是获取使用者滞后(可以将其计算为提交的偏移量与Beginning_offset之差),并且如果滞后是在最后一次计算的n倍增加,您可以扩大规模,反之亦然.您可能需要考虑一些极端情况,例如,如果消费者减少并且滞后会增加,并且自动缩放功能可能会产生更多线程/机器,那么这种情况会 ..
发布时间:2021-02-14 19:59:14 其他开发

为什么或如何在我重启我的kafka经纪人时丢失一些消息?

我在单个节点上运行kafka,我想查看我的kafka代理关闭时的kafka生产者行为,然后在几秒钟内重新启动我的代理,因此我创建了spring boot projet,在其中我可能会发送1000个客户JSON对象,并打印每次发送的JSON对象的偏移量. 我的应用程序运行正常,但是当我关闭我的kafka代理并且几秒钟后重新启动我的代理时,我的生产者通常会从最新偏移量发送对象. 对于我的示例,当控制台 ..

从控制台产生具有空值(墓碑)的Kafka消息

有什么方法可以在kafka-console-producer中产生一个带有空值的消息(即,将其标记为供压缩程序使用逻辑删除的消息)? 我尝试生成"mykey"和"mykey |".前者产生错误,而后者使值成为空字符串.像这样运行生产者: $KAFKA_HOME/bin/kafka-console-producer --broker-list localhost:9092 --topic ..
发布时间:2021-02-14 19:58:57 其他开发

在kafka中发送同步消息?

如何在kafka中发送同步消息? 一种实现方法是设置属性参数 max.in.flight.requests.per.connection = 1. 但是我想知道在kafka中是否有直接或替代的方式发送同步消息. (类似于producer.syncSend(...)等). 解决方案 生产者API从send返回Future.您可以调用Future#get进行阻止,直到发送完成为止. ..
发布时间:2021-02-14 19:58:53 其他开发

如何查看kafka标头

我们正在使用标头向Kafka发送带有标头的消息 org.apache.kafka.clients.producer.ProducerRecord public ProducerRecord(String topic, Integer partition, K key, V value, Iterable headers) { this(topic, partition ..
发布时间:2021-02-14 19:58:46 其他开发

如果我在Kafka中有Transactional Producer,我可以使用Kafka Streams准确地阅读一次消息吗?

我希望具有完全一次的语义,但是我不想与Consumer一起阅读消息.我宁愿使用Kafka Streams AP阅读消息.如果我在Stream配置中添加 processing.guarantee = exactly_once ,会保留一次精确的语义吗? 解决方案 一次处理是基于读写过程的模式. Kafka Streams使用此模式,因此,如果您编写一个常规的Kafka Streams应用程序 ..

无法使用kafka-avro-console-consumer读取Avro消息. SerializationException:未知魔术字节

我正在编写一个REST代理,如融合的rest代理.它需要一个JSON负载,模式主题和ID,然后将JSON负载作为Avro对象写入流中.当我使用kafka-avro-console-consumer读取消息时,出现“未知魔术字节"错误. 这是我的kafka生产者配置: properties.put("client.id", LocalHostUtils.getLocalH ..