kafka-producer-api相关内容
我是Kafka的新手,正在研究将专有流服务连接到Kafka的原型. 我希望获取有关该主题的最后一条消息的密钥,因为我们的内部流消费者需要使用连接时收到的最后一条消息的ID进行登录. 是否可以使用KafkaProducer或KafkaConsumer来做到这一点? 我尝试使用使用者执行以下操作,但是当同时运行控制台使用者时,我会看到消息重播. // Poll so w
..
我在ec2上的一台机器上安装了kafka zookeeper和3个代理,端口为9092..9094,并尝试从另一台机器上使用主题内容.端口2181(zk),9092、9093和9094(服务器)对用户计算机开放.我什至可以做一个bin/kafka-topics.sh --describe --zookeeper 172.X.X.X:2181 --topic remotetopic给我 主题:
..
我正在研究Kafka 9是一个业余项目,并完成了一些"Hello World"类型的示例. 我必须考虑基于请求响应消息传递的现实世界Kafka应用程序,更具体地说,是如何将Kafka请求消息链接到其响应消息. 我一直在考虑将生成的UUID用作请求消息密钥,并将此请求UUID用作关联的响应消息密钥.与WebSphere MQ具有消息相关性ID的机制几乎相同. 我的第2阶段结束过程将
..
我知道无法在Kafka中订购多个分区,并且只能保证组中单个使用者(单个分区)的分区顺序.但是,使用Kafka Streams 0.10,现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每个消息都保持顺序,那么在使用方,可以说使用Kafka Streams 0.10现在可行吗?假设我们收到了所有消息,我们是否无法根据消耗的时间戳对所有分区进行排序,而可能将它们转发到一个单独的主题进行
..
我对kafka还是很陌生,所以如果这个问题很琐碎,请原谅我.对于计时测试,我有一个非常简单的设置,如下所示: 机器A->写入主题1(代理)->机器B从主题1读取 机器B->写刚读到主题2的消息(代理)->机器A从主题2读 现在,我正在无限循环中发送大约1400字节的消息,很快就占满了我的小经纪人的空间.我正在尝试为log.retention.ms,log.retention.bytes
..
我在 Spring-boot: 上使用Kafka Kafka Producer类: @Service public class MyKafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; private static Logger LOGGER = Logg
..
我正在尝试加载一个简单的文本文件,而不是Kafka中的标准输入. 下载Kafka之后,我执行了以下步骤: 开始了动物园管理员: bin/zookeeper-server-start.sh config/zookeeper.properties 启动服务器 bin/kafka-server-start.sh config/server.properties 创建了一个名
..
出于企业解决方案的目的,我开始学习Kafka. 在阅读期间,我想到了一些问题: 生产者在生成消息时-会指定要向其发送消息的 topic ,对吗?它关心分区吗? 订户正在运行时-是否指定其组ID,以便它可以成为同一主题或该组消费者感兴趣的多个主题的消费者集群的一部分? 每个消费者组在经纪人上都有对应的分区吗?还是每个消费者都有一个分区? 是由代理创建的分区,因此对于消费者而
..
我正在发布到远程kafka服务器,并尝试使用来自该远程服务器的消息. (卡夫卡v 0.90.1) 发布工作正常,但也不费劲. 发布者 package org.test; import java.io.IOException; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProd
..
当我使用kafka 0.9.0.1运行以下命令时,收到此警告[1].您能告诉我我的主题有什么问题吗? (我正在与在ec2中运行的kafka经纪人交谈) #./kafka-console-consumer.sh --new-consumer --bootstrap-server kafka.xx.com:9092 --topic MY_TOPIC? [1] [2016-04-06
..
我正在关注正式文档以实施 kakf-connect 从文件中读取数据。 我有完美的kafka运行。生产者和消费者发送和接收消息。 但是,当我运行以下命令时: sudo。 /bin/connect-standalone.sh config / connect-standalone.properties config / connect-file-source.properties
..
Hy, 我正在使用spring-kafka 1.3.0.RELEASE创建一个事务生成器。 当引导服务器关闭时,DefaultKafkaProducerFactory会无休止地等待,直到引导服务器启动。 我做错了什么? 我可以设置超时和/或其他类似属性吗? 这是我重现场景的代码示例: public static void main(String [] args){
..
Kafka允许通过Producer(KafkaProducer)类下面的方法发送异步消息: public java.util.concurrent。未来< RecordMetadata> send(ProducerRecord记录) public java.util.concurrent.Future发送(ProducerReco
..
我创造了一个简单的Kafka Producer&消费者。我正在使用kafka_2.11-0.9.0.0。这是我的制作人代码, public class KafkaProducerTest { public static String topicName =“test-topic-2” ; public static void main(String [] args){ // TOD
..
每当我尝试从kafka队列中读取消息时,我都会遇到以下异常: [error](运行) -main-0)java.lang.ClassCastException:org.apache.avro.generic.GenericData $ Record无法强制转换为com.harmeetsingh13.java.Customer java.lang.ClassCastException:o
..
我有一个单节点,多(3)个代理Zookeeper / Kafka设置。我正在使用Kafka 0.10 Java客户端。 我写了以下简单的远程(在与Kafka不同的服务器上)生产者(在代码中我用MYIP替换了我的公共IP地址) ): 属性config = new Properties(); try { config.put(ProducerConfig.CLIENT_ID_CONF
..
我正在尝试让一个简单的Kafka Consumer使用Java API v0.9.0.1。我正在使用的kafka服务器是一个docker容器,也运行版本0.9.0.1。下面是使用者代码: public class Consumer { public static void main(String [] args)抛出IOException { KafkaConsumer
..
我们编写了一个用于向kafka发布消息的java客户端。代码如下所示 属性props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,“202.xx.xx.xxx:9092”); props.setProperty(ProducerConfig.METADATA_FETCH_T
..
我想在开始生产和消费工作之前确保kafka服务器是否正在运行。它是在windows环境中,这是我的kafka服务器在eclipse中的代码... 属性kafka = new Properties(); kafka.setProperty(“broker.id”,“1”); kafka.setProperty(“port”,“9092”); kafka.setProperty(“l
..
我试图使用Confluent平台提供的kafka-hdfs-connector将来自Kafka的数据复制到Hive表中。虽然我能够成功地做到这一点,但我想知道如何根据时间间隔来分段传入的数据。例如,我希望每5分钟创建一个新的分区。 我试过了 io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner with partition.du
..