kafka-consumer-api相关内容

主题,分区和键

我正在寻找有关此主题的说明. 在Kafka文档中,我发现了以下内容: Kafka仅按分区中的消息(而不是主题中不同分区之间的消息)提供总顺序.对于大多数应用程序,按分区排序以及按键对数据进行分区的能力就足够了.但是,如果您需要对消息进行总订购,则可以使用仅具有一个分区的主题来实现,尽管这将意味着每个使用者组只有一个使用者进程. 这是我的问题: 这是否意味着如果我要拥有一个以上的消 ..
发布时间:2020-04-25 08:28:43 其他开发

Kafka Consumer在使用Java的Play框架中

我已经用Java语言在play框架中搜索了数千个站点,以查找kafka消费者示例.但是找不到任何例子.谁能提供有关如何编写一项服务的详细信息,该服务会连续使用kafka产生的主题. 谢谢 解决方案 播放是一个网络框架.基本的Actor系统依赖Akka. Akka Kafka API 称为Alpakka ,因此我怀疑您在搜索错误的关键字 要结合Akka和带Kafka的Pla ..
发布时间:2020-04-25 08:28:34 Java开发

如何从Kafka的两个不同集群中消费?

我有两个kafka群集,分别是A和B,B是A的副本.我只想在A断开时使用群集B的消息,反之亦然.但是,从两个群集中消费消息将导致消息重复.因此,有什么方法可以配置我的kafka使用者以仅从一个群集接收消息. 谢谢- 解决方案 因此,有什么方法可以配置我的kafka使用者以仅从一个群集接收消息. 是:Kafka使用者实例将始终仅从一个Kafka集群接收消息.也就是说,没有内置 ..
发布时间:2020-04-25 08:28:24 其他开发

卡夫卡消费者抛出java.lang.OutOfMemoryError:直接缓冲内存

我正在使用单节点Kafka代理(0.10.2)和单节点Zookeeper代理(3.4.9).我有一台使用者服务器(单核和1.5 GB RAM ).每当我运行具有5个或更多线程的进程时,抛出这些异常后,我的使用者的线程就会被杀死 例外1 java.lang.OutOfMemoryError:Java堆空间 在java.nio.HeapByteBuffer.(HeapByteBuffer. ..

发布和使用不同类型的消息的最佳方法是什么?

Kafka 0.8V 我想发布/consume byte []对象,java bean对象,可序列化的对象等等. 为这种类型的情况定义发布者和使用者的最佳方法是什么? 当我使用来自使用者迭代器的消息时,我不知道它是什么类型的消息. 有人可以给我指出如何设计这种情况的指南吗? 解决方案 我为每个Kafka主题强制执行一个单一的架构或对象类型.这样,当您收到消息时,便会确切地知道自 ..
发布时间:2020-04-25 08:28:18 其他开发

按键加入多个Kafka主题

如何编写能够以可扩展方式加入多个Kafka主题的使用者? 我有一个主题,该主题发布了带有键的事件,第二个主题发布了与第一个具有相同键的子集相关的其他事件.我想写一个订阅者,订阅者同时订阅两个主题,并对出现在两个主题中的子集执行一些其他操作. 我可以用一个使用者轻松地做到这一点:从两个主题中读取所有内容,在本地维护状态,并在为给定键读取了两个事件时执行操作.但是我需要扩展解决方案. ..
发布时间:2020-04-25 08:28:08 其他开发

卡夫卡消费者0.9是否向后兼容?

即将到来的kafka消费者0.9.x是否将与0.8经纪人兼容? 换句话说-可以只切换到新的消费者实施方式,而无需动手其他任何事情? 解决方案 根据 Kafka 0.9.0的文档,您不能使用新使用者来从0.8.x代理读取数据. 原因如下: 0.9.0.0与以前的版本相比,经纪人之间的协议有所更改. ..
发布时间:2020-04-25 08:28:02 其他开发

如何检查kafka中的键分配给哪个分区?

我正在尝试调试一个问题,试图证明如果群集没有重新平衡,则每个唯一的密钥只会进入1个分区. 所以我想知道给定的主题,有没有办法确定将密钥发送到哪个分区? 解决方案 也在源代码中 您需要byte[] keyBytes假定它不为null,然后使用org.apache.kafka.common.utils.Utils,可以运行以下命令. Utils.toPositive(Util ..
发布时间:2020-04-25 08:27:52 其他开发

产生主题时,Kafka Streams不会将偏移量增加1

我已经实现了一个简单的Kafka Dead信记录处理器. 当使用由控制台生产者产生的记录时,它完美地工作. 但是我发现我们的Kafka Streams应用程序不能保证向接收器主题生成记录,即对于每条产生的记录,偏移量将增加1. 死信处理器背景: 我有一个方案,在发布处理记录所需的所有数据之前,可能会先接收记录. 当记录与流应用程序不匹配以进行处理时,它们将移至“死信"主题, ..

Spring Kafka使用者,在运行时寻求偏移量?

我使用的是Kafka主题中的KafkaMessageListenerContainer,我有一个应用程序逻辑来处理每条记录,这些记录也依赖于其他微服务.我现在在处理每条记录后手动提交偏移量. 但是,如果我的应用程序逻辑失败,则需要寻找失败的偏移量并继续对其进行处理,直到成功为止.为此,我需要对最后一个偏移量进行运行时手动搜索. 使用KafkaMessageListenerContain ..
发布时间:2020-04-25 08:27:46 其他开发