kafka-producer-api相关内容
是否可以将cometD客户端与Kafka生产者连接?有什么建议吗? 当前,我在python中有一个CometD客户端,该客户端正在从Salesforce对象中实时提取数据. 现在,我想将该数据推送到Kafka生产者中.有可能这样做吗?以及如何? 解决方案 已解决. 通过使用 https://github.com/dkmadigan/python-bayeux-client
..
AFAIK,max.poll.interval.ms在Kafka 0.10.1中引入.但是,尚不清楚何时可以同时使用session.timeout.ms和max.poll.interval.ms 考虑用例,在该用例中,心跳线程没有响应,但是我的处理线程设置了更高的值,因此它仍在处理记录.但是由于心跳线关闭,然后在跨过session.timeout.ms之后,实际发生了什么.因为我在POC中观察到,
..
我在Rest控制器中使用ReplyingKafkaTemplate返回同步响应.我还要设置标题REPLY_TOPIC.对于侦听器微服务部分, @KafkaListener(topics = "${kafka.topic.request-topic}") @SendTo public Model listen(Model request) throw
..
我正在使用Kafka v0.9.0.1(Scala v2.11)和com.101tec:zkclient v0.7.我正在尝试使用AdminUtils创建一个kafka主题.我的代码如下. String zkServers = "node1:2181,node2:2181,node3:2181,node4:2181"; Integer sessionTimeout = (int)TimeUn
..
因此,我正在尝试使用Kafka流进行交互式查询.我有在本地(在Windows上)运行的Zookeeper和Kafka.我将C:\ temp用作Zookeeper和Kafka的存储文件夹. 我已经设置了这样的主题 kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 --partition
..
我在卡夫卡有几个问题. 1)Kafka是否有默认的Web UI? 2)我们如何正常关闭独立的kafka服务器,kafka控制台- 消费者/控制台生产者. 任何解决方案都将受到高度赞赏. 谢谢. 解决方案 1)没有Kafka没有默认的用户界面. 但是,有许多第三方工具可以以图形方式显示Kafka资源.只需使用Google for kafka ui,然后选择显示您
..
我有一个带有1个分区的kafka主题.如果其中包含100条消息,则偏移量将为0.99. 根据kafka保留政策,所有邮件将在指定期限后被清除. ,一旦全部清除(保留期之后),我将向该主题发送100条新消息.现在,消息的新偏移量将从何处开始?是从100还是从0开始? 我试图了解新的偏移量是100-199还是0-99? 解决方案 Kafka通过删除满足谓词的日志段来尊重日志保
..
我有一个Kafka Streams应用程序,该应用程序从几个主题中读取数据,将数据加入并写入另一个主题. 这是我的Kafka群集的配置: 5 Kafka brokers Kafka topics - 15 partitions and replication factor 3. 我的Kafka Streams应用程序与我的Kafka代理在同一台计算机上运行. 每小时消耗/生
..
我正在向Kafka写一个味精,并在另一端使用它. 在其中做一些处理,并将其写回到另一个Kafka主题. 我想知道哪个消息响应针对哪个请求. 当前决定从用户端捕获偏移量id,然后写入响应并读取响应有效负载并决定. 对于这种方法,我们需要阅读每条消息,是否还有其他方法可以根据使用者的配置条件进行消费? 解决方案 消费者只能阅读整个主题.您只能通过seek()跳过消息,但是没有
..
我有一个Kafka Streams应用程序,该应用程序从几个主题中获取数据并将这些数据加入另一个主题中. Kafka配置: 5 kafka brokers Kafka Topics - 15 partitions and 3 replication factor. 注意:我在运行Kafka代理的同一台计算机上运行Kafka Streams应用程序. 每小时消耗/产生几百万条
..
我正在使用Kafka 0.10.0.在处理之前,我想知道分区中记录的大小. 在0.9.0.1版本中,我曾经使用以下代码来查找分区的latest和earliest偏移量之间的差异.在新版本中,检索consumer#position方法时会卡住. package org.apache.kafka.example.utils; import java.util.ArrayList; imp
..
我每天要生成数千个文件,我想使用Kafka进行流式传输. 当我尝试读取文件时,每一行都被视为单独的消息. 我想知道如何在Kafka主题中使每个文件的内容作为一条消息,并与消费者一起如何在单独的文件中编写来自Kafka主题的每条消息. 解决方案 您可以编写自己的序列化器/反序列化器来处理文件. 例如: 生产者道具: props.put(ProducerConfig.KEY_
..
我们有3个zk节点集群和7个代理.现在,我们必须创建一个主题,并且必须为此主题创建分区. 但是我没有找到任何公式来确定应该为该主题创建多少个分区. 生产者的速率为5k消息/秒,每条消息的大小为130字节. 预先感谢 解决方案 这个由Kafka联合创始人创建的旧基准非常易于理解规模的大小-由此得出的直接结论,就像上面的Vanlightly所说的那样,消费者的处理时间是决定分区数量
..
我有3个具有复制因子2的节点(nodes0,node1,node2)Kafka集群(broker0,broker1,broker2)和Zookeeper(使用Kafka tar打包的zookeeper)在另一个节点(节点4)上运行. 在启动Zookeper之后,我又启动了Broker 0,然后启动了其余节点.在代理0日志中可以看到它正在读取__consumer_offsets,似乎它们存储在
..
在kafka中创建主题的最佳方法是什么? 我们创建主题时要定义多少个副本/分区? 在新的生产者API中,当我尝试将消息发布到不存在的主题时,它第一次失败,然后成功发布. 我想知道副本,分区和群集节点数量之间的关系. 我们需要在发布消息之前创建主题吗? 解决方案 启动Kafka代理时,可以在conf/server.properties文件中定义属性集.该文件只是键值属性文
..
我正在寻找有关此主题的说明. 在Kafka文档中,我发现了以下内容: Kafka仅按分区中的消息(而不是主题中不同分区之间的消息)提供总顺序.对于大多数应用程序,按分区排序以及按键对数据进行分区的能力就足够了.但是,如果您需要对消息进行总订购,则可以使用仅具有一个分区的主题来实现,尽管这将意味着每个使用者组只有一个使用者进程. 这是我的问题: 这是否意味着如果我要拥有一个以上的消
..
我有一个像下面这样的简单Java生产者 public class Producer { private final static String TOPIC = "my-example-topi8"; private final static String BOOTSTRAP_SERVERS = "localhost:8092"; public static voi
..
Kafka 0.8V 我想发布/consume byte []对象,java bean对象,可序列化的对象等等. 为这种类型的情况定义发布者和使用者的最佳方法是什么? 当我使用来自使用者迭代器的消息时,我不知道它是什么类型的消息. 有人可以给我指出如何设计这种情况的指南吗? 解决方案 我为每个Kafka主题强制执行一个单一的架构或对象类型.这样,当您收到消息时,便会确切地知道自
..
我已经开始学习Kafka.尝试对其进行基本操作.我坚持关于“经纪人"的观点. 我的kafka正在运行,但是当我要创建分区时. from kafka import TopicPartition (ERROR THERE) consumer = KafkaConsumer(bootstrap_servers='localhost:1234') consumer.assign([Topi
..
我已经实现了一个简单的Kafka Dead信记录处理器. 当使用由控制台生产者产生的记录时,它完美地工作. 但是我发现我们的Kafka Streams应用程序不能保证向接收器主题生成记录,即对于每条产生的记录,偏移量将增加1. 死信处理器背景: 我有一个方案,在发布处理记录所需的所有数据之前,可能会先接收记录. 当记录与流应用程序不匹配以进行处理时,它们将移至“死信"主题,
..