为多个主题创建多个消费者组 [英] Create multiple consumer group for multiple topic

查看:17
本文介绍了为多个主题创建多个消费者组的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

就我而言,我需要多个主题,每个主题都与多个消费者相关联.我想为每个主题设置一个消费者组.我在 kafka .net 客户端中没有找到任何方法,以便我可以动态创建消费者组并将主题与该消费者组链接.我正在使用 kafka 0.9.0 版本,请告诉我是否需要更改为 kafka server 设置或在 Zookeeper 上?

In my case I need multiple topics with each topic being linked with multiple consumers. I want to set a consumer group for each topic. I did not find any method in kafka .net client so that I can create consumer group dynamically and link the topic with that consumer group. I am using kafka 0.9.0 version, please tell me if I need to change to kafka server setting or on Zookeeper?

推荐答案

我使用 Microsoft .NET kafka 构建了一个快速原型,链接如下.不确定它是否解决了您的问题.

I'm built a quick prototype with Microsoft .NET kafka as link below. not sure it's solving your problem or not.

但是,我强烈建议您使用这个库,因为它包含比 kafka-net 多得多的功能(例如,支持 Zookeeper 维护偏移量、主题组等)

However, I'm hightly recommend you to use this library because it contain much more feature than kafka-net(e.g. supports zookeeper for maintaining offset, topic group, etc.)

https://github.com/Microsoft/CSharpClient-for-Kafka

示例代码

当消费者收到消息时,这将向 kafka 发送 10 条消息并输出消息到控制台.

This will send 10 message to kafka and output message to console when consumer got it.

static void Main(string[] args)
    {
        Task.Factory.StartNew(() =>
        {
            ConsumerConfiguration consumerConfig = new ConsumerConfiguration
            {
                AutoCommit = true, 
                AutoCommitInterval = 1000, 
                GroupId = "group1",
                ConsumerId = "1",
                AutoOffsetReset = OffsetRequest.SmallestTime,
                NumberOfTries = 20,
                ZooKeeper = new ZooKeeperConfiguration("localhost:2181", 30000, 30000, 2000)           
            };
            var consumer = new ZookeeperConsumerConnector(consumerConfig, true);
            var dictionaryMapping = new Dictionary<string, int>();
            dictionaryMapping.Add("topic1", 1);

            var streams = consumer.CreateMessageStreams(dictionaryMapping, new DefaultDecoder());

            var messageStream = streams["topic1"][0];

            foreach (var message in messageStream.GetCancellable(new CancellationToken()))
            {
                Console.WriteLine("Response: P{0},O{1} : {2}", message.PartitionId, message.Offset, Encoding.UTF8.GetString(message.Payload));

                //If you set AutoCommit to false, you can commit by yourself from this command.
                //consumer.CommitOffsets()     
            }
        });


        var brokerConfig = new BrokerConfiguration()
        {
            BrokerId = 1,
            Host = "localhost",
            Port = 9092
        };
        var config = new ProducerConfiguration(new List<BrokerConfiguration> { brokerConfig });
        config.CompressionCodec = CompressionCodecs.DefaultCompressionCodec;
        config.ProducerRetries = 3;
        config.RequiredAcks = -1;            
        var kafkaProducer = new Producer(config);

        byte[] payloadData = Encoding.UTF8.GetBytes("Test Message");
        var inputMessage = new Message(payloadData);
        var data = new ProducerData<string, Message>("topic1", inputMessage);

        for (int i = 0; i < 10; i++)
        {
            kafkaProducer.Send(data);
        }

        Console.ReadLine();
    }

希望对您有所帮助.

这篇关于为多个主题创建多个消费者组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆