将消息定向到消费者 [英] Directing messages to consumers
问题描述
我的客户端正在尝试向接收者发送消息.但是我注意到接收器有时不能接收到客户端发送的所有消息,因此丢失了一些消息(不确定问题出在哪里?客户端还是接收器). 关于为什么可能会发生的任何建议.这是我目前正在做的
My client is attempting to send messages to the receiver. However I noticed that the receiver sometimes does not receive all the messages sent by the client thus missing a few messages (not sure where the problem is ? Client or the receiver). Any suggestions on why that might be happening. This is what I am currently doing
在接收方,这就是我正在做的.
On the receiver side this is what I am doing.
这是事件处理器
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
}
}
这是客户端连接到事件中心的方式
This is how the client connects to the event hub
var StrBuilder = new EventHubsConnectionStringBuilder(eventHubConnectionString)
{
EntityPath = eventHubName,
};
this.eventHubClient = EventHubClient.CreateFromConnectionString(StrBuilder.ToString());
我如何将邮件定向到特定的消费者
How do I direct my messages to specific consumers
推荐答案
我正在使用来自eventhub官方文档的此示例代码,用于接收.
I'm using this sample code from eventhub official doc, for sending and receiving.
我有2个消费群体: $ Default 和 newcg .假设您有2个客户端,client_1使用默认的使用者组($ Default),client_2使用另一个使用者组(newcg)
And I have 2 consumer groups: $Default and newcg. Suppose you have 2 clients, the client_1 are using the default consumer group($Default), and client_2 are using the other consumer group(newcg)
首先,在创建发送客户端之后,在SendMessagesToEventHub
方法中,我们需要添加一个带值的属性.该值应为使用者组名称.示例代码如下:
First, after create the send client, in the SendMessagesToEventHub
method, we need to add a property with value. The value should be the consumer group name. Sample code like below:
private static async Task SendMessagesToEventHub(int numMessagesToSend)
{
for (var i = 0; i < numMessagesToSend; i++)
{
try
{
var message = "444 Message";
Console.WriteLine($"Sending message: {message}");
EventData mydata = new EventData(Encoding.UTF8.GetBytes(message));
//here, we add a property named "cg", it's value is the consumer group. By setting this property, then we can read this message via this specified consumer group.
mydata.Properties.Add("cg", "newcg");
await eventHubClient.SendAsync(mydata);
}
catch (Exception exception)
{
Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
}
await Task.Delay(10);
}
Console.WriteLine($"{numMessagesToSend} messages sent.");
}
然后在client_1中,创建接收者项目后,该项目将使用默认使用者组($ Default)
->在SimpleEventProcessor
类-> ProcessEventsAsync
方法中,我们可以过滤掉不必要的事件数据. ProcessEventsAsync
方法的示例代码:
Then in the client_1, after create the receiver project, which use the default consumer group($Default)
-> in the SimpleEventProcessor
class -> ProcessEventsAsync
method, we can filter out the unnecessary event data. Sample code for ProcessEventsAsync
method:
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
//filter the data here
if (eventData.Properties["cg"].ToString() == "$Default")
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
Console.WriteLine(context.ConsumerGroupName);
}
}
return context.CheckpointAsync();
}
在另一个使用另一个使用者组的客户端(例如client_2)中,例如其名称为 newcg ,我们可以按照client_1中的步骤进行操作,只需对ProcessEventsAsync
方法进行一些更改,如下所示:
And in another client, like client_2, which use another consumer group, like it's name is newcg, we can follow the steps in client_1, just a little changes in ProcessEventsAsync
method, like below:
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
//filter the data here, using another consumer group name
if (eventData.Properties["cg"].ToString() == "newcg")
{
//other code
}
}
return context.CheckpointAsync();
}
这篇关于将消息定向到消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!