kafka-consumer-api相关内容
我正在尝试找出当前我的高级消费者正在解决的抵消措施.我使用Kafka 0.8.2.1,在Kafka的server.properties中设置了否"offset.storage"-我认为这意味着偏移量存储在Kafka中. (通过检查Zk shell中的以下路径,我还验证了Zookeeper中没有存储偏移量:/consumers/consumer_group_name/offsets/topic_na
..
我不清楚为什么我们同时需要session.timeout.ms和max.poll.interval.ms以及何时使用一个或多个?似乎这两个设置都指示协调器在假定死机之前等待从消费者那里获取心跳的时间上限. 对于基于解决方案 在KIP-62之前,只有session.timeout.ms(即Kafka 0.10.0和更早版本).通过max.poll.interval.ms "rel =" no
..
我正在发布到远程kafka服务器,并尝试使用来自该远程服务器的消息. (卡夫卡v 0.90.1) 发布工作正常,但也不费劲. 发布者 package org.test; import java.io.IOException; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProd
..
当我使用kafka 0.9.0.1运行以下命令时,收到此警告[1].您能告诉我我的主题有什么问题吗? (我正在与在ec2中运行的kafka经纪人交谈) #./kafka-console-consumer.sh --new-consumer --bootstrap-server kafka.xx.com:9092 --topic MY_TOPIC? [1] [2016-04-06
..
当我在wildfly v 10中部署我的简单kafka项目(可作为jar正常工作)的战争时,我得到了一些zookeeper连接异常[1].这会在kafka监听器开始与zookeeper连接时发生 [1]] 15:21:58,531 ERROR [org.jboss.msc.service.fail] (ServerService Thread Pool -- 82) MSC000001
..
我正在关注正式文档以实施 kakf-connect 从文件中读取数据。 我有完美的kafka运行。生产者和消费者发送和接收消息。 但是,当我运行以下命令时: sudo。 /bin/connect-standalone.sh config / connect-standalone.properties config / connect-file-source.properties
..
来自 https的引用: //www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1 缺点是,虽然commitSync()将重试提交,直到 成功或遇到不可恢复的失败,co
..
我正在运行一些来自 http://www.javaworld.com/article/3060078/big-data/big-data-messaging-with-kafka-part-1.html?page=2 ,并且kafkaconsumer根据需要从主题中消耗,但是每个轮询都导致许多调试日志的打印(到标准输出),这是我不想要的。 我已尝试在 /config/log4j.proper
..
根据 kafka javadocs 如果我: 订阅模式 创建与模式匹配的主题 重新平衡应该发生,这使消费者从该新主题中读取。但这种情况并没有发生。 如果我停止并启动消费者,它确实会接受新主题。所以我知道新主题与模式匹配。在 https:// stackoverflow中可能存在此问题的重复内容。 com / questions / 37120537 / whitelist-fi
..
在创建多个使用者(使用Kafka 0.9 java API)并且每个线程启动后,我收到以下异常 消费者失败但异常:org.apache.kafka.clients.consumer.CommitFailedException:由于组重新平衡 class com.messagehub.consumer.Consumer正在关闭,因此无法完成提交。 org.apache.kafka.cli
..
我在我的框架中使用Kafka生产者 - 消费者模型。消费者端消耗的记录随后被索引到elasticsearch上。在这里我有一个用例,如果ES关闭,我将不得不暂停kafka消费者直到ES启动,一旦启动,我需要恢复消费者并消耗我上次离开的记录。 我不认为这可以用@KafkaListener实现。有谁能请给我一个解决方案吗?我想我需要为此编写自己的KafkaListenerContainer,但我无法
..
我创造了一个简单的Kafka Producer&消费者。我正在使用kafka_2.11-0.9.0.0。这是我的制作人代码, public class KafkaProducerTest { public static String topicName =“test-topic-2” ; public static void main(String [] args){ // TOD
..
我正在使用Java编写 kafka 使用者。我想保留消息的实时,所以如果有太多消息等待消费,例如1000或更多,我应该放弃未使用的消息并开始使用最新的消息。 对于这个问题,我尝试比较最后提交的偏移量和主题的最新偏移量(只有1个分区),如果这两个偏移量之间的差异大于某个量,我将设置作为下一个偏移量的主题的最新偏移量,以便我可以放弃那些冗余消息。 现在我的问题是如何获得主题的最新偏移量,有
..
每当我尝试从kafka队列中读取消息时,我都会遇到以下异常: [error](运行) -main-0)java.lang.ClassCastException:org.apache.avro.generic.GenericData $ Record无法强制转换为com.harmeetsingh13.java.Customer java.lang.ClassCastException:o
..
我最近一直与卡夫卡合作,对消费者群体下的消费者有点混淆。混淆的中心是将消费者实现为流程还是线程。对于这个问题,假设我使用的是高级消费者。 让我们考虑一下我已经尝试过的场景。在我的主题中有2个分区(为简单起见,我们假设复制因子只有1)。我创建了一个消费者( ConsumerConnector )进程 consumer1 ,其中组 group1 ,然后创建一个大小为2的主题计数图,然后在该过程下
..
在轮询Kafka时,我使用 subscribe()函数订阅了多个主题。现在,我想设置我想从每个主题中读取的偏移量,而不是在每次 seek()和 poll()之后重新订阅来自一个主题。将在每个主题名称上迭代地调用 seek(),在轮询数据之前实现结果? 如何在Kafka中准确存储偏移量? 我每个主题只有一个分区,只有一个消费者可以从所有主题中读取。 解决方案 每个主题的KAFKA商店
..
我正在尝试让一个简单的Kafka Consumer使用Java API v0.9.0.1。我正在使用的kafka服务器是一个docker容器,也运行版本0.9.0.1。下面是使用者代码: public class Consumer { public static void main(String [] args)抛出IOException { KafkaConsumer
..
我正在为kafka运行一个简单的消费者,例如: int timeout = 80000; int bufferSize = 64 * 1024; consumer = new SimpleConsumer(host,port,timeout,bufferSize,clientName); 这可以运行几个小时,但我在$ b后得到一个例外 $ b kafka.con
..
只有很少的序列化器可用, org.apache.kafka.common.serialization.StringSerializer org.apache.kafka.common.serialization.StringSerializer 我们如何创建自己的自定义序列化器? 解决方案 这里有一个示例,可以使用您自己的序列化器/解串器来获取Kafka消息值
..
当我尝试使用我各自的架构运行 Kafka Consumer with Avro 时,它返回错误“AvroRuntimeException:格式错误的数据。长度为负:-40”。我看到其他人也有类似的问题将字节数组转换为json , Avro写入和读取,以及 Kafka Avro Binary *编码器。我还引用了这个 Consumer Group Example 有帮助,但到目前为止没有帮助这个错误
..