将消息定向到消费者 [英] Directing messages to consumers

查看:66
本文介绍了将消息定向到消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的客户端正在尝试向接收者发送消息.但是我注意到接收器有时不能接收到客户端发送的所有消息,因此丢失了一些消息(不确定问题出在哪里?客户端还是接收器). 关于为什么可能会发生的任何建议.这是我目前正在做的

My client is attempting to send messages to the receiver. However I noticed that the receiver sometimes does not receive all the messages sent by the client thus missing a few messages (not sure where the problem is ? Client or the receiver). Any suggestions on why that might be happening. This is what I am currently doing

在接收方,这就是我正在做的.

On the receiver side this is what I am doing.

这是事件处理器

        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            }
        }

这是客户端连接到事件中心的方式

This is how the client connects to the event hub

var StrBuilder = new EventHubsConnectionStringBuilder(eventHubConnectionString)
{
 EntityPath = eventHubName,
};
this.eventHubClient = EventHubClient.CreateFromConnectionString(StrBuilder.ToString());

我如何将邮件定向到特定的消费者

How do I direct my messages to specific consumers

推荐答案

我正在使用来自eventhub官方文档的此示例代码,用于

I'm using this sample code from eventhub official doc, for sending and receiving.

我有2个消费群体: $ Default newcg .假设您有2个客户端,client_1使用默认的使用者组($ Default),client_2使用另一个使用者组(newcg)

And I have 2 consumer groups: $Default and newcg. Suppose you have 2 clients, the client_1 are using the default consumer group($Default), and client_2 are using the other consumer group(newcg)

首先,在创建发送客户端之后,在SendMessagesToEventHub方法中,我们需要添加一个带值的属性.该值应为使用者组名称.示例代码如下:

First, after create the send client, in the SendMessagesToEventHub method, we need to add a property with value. The value should be the consumer group name. Sample code like below:

    private static async Task SendMessagesToEventHub(int numMessagesToSend)
    {
        for (var i = 0; i < numMessagesToSend; i++)
        {
            try
            {
                var message = "444 Message";
                Console.WriteLine($"Sending message: {message}");
                EventData mydata = new EventData(Encoding.UTF8.GetBytes(message));

                //here, we add a property named "cg", it's value is the consumer group. By setting this property, then we can read this message via this specified consumer group.
                mydata.Properties.Add("cg", "newcg");

                await eventHubClient.SendAsync(mydata);

            }
            catch (Exception exception)
            {
                Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
            }

            await Task.Delay(10);
        }

        Console.WriteLine($"{numMessagesToSend} messages sent.");
    }

然后在client_1中,创建接收者项目后,该项目将使用默认使用者组($ Default) ->在SimpleEventProcessor类-> ProcessEventsAsync方法中,我们可以过滤掉不必要的事件数据. ProcessEventsAsync方法的示例代码:

Then in the client_1, after create the receiver project, which use the default consumer group($Default) -> in the SimpleEventProcessor class -> ProcessEventsAsync method, we can filter out the unnecessary event data. Sample code for ProcessEventsAsync method:

        public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (var eventData in messages)
            {
                //filter the data here
                if (eventData.Properties["cg"].ToString() == "$Default")
                {                    
                    var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);

                    Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
                    Console.WriteLine(context.ConsumerGroupName);
                }
            }

            return context.CheckpointAsync();
        }

在另一个使用另一个使用者组的客户端(例如client_2)中,例如其名称为 newcg ,我们可以按照client_1中的步骤进行操作,只需对ProcessEventsAsync方法进行一些更改,如下所示:

And in another client, like client_2, which use another consumer group, like it's name is newcg, we can follow the steps in client_1, just a little changes in ProcessEventsAsync method, like below:

            public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
            {
                foreach (var eventData in messages)
                {
                    //filter the data here, using another consumer group name
                    if (eventData.Properties["cg"].ToString() == "newcg")
                    {  
                       //other code
                    }
                   }

                 return context.CheckpointAsync();
               }

这篇关于将消息定向到消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆