kafka-producer-api相关内容
这是我学习 kafka 的早期阶段.我正在检查本地机器中的每个 kafka 属性/概念. 所以我遇到了这个属性min.insync.replicas,这是我的理解.如果我误解了什么,请纠正我. 将消息发送到主题后,必须将该消息写入至少 min.insync.replicas 数量的关注者. min.insync.replicas 还包括领导者. 如果可用的 live brokers
..
各位,局域网有一台虚拟服务器,ip是192.168.18.230,我的机器ip是192.168.0.175.今天,我尝试使用我的机器(192.168.0.175)向我的虚拟服务器(192.168.18.230)发送一些消息,与 Kafka 控制台生产者 $ bin/kafka-console-producer.sh --broker-list 192.168.18.230:9092 --topi
..
正如我所见,Kafka 模板在内部使用了 Kafka 生产者.我只想知道确切的区别是什么.此外,与 Kafka 生产者相比,我发现 Kafka 模板中有许多可用的 send() 方法. 请帮我解决.如果有人知道更多. 解决方案 生产者是模式,而 KafkaTemplate 包装了一个生产者实例,并提供了向 Kafka 主题发送消息的便捷方法.(来源) Kafka Producer
..
我正在尝试在 Kafka Processor 中实现一个事务,以确保我不会两次重新处理相同的消息.给定一条消息 (A),我需要创建一个将在事务中的另一个主题上生成的消息列表,并且我想在同一事务中提交原始消息 (A).从文档中我发现了 Producer 方法 sendOffsetsToTransaction 似乎只有在成功时才能在事务中提交偏移量.这是我的 Processor 的 process()
..
我刚刚开始使用 Kafka,这对微服务来说听起来非常好,但我基本上在 Scala 中工作. 我将 kafka 添加到我的 sbt 项目中: libraryDependencies += "org.apache.kafka" %% "kafka" % "2.0.0" 然后我这样做: import org.apache.kafka.clients.producer.{Callback,Ka
..
是否可以将 CometD 客户端与 Kafka 生产者连接起来?有什么建议吗? 目前我在 python 中有一个 CometD 客户端,它从 Salesforce 对象中实时提取数据. 现在我想将该数据推送到 Kafka 生产者.有可能这样做吗?以及如何? 解决方案 已解决. 通过使用https://github.com/dkmadigan/python-bayeux-c
..
我正在尝试使用 KafkaAvroSerialzer 设置一个 kafka 生产者以获取价值.每当 rit 尝试创建 Producer 时,我都会遇到此错误.我正在使用 confluent 5.2.1 中提供的所有 jar java.lang.NoClassDefFoundError: 无法初始化类 io.confluent.kafka.schemaregistry.client.rest.Re
..
我在 Rest 控制器中使用 ReplyingKafkaTemplate 来返回同步响应.我也在设置标题 REPLY_TOPIC.对于监听器微服务部分, @KafkaListener(topics = "${kafka.topic.request-topic}")@发给公共模型监听(模型 请求)抛出 InterruptedException {SumModel 模型 = request.getR
..
各位,局域网有一台虚拟服务器,ip是192.168.18.230,我的机器ip是192.168.0.175.今天,我尝试使用我的机器(192.168.0.175)向我的虚拟服务器(192.168.18.230)发送一些消息,与 Kafka 控制台生产者 $ bin/kafka-console-producer.sh --broker-list 192.168.18.230:9092 --topi
..
这是我的配置: props.put(ProducerConfig.ACKS_CONFIG, "all");props.put(ProducerConfig.RETRIES_CONFIG, “1");props.put(ProducerConfig.LINGER_MS_CONFIG, “1"); 和 尝试{生产者.发送(记录);} catch (Throwable ex) {log.error
..
我使用 apache camel kafka 作为客户端来生成消息,我观察到 kafka 生成器需要 1 毫秒来推送消息,如果我使用骆驼聚合将消息合并到批处理中,那么推送一条消息需要 100 毫秒. 安装简述3 卡夫卡集群 16 核 32GB 内存 示例代码 String endpoint="kafka:test?topic=test&brokers=nodekfa:9092,no
..
我有一个 Kafka 集群在本地机器上运行,默认设置在我的 minikube 设置之外.我在我的一个 Web 服务中创建了一个生产者并将其部署在 minikube 上. 为了让生产者连接到 Kafka,我使用了 10.0.2.2 IP,我也用它来连接 minikube 之外的 Cassandra 和 DGraph,因为它们工作正常. 然而,Kafka 生产者没有工作,甚至在发送数据时也
..
我有一个 json 有效负载,我想将其作为生产者 Api 中的标头发送 {"type": "record_created",“版本":1,"orgId": "",“用户身份": "",“用户名": "","correlationId": "",“工作ID":“"} 以上有效载荷应作为标头发送 producer.send(new ProducerRecord(topic, messageNo,
..
比如说,我发布和使用不同类型的 java 对象.对于每个对象,我必须定义自己的序列化器实现.我们如何在“serializer.class"属性下提供 kafka 消费者/生产者属性文件中的所有实现? 解决方案 我们对不同主题中的不同对象有类似的设置,但在一个主题中始终使用相同的对象类型.我们使用 ByteArrayDeserializer 随 Java API 0.9.0.1 一起提供,这
..
我正在研究 Kafka 0.9.我想知道是否有任何方法可以通过知道分区和偏移量来从其主题中检索已处理的消息.例如,消费者当前正在使用分区 1 和偏移量 10 处的消息.而我想在同一分区和偏移量 5 处获取消息. 我能想到的一种方法是将偏移量重置为 5 并消耗一条消息.但是 poll() 方法只能返回一批消息.所以我必须接受第一条消息而忽略其他消息.处理完消息后,偏移量被重置回来. 我认
..
我是 kafka 的新手,我使用 Kafka Producer Java api.面对卡夫卡的这个问题,Kafka:尝试从状态 COMMITTING_TRANSACTION 到状态 ABORTING_TRANSACTION 的转换无效. 人们已经写到 producer.abortTransaction() 应该只在没有交易正在进行时才被调用......知道如何检查是否有交易进行中吗?以及如何
..
我在此链接中遵循第 7 步(使用 Kafka Connect 导入/导出数据): http://kafka.apache.org/documentation.html#quickstart 它运行良好,直到我删除了“test.txt"文件.主要是因为这就是 log4j 文件的工作方式.一段时间后,文件将被旋转 - 我的意思是 - 它将被重命名 &将开始写入具有相同名称的新文件.
..
我正在研究 Kafka 0.9.我想知道是否有任何方法可以通过知道分区和偏移量来从其主题中检索已处理的消息.例如,消费者当前正在使用分区 1 和偏移量 10 处的消息.而我想在同一分区和偏移量 5 处获取消息. 我能想到的一种方法是将偏移量重置为 5 并消耗一条消息.但是 poll() 方法只能返回一批消息.所以我必须接受第一条消息而忽略其他消息.处理完消息后,偏移量被重置回来. 我认
..
我知道当一个主题有多个分区时,Kafka 将无法保证数据的排序.但我的问题是:- 我需要对一个事件主题(生成事件的用户活动)进行多个分区,因为我希望多个消费者组使用该主题中的数据.但有时我需要引导整个数据,即从头到尾读取完整的数据,并从 Kafka 中的历史消息重建我的事件图,然后我失去了造成问题的排序.一种方法可能是在 Map-Reduce 范式中处理它,我根据时间映射数据并对其进行排序和使用.
..
我有一个 json 有效负载,我想将其作为生产者 Api 中的标头发送 {"type": "record_created",“版本":1,"orgId": "",“用户身份": "",“用户名": "","correlationId": "",“工作ID":“"} 以上有效载荷应作为标头发送 producer.send(new ProducerRecord(topic, messageNo,
..