kafka-producer-api相关内容

获取发送给kafka主题的最后一条消息

我是Kafka的新手,正在研究将专有流服务连接到Kafka的原型. 我希望获取有关该主题的最后一条消息的密钥,因为我们的内部流消费者需要使用连接时收到的最后一条消息的ID进行登录. 是否可以使用KafkaProducer或KafkaConsumer来做到这一点? 我尝试使用使用者执行以下操作,但是当同时运行控制台使用者时,我会看到消息重播. // Poll so w ..
发布时间:2020-04-25 08:27:30 其他开发

Kafka:如何连接kafka-console-consumer以获取远程代理主题内容?

我在ec2上的一台机器上安装了kafka zookeeper和3个代理,端口为9092..9094,并尝试从另一台机器上使用主题内容.端口2181(zk),9092、9093和9094(服务器)对用户计算机开放.我什至可以做一个bin/kafka-topics.sh --describe --zookeeper 172.X.X.X:2181 --topic remotetopic给我 主题: ..

Kafka是否支持请求响应消息传递

我正在研究Kafka 9是一个业余项目,并完成了一些"Hello World"类型的示例. 我必须考虑基于请求响应消息传递的现实世界Kafka应用程序,更具体地说,是如何将Kafka请求消息链接到其响应消息. 我一直在考虑将生成的UUID用作请求消息密钥,并将此请求UUID用作关联的响应消息密钥.与WebSphere MQ具有消息相关性ID的机制几乎相同. 我的第2阶段结束过程将 ..

Kafka多分区排序

我知道无法在Kafka中订购多个分区,并且只能保证组中单个使用者(单个分区)的分区顺序.但是,使用Kafka Streams 0.10,现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每个消息都保持顺序,那么在使用方,可以说使用Kafka Streams 0.10现在可行吗?假设我们收到了所有消息,我们是否无法根据消耗的时间戳对所有分区进行排序,而可能将它们转发到一个单独的主题进行 ..

Kafka最佳保留和删除策略

我对kafka还是很陌生,所以如果这个问题很琐碎,请原谅我.对于计时测试,我有一个非常简单的设置,如下所示: 机器A->写入主题1(代理)->机器B从主题1读取 机器B->写刚读到主题2的消息(代理)->机器A从主题2读 现在,我正在无限循环中发送大约1400字节的消息,很快就占满了我的小经纪人的空间.我正在尝试为log.retention.ms,log.retention.bytes ..

了解Kafka主题和分区

出于企业解决方案的目的,我开始学习Kafka. 在阅读期间,我想到了一些问题: 生产者在生成消息时-会指定要向其发送消息的 topic ,对吗?它关心分区吗? 订户正在运行时-是否指定其组ID,以便它可以成为同一主题或该组消费者感兴趣的多个主题的消费者集群的一部分? 每个消费者组在经纪人上都有对应的分区吗?还是每个消费者都有一个分区? 是由代理创建的分区,因此对于消费者而 ..
发布时间:2020-04-25 08:25:35 其他开发

提取具有关联ID 1的元数据时发生警告错误:{MY_TOPIC?= INVALID_TOPIC_EXCEPTION}(org.apache.kafka.clients.NetworkClient)

当我使用kafka 0.9.0.1运行以下命令时,收到此警告[1].您能告诉我我的主题有什么问题吗? (我正在与在ec2中运行的kafka经纪人交谈) #./kafka-console-consumer.sh --new-consumer --bootstrap-server kafka.xx.com:9092 --topic MY_TOPIC? [1] [2016-04-06 ..

带有transactionIdPrefix的DefaultKafkaProducerFactory会在引导服务器关闭时等待

Hy, 我正在使用spring-kafka 1.3.0.RELEASE创建一个事务生成器。 当引导服务器关闭时,DefaultKafkaProducerFactory会无休止地等待,直到引导服务器启动。 我做错了什么? 我可以设置超时和/或其他类似属性吗? 这是我重现场景的代码示例: public static void main(String [] args){ ..
发布时间:2019-01-08 19:06:45 Java开发

Apache Kafka和Avro:org.apache.avro.generic.GenericData $ Record无法强制转换为com.harmeetsingh13.java.Customer

每当我尝试从kafka队列中读取消息时,我都会遇到以下异常: [error](运行) -main-0)java.lang.ClassCastException:org.apache.avro.generic.GenericData $ Record无法强制转换为com.harmeetsingh13.java.Customer java.lang.ClassCastException:o ..

Kafka 0.10 Java客户端TimeoutException:包含1条记录的批处理已过期

我有一个单节点,多(3)个代理Zookeeper / Kafka设置。我正在使用Kafka 0.10 Java客户端。 我写了以下简单的远程(在与Kafka不同的服务器上)生产者(在代码中我用MYIP替换了我的公共IP地址) ): 属性config = new Properties(); try { config.put(ProducerConfig.CLIENT_ID_CONF ..
发布时间:2018-12-28 14:36:13 Java开发

如何检查Kafka Server是否正在运行?

我想在开始生产和消费工作之前确保kafka服务器是否正在运行。它是在windows环境中,这是我的kafka服务器在eclipse中的代码... 属性kafka = new Properties(); kafka.setProperty(“broker.id”,“1”); kafka.setProperty(“port”,“9092”); kafka.setProperty(“l ..
发布时间:2018-12-05 10:27:52 Java开发

基于时间的桶记录(kafka-hdfs-connector)

我试图使用Confluent平台提供的kafka-hdfs-connector将来自Kafka的数据复制到Hive表中。虽然我能够成功地做到这一点,但我想知道如何根据时间间隔来分段传入的数据。例如,我希望每5分钟创建一个新的分区。 我试过了 io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner with partition.du ..