kafka-producer-api相关内容

Kafka 配置 min.insync.replicas 不起作用

这是我学习 kafka 的早期阶段.我正在检查本地机器中的每个 kafka 属性/概念. 所以我遇到了这个属性min.insync.replicas,这是我的理解.如果我误解了什么,请纠正我. 将消息发送到主题后,必须将该消息写入至少 min.insync.replicas 数量的关注者. min.insync.replicas 还包括领导者. 如果可用的 live brokers ..
发布时间:2021-11-12 03:05:59 其他开发

卡夫卡模板和卡夫卡生产者有什么区别?

正如我所见,Kafka 模板在内部使用了 Kafka 生产者.我只想知道确切的区别是什么.此外,与 Kafka 生产者相比,我发现 Kafka 模板中有许多可用的 send() 方法. 请帮我解决.如果有人知道更多. 解决方案 生产者是模式,而 KafkaTemplate 包装了一个生产者实例,并提供了向 Kafka 主题发送消息的便捷方法.(来源) Kafka Producer ..

KafkaProducer sendOffsetsToTransaction 需要 offset+1 才能成功提交当前偏移量

我正在尝试在 Kafka Processor 中实现一个事务,以确保我不会两次重新处理相同的消息.给定一条消息 (A),我需要创建一个将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息 (A).从文档中我发现了 Producer 方法 sendOffsetsToTransaction 似乎只有在成功时才能在事务中提交偏移量.这是我的 Processor 的 process() ..

将 CometD 客户端与 Kafka 生产者连接起来

是否可以将 CometD 客户端与 Kafka 生产者连接起来?有什么建议吗? 目前我在 python 中有一个 CometD 客户端,它从 Salesforce 对象中实时提取数据. 现在我想将该数据推送到 Kafka 生产者.有可能这样做吗?以及如何? 解决方案 已解决. 通过使用https://github.com/dkmadigan/python-bayeux-c ..

为什么骆驼 kafka 生产者很慢?

我使用 apache camel kafka 作为客户端来生成消息,我观察到 kafka 生成器需要 1 毫秒来推送消息,如果我使用骆驼聚合将消息合并到批处理中,那么推送一条消息需要 100 毫秒. 安装简述3 卡夫卡集群 16 核 32GB 内存 示例代码 String endpoint="kafka:test?topic=test&brokers=nodekfa:9092,no ..
发布时间:2021-11-12 02:57:45 其他开发

部署在 Kubernetes 上的 Kafka Producer 无法生产到本地机器上运行的 Kafka 集群

我有一个 Kafka 集群在本地机器上运行,默认设置在我的 minikube 设置之外.我在我的一个 Web 服务中创建了一个生产者并将其部署在 minikube 上. 为了让生产者连接到 Kafka,我使用了 10.0.2.2 IP,我也用它来连接 minikube 之外的 Cassandra 和 DGraph,因为它们工作正常. 然而,Kafka 生产者没有工作,甚至在发送数据时也 ..
发布时间:2021-11-12 02:57:39 其他开发

kafka生产者API中的头信息

我有一个 json 有效负载,我想将其作为生产者 Api 中的标头发送 {"type": "record_created",“版本":1,"orgId": "",“用户身份": "",“用户名": "","correlationId": "",“工作ID":“"} 以上有效载荷应作为标头发送 producer.send(new ProducerRecord(topic, messageNo, ..
发布时间:2021-11-12 02:57:16 Java开发

如何在kafka中定义多个序列化器?

比如说,我发布和使用不同类型的 java 对象.对于每个对象,我必须定义自己的序列化器实现.我们如何在“serializer.class"属性下提供 kafka 消费者/生产者属性文件中的所有实现? 解决方案 我们对不同主题中的不同对象有类似的设置,但在一个主题中始终使用相同的对象类型.我们使用 ByteArrayDeserializer 随 Java API 0.9.0.1 一起提供,这 ..
发布时间:2021-11-12 02:56:53 其他开发

通过知道消息在 Kafka 中的分区和偏移量来检索消息

我正在研究 Kafka 0.9.我想知道是否有任何方法可以通过知道分区和偏移量来从其主题中检索已处理的消息.例如,消费者当前正在使用分区 1 和偏移量 10 处的消息.而我想在同一分区和偏移量 5 处获取消息. 我能想到的一种方法是将偏移量重置为 5 并消耗一条消息.但是 poll() 方法只能返回一批消息.所以我必须接受第一条消息而忽略其他消息.处理完消息后,偏移量被重置回来. 我认 ..
发布时间:2021-11-12 02:56:44 其他开发

Kafka 连接教程停止工作

我在此链接中遵循第 7 步(使用 Kafka Connect 导入/导出数据): http://kafka.apache.org/documentation.html#quickstart 它运行良好,直到我删除了“test.txt"文件.主要是因为这就是 log4j 文件的工作方式.一段时间后,文件将被旋转 - 我的意思是 - 它将被重命名 &将开始写入具有相同名称的新文件. ..

通过知道消息在 Kafka 中的分区和偏移量来检索消息

我正在研究 Kafka 0.9.我想知道是否有任何方法可以通过知道分区和偏移量来从其主题中检索已处理的消息.例如,消费者当前正在使用分区 1 和偏移量 10 处的消息.而我想在同一分区和偏移量 5 处获取消息. 我能想到的一种方法是将偏移量重置为 5 并消耗一条消息.但是 poll() 方法只能返回一批消息.所以我必须接受第一条消息而忽略其他消息.处理完消息后,偏移量被重置回来. 我认 ..
发布时间:2021-11-12 02:55:14 其他开发

如何从多分区的 Kafka 主题中按顺序(按时间戳顺序)使用数据

我知道当一个主题有多个分区时,Kafka 将无法保证数据的排序.但我的问题是:- 我需要对一个事件主题(生成事件的用户活动)进行多个分区,因为我希望多个消费者组使用该主题中的数据.但有时我需要引导整个数据,即从头到尾读取完整的数据,并从 Kafka 中的历史消息重建我的事件图,然后我失去了造成问题的排序.一种方法可能是在 Map-Reduce 范式中处理它,我根据时间映射数据并对其进行排序和使用. ..
发布时间:2021-11-12 02:54:49 其他开发

kafka生产者API中的头信息

我有一个 json 有效负载,我想将其作为生产者 Api 中的标头发送 {"type": "record_created",“版本":1,"orgId": "",“用户身份": "",“用户名": "","correlationId": "",“工作ID":“"} 以上有效载荷应作为标头发送 producer.send(new ProducerRecord(topic, messageNo, ..
发布时间:2021-11-12 02:52:23 Java开发