kafka-consumer-api相关内容
我正在寻找有关此主题的说明. 在Kafka文档中,我发现了以下内容: Kafka仅按分区中的消息(而不是主题中不同分区之间的消息)提供总顺序.对于大多数应用程序,按分区排序以及按键对数据进行分区的能力就足够了.但是,如果您需要对消息进行总订购,则可以使用仅具有一个分区的主题来实现,尽管这将意味着每个使用者组只有一个使用者进程. 这是我的问题: 这是否意味着如果我要拥有一个以上的消
..
我有一个像下面这样的简单Java生产者 public class Producer { private final static String TOPIC = "my-example-topi8"; private final static String BOOTSTRAP_SERVERS = "localhost:8092"; public static voi
..
我在kafka客户端上使用spark-sql-2.4.x版本. 即使在设置使用者配置参数之后 IE. 最大分区提取字节数max.poll.records 设置不正确,并显示以下默认值 Dataset df = sparkSession .readStream() .format("k
..
我已经用Java语言在play框架中搜索了数千个站点,以查找kafka消费者示例.但是找不到任何例子.谁能提供有关如何编写一项服务的详细信息,该服务会连续使用kafka产生的主题. 谢谢 解决方案 播放是一个网络框架.基本的Actor系统依赖Akka. Akka Kafka API 称为Alpakka ,因此我怀疑您在搜索错误的关键字 要结合Akka和带Kafka的Pla
..
我在我的一台机器中创建了一个kafka主题,该主题的IP为192.168.25.50,主题名称为test-poc.然后通过使用kafka-console-producer,我产生了如下消息 kafka-console-producer --broker-list localhost:9092 --topic test-poc >test message1 >test message2
..
我有两个kafka群集,分别是A和B,B是A的副本.我只想在A断开时使用群集B的消息,反之亦然.但是,从两个群集中消费消息将导致消息重复.因此,有什么方法可以配置我的kafka使用者以仅从一个群集接收消息. 谢谢- 解决方案 因此,有什么方法可以配置我的kafka使用者以仅从一个群集接收消息. 是:Kafka使用者实例将始终仅从一个Kafka集群接收消息.也就是说,没有内置
..
如果我指定了一个返回泛型类的方法,那我该怎么做,而不是动态地指定泛型的类型? 例如 try { Class c =Class.forName(keytype); Class d= Class.forName(valuetype); KafkaConsumer consumerconsumer = new KafkaConsumer
..
我正在使用单节点Kafka代理(0.10.2)和单节点Zookeeper代理(3.4.9).我有一台使用者服务器(单核和1.5 GB RAM ).每当我运行具有5个或更多线程的进程时,抛出这些异常后,我的使用者的线程就会被杀死 例外1 java.lang.OutOfMemoryError:Java堆空间 在java.nio.HeapByteBuffer.(HeapByteBuffer.
..
Kafka 0.8V 我想发布/consume byte []对象,java bean对象,可序列化的对象等等. 为这种类型的情况定义发布者和使用者的最佳方法是什么? 当我使用来自使用者迭代器的消息时,我不知道它是什么类型的消息. 有人可以给我指出如何设计这种情况的指南吗? 解决方案 我为每个Kafka主题强制执行一个单一的架构或对象类型.这样,当您收到消息时,便会确切地知道自
..
我试图在统一环境中运行 Kafka 的代码示例,因此,我创建了一个 consumer 客户端(下面提供的代码). using System.Collections; using System.Collections.Generic; using UnityEngine; using Confluent.Kafka; using Confluent.Kafka.Serialization; u
..
我使用 Spring Kafka API 来实现Kafka具有手动偏移管理的消费者: @KafkaListener(topics = "some_topic") public void onMessage(@Payload Message message, Acknowledgment acknowledgment) { if (someCondition) { ac
..
如何编写能够以可扩展方式加入多个Kafka主题的使用者? 我有一个主题,该主题发布了带有键的事件,第二个主题发布了与第一个具有相同键的子集相关的其他事件.我想写一个订阅者,订阅者同时订阅两个主题,并对出现在两个主题中的子集执行一些其他操作. 我可以用一个使用者轻松地做到这一点:从两个主题中读取所有内容,在本地维护状态,并在为给定键读取了两个事件时执行操作.但是我需要扩展解决方案.
..
我已经开始学习Kafka.尝试对其进行基本操作.我坚持关于“经纪人"的观点. 我的kafka正在运行,但是当我要创建分区时. from kafka import TopicPartition (ERROR THERE) consumer = KafkaConsumer(bootstrap_servers='localhost:1234') consumer.assign([Topi
..
即将到来的kafka消费者0.9.x是否将与0.8经纪人兼容? 换句话说-可以只切换到新的消费者实施方式,而无需动手其他任何事情? 解决方案 根据 Kafka 0.9.0的文档,您不能使用新使用者来从0.8.x代理读取数据. 原因如下: 0.9.0.0与以前的版本相比,经纪人之间的协议有所更改.
..
我正在研究卡夫卡流.我想使用选择性极低(千分之一)的过滤器过滤流.我当时在看这种方法: https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/kstream/KStream.html#filter(org.apache.kafka.streams.kstream.Predicate) 但是我找不到任何证据,如果过滤器将由
..
我正在使用具有以下配置的批处理侦听,但是我的消息错误地反序列化了: @KafkaListener( id = "${kafka.buyers.product-sales-pricing.id}", topics = "${kafka.buyers.product-sales-pricing.topic}", groupId = "${kafka.buyers.grou
..
我正在尝试调试一个问题,试图证明如果群集没有重新平衡,则每个唯一的密钥只会进入1个分区. 所以我想知道给定的主题,有没有办法确定将密钥发送到哪个分区? 解决方案 也在源代码中 您需要byte[] keyBytes假定它不为null,然后使用org.apache.kafka.common.utils.Utils,可以运行以下命令. Utils.toPositive(Util
..
我已经实现了一个简单的Kafka Dead信记录处理器. 当使用由控制台生产者产生的记录时,它完美地工作. 但是我发现我们的Kafka Streams应用程序不能保证向接收器主题生成记录,即对于每条产生的记录,偏移量将增加1. 死信处理器背景: 我有一个方案,在发布处理记录所需的所有数据之前,可能会先接收记录. 当记录与流应用程序不匹配以进行处理时,它们将移至“死信"主题,
..
我使用的是Kafka主题中的KafkaMessageListenerContainer,我有一个应用程序逻辑来处理每条记录,这些记录也依赖于其他微服务.我现在在处理每条记录后手动提交偏移量. 但是,如果我的应用程序逻辑失败,则需要寻找失败的偏移量并继续对其进行处理,直到成功为止.为此,我需要对最后一个偏移量进行运行时手动搜索. 使用KafkaMessageListenerContain
..
启动Kafka使用者时收到异常消息. org.apache.kafka.clients.consumer.OffsetOutOfRangeException: 没有为分区配置重设策略的偏移量超出范围{test-0 = 29898318} 我正在将Kafka版本9.0.0与Java 7结合使用. 解决方案 因此,您正在尝试访问主题(test)分区(0)中的偏移量(29898318
..