kafka-producer-api相关内容

将CometD客户端与Kafka生产者联系起来

是否可以将cometD客户端与Kafka生产者连接?有什么建议吗? 当前,我在python中有一个CometD客户端,该客户端正在从Salesforce对象中实时提取数据. 现在,我想将该数据推送到Kafka生产者中.有可能这样做吗?以及如何? 解决方案 已解决. 通过使用 https://github.com/dkmadigan/python-bayeux-client ..

Kafka的session.timeout.ms和max.poll.interval.ms之间的差异

AFAIK,max.poll.interval.ms在Kafka 0.10.1中引入.但是,尚不清楚何时可以同时使用session.timeout.ms和max.poll.interval.ms 考虑用例,在该用例中,心跳线程没有响应,但是我的处理线程设置了更高的值,因此它仍在处理记录.但是由于心跳线关闭,然后在跨过session.timeout.ms之后,实际发生了什么.因为我在POC中观察到, ..

kafka是否有任何默认的Web UI

我在卡夫卡有几个问题. 1)Kafka是否有默认的Web UI? 2)我们如何正常关闭独立的kafka服务器,kafka控制台- 消费者/控制台生产者. 任何解决方案都将受到高度赞赏. 谢谢. 解决方案 1)没有Kafka没有默认的用户界面. 但是,有许多第三方工具可以以图形方式显示Kafka资源.只需使用Google for kafka ui,然后选择显示您 ..
发布时间:2020-04-25 08:32:24 其他开发

保留期后的卡夫卡偏移

我有一个带有1个分区的kafka主题.如果其中包含100条消息,则偏移量将为0.99. 根据kafka保留政策,所有邮件将在指定期限后被清除. ,一旦全部清除(保留期之后),我将向该主题发送100条新消息.现在,消息的新偏移量将从何处开始?是从100还是从0开始? 我试图了解新的偏移量是100-199还是0-99? 解决方案 Kafka通过删除满足谓词的日志段来尊重日志保 ..
发布时间:2020-04-25 08:32:20 其他开发

Kafka Stream在重新平衡时重新处理旧消息

我有一个Kafka Streams应用程序,该应用程序从几个主题中读取数据,将数据加入并写入另一个主题. 这是我的Kafka群集的配置: 5 Kafka brokers Kafka topics - 15 partitions and replication factor 3. 我的Kafka Streams应用程序与我的Kafka代理在同一台计算机上运行. 每小时消耗/生 ..

我可以根据卡夫卡的特定条件消费吗?

我正在向Kafka写一个味精,并在另一端使用它. 在其中做一些处理,并将其写回到另一个Kafka主题. 我想知道哪个消息响应针对哪个请求. 当前决定从用户端捕获偏移量id,然后写入响应并读取响应有效负载并决定. 对于这种方法,我们需要阅读每条消息,是否还有其他方法可以根据使用者的配置条件进行消费? 解决方案 消费者只能阅读整个主题.您只能通过seek()跳过消息,但是没有 ..
发布时间:2020-04-25 08:31:49 其他开发

我可以在与Kafka Broker相同的机器上运行Kafka Streams Application吗?

我有一个Kafka Streams应用程序,该应用程序从几个主题中获取数据并将这些数据加入另一个主题中. Kafka配置: 5 kafka brokers Kafka Topics - 15 partitions and 3 replication factor. 注意:我在运行Kafka代理的同一台计算机上运行Kafka Streams应用程序. 每小时消耗/产生几百万条 ..

是否可以使用Kafka传输文件?

我每天要生成数千个文件,我想使用Kafka进行流式传输. 当我尝试读取文件时,每一行都被视为单独的消息. 我想知道如何在Kafka主题中使每个文件的内容作为一条消息,并与消费者一起如何在单独的文件中编写来自Kafka主题的每条消息. 解决方案 您可以编写自己的序列化器/反序列化器来处理文件. 例如: 生产者道具: props.put(ProducerConfig.KEY_ ..
发布时间:2020-04-25 08:30:49 其他开发

如何为kafka主题选择分区数?

我们有3个zk节点集群和7个代理.现在,我们必须创建一个主题,并且必须为此主题创建分区. 但是我没有找到任何公式来确定应该为该主题创建多少个分区. 生产者的速率为5k消息/秒,每条消息的大小为130字节. 预先感谢 解决方案 这个由Kafka联合创始人创建的旧基准非常易于理解规模的大小-由此得出的直接结论,就像上面的Vanlightly所说的那样,消费者的处理时间是决定分区数量 ..
发布时间:2020-04-25 08:30:31 其他开发

使用__consumer_offsets杀死节点不会导致使用者消耗任何消息

我有3个具有复制因子2的节点(nodes0,node1,node2)Kafka集群(broker0,broker1,broker2)和Zookeeper(使用Kafka tar打包的zookeeper)在另一个节点(节点4)上运行. 在启动Zookeper之后,我又启动了Broker 0,然后启动了其余节点.在代理0日志中可以看到它正在读取__consumer_offsets,似乎它们存储在 ..

如何在Apache Kafka中创建主题?

在kafka中创建主题的最佳方法是什么? 我们创建主题时要定义多少个副本/分区? 在新的生产者API中,当我尝试将消息发布到不存在的主题时,它第一次失败,然后成功发布. 我想知道副本,分区和群集节点数量之间的关系. 我们需要在发布消息之前创建主题吗? 解决方案 启动Kafka代理时,可以在conf/server.properties文件中定义属性集.该文件只是键值属性文 ..
发布时间:2020-04-25 08:28:49 其他开发

主题,分区和键

我正在寻找有关此主题的说明. 在Kafka文档中,我发现了以下内容: Kafka仅按分区中的消息(而不是主题中不同分区之间的消息)提供总顺序.对于大多数应用程序,按分区排序以及按键对数据进行分区的能力就足够了.但是,如果您需要对消息进行总订购,则可以使用仅具有一个分区的主题来实现,尽管这将意味着每个使用者组只有一个使用者进程. 这是我的问题: 这是否意味着如果我要拥有一个以上的消 ..
发布时间:2020-04-25 08:28:43 其他开发

发布和使用不同类型的消息的最佳方法是什么?

Kafka 0.8V 我想发布/consume byte []对象,java bean对象,可序列化的对象等等. 为这种类型的情况定义发布者和使用者的最佳方法是什么? 当我使用来自使用者迭代器的消息时,我不知道它是什么类型的消息. 有人可以给我指出如何设计这种情况的指南吗? 解决方案 我为每个Kafka主题强制执行一个单一的架构或对象类型.这样,当您收到消息时,便会确切地知道自 ..
发布时间:2020-04-25 08:28:18 其他开发

产生主题时,Kafka Streams不会将偏移量增加1

我已经实现了一个简单的Kafka Dead信记录处理器. 当使用由控制台生产者产生的记录时,它完美地工作. 但是我发现我们的Kafka Streams应用程序不能保证向接收器主题生成记录,即对于每条产生的记录,偏移量将增加1. 死信处理器背景: 我有一个方案,在发布处理记录所需的所有数据之前,可能会先接收记录. 当记录与流应用程序不匹配以进行处理时,它们将移至“死信"主题, ..