如何初始化 kafka ConsumerRecords<String,String>在 kafka 中进行测试 [英] How can I initialize kafka ConsumerRecords&lt;String,String&gt; in kafka for testing

查看:173
本文介绍了如何初始化 kafka ConsumerRecords<String,String>在 kafka 中进行测试的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为 kafka 消费者组件编写测试用例并模拟 kafkaConsumer.poll(),它返回 ConsumerRecords 的实例.我想初始化 ConsumerRecords 并在模拟中使用它,但是 ConsumerRecords 的构造函数期望我在测试中没有的实际 kafka 主题.我认为的一种方法是保留对象的序列化副本并反序列化以初始化 ConsumerRecords.有没有其他方法可以实现相同的目标.

I am writing test cases for kafka consumer components and mocking kafkaConsumer.poll() which returns instance of ConsumerRecords<String,String>. I want to initialize ConsumerRecords and use that in mock but the constructors of ConsumerRecords expect actual kafka topic which I don't have in tests. One way I think for this is by keeping a serialized copy of object and deserialize to initialize ConsumerRecords. Is there any other way to achieve the same.

推荐答案

以下是一些示例代码(Kafka 客户端 lib 版本 0.10.1.1):

Here is some example code (Kafka clients lib version 0.10.1.1):

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

...
        String topic = "MyTopic";
        Collection<TopicPartition> partitions = new ArrayList<TopicPartition>();
        Collection<String> topicsCollection = new ArrayList<String>();
        partitions.add(new TopicPartition(topic, 1));
        Map<TopicPartition, Long> partitionsBeginningMap = new HashMap<TopicPartition, Long>();
        Map<TopicPartition, Long> partitionsEndMap = new HashMap<TopicPartition, Long>();

        long records = 10;
        for (TopicPartition partition : partitions) {
            partitionsBeginningMap.put(partition, 0l);
            partitionsEndMap.put(partition, records);
            topicsCollection.add(partition.topic());
        }

        MockConsumer<String, MyObject> second = new MockConsumer<String, MyObject>(
                OffsetResetStrategy.EARLIEST);
        second.subscribe(topicsCollection);
        second.rebalance(partitions);       
        second.updateBeginningOffsets(partitionsBeginningMap);
        second.updateEndOffsets(partitionsEndMap);
        for (long i = 0; i < 10; i++) {
            MyObject value = Generator.generate();
            ConsumerRecord<String, MyObject> record = new ConsumerRecord<String, MyObject>(
                    topic, 1, i, null,value);
            second.addRecord(record);
        }
    ...

这篇关于如何初始化 kafka ConsumerRecords<String,String>在 kafka 中进行测试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆