当Azure事件中心中的一个分区处于活动状态时,如何禁用其他分区以停止接收消息 [英] How to disable other partitions to stop receiving messages when one partition is active in Azure event hubs
问题描述
我有一个安装在3台不同服务器上的应用程序.该应用程序订阅一个Event Hub.该事件中心有8个分区.因此,当我在所有3台计算机上启动应用程序时,所有3台计算机上的所有分区都是随机初始化的.
I have an application which is installed in 3 different servers.This application subscribes to a single Event hub. This event hub has 8 partitions. So when I start my application in all the 3 machines, all partitions are initialized randomly on all the 3 machines.
说像这样:
VM1:分区0,1,2
VM2:分区3,4
VM3:分区5,6,7
VM1 : Partition 0,1,2
VM2 : Partition 3,4
VM3 : Partition 5,6,7
所有这些分区都在连续接收消息.这些消息需要一个接一个地处理.现在,我的要求是,在一台机器/服务器中,我一次只希望接收一条消息(无论初始化了多少个分区).而且VM1,VM2,VM3可以并行运行.
All these partitions are receiving messages continuously. These messages needs to be processed one after the other. Now my requirement is , with in a machine/server, I want to receive only one message at a time (no matter how many partitions are initialized). Also VM1, VM2, VM3 can run in parallel.
一种情况是,在一台机器上,例如VM1,我已经通过分区0收到了一条消息.该消息现在正在处理,通常需要15分钟.在这15分钟内,我不希望分区1或分区2接收任何新消息,直到较早的消息完成为止.一旦完成了先前的消息处理,则3个分区中的任何一个都准备好接收新消息.一旦任何分区收到另一条消息,其他分区就不应再收到任何消息.
A scenario would be, in one machine, say VM1, I have received a message through Partition 0. That message is being processed now which typically takes say 15 mins. With in these 15 mins, I do not want either Partition 1 or 2 to receive any new messages until the earlier one is finished. Once the previous message processing is done, then either of the 3 partitions is ready for new message. Once anyone partition receives another message, other partitions should not receive any messages.
我正在使用的代码是这样的:
The code I'm using is something like this :
public class SimpleEventProcessor : IEventProcessor
{
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
return Task.CompletedTask;
}
public Task OpenAsync(PartitionContext context)
{
Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
return Task.CompletedTask;
}
public Task ProcessErrorAsync(PartitionContext context, Exception error)
{
Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
return Task.CompletedTask;
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
DoSomethingWithMessage(); // typically takes 15-20 mins to finish this method.
}
return context.CheckpointAsync();
}
}
这可能吗?
PS:我必须使用事件中心,没有其他选择.
PS: I have to use event hubs and have no other option.
推荐答案
您可以通过互斥静态锁定对象来实现此目的.
You can achieve this by mutual exclusion on a static lock object.
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
lock (lockObj)
{
foreach (var eventData in messages)
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
DoSomethingWithMessage(); // typically takes 15-20 mins to finish this method.
}
return context.CheckpointAsync();
}
}
不要忘记将EventProcessorOptions.MaxBatchSize设置为1,如下所示.
Don't forget to set EventProcessorOptions.MaxBatchSize to 1 as below.
var epo = new EventProcessorOptions
{
MaxBatchSize = 1
};
await eventProcessorHost.RegisterEventProcessorAsync<MyProcessorHost>(epo);
带有下游代理的完整处理器代码.
Full processor code with downstream agent.
public class SampleEventProcessor : IEventProcessor
{
public Task OpenAsync(PartitionContext context)
{
Console.WriteLine($"Opened partition {context.PartitionId}");
return Task.FromResult<object>(null);
}
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine($"Closed partition {context.PartitionId}");
return Task.FromResult<object>(null);
}
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
// Process the mesasage in downstream agent.
await DownstreamAgent.ProcessEventAsync(eventData);
// Checkpoint current position.
await context.CheckpointAsync();
}
}
public Task ProcessErrorAsync(PartitionContext context, Exception error)
{
Console.WriteLine($"Partition {context.PartitionId} - {error.Message}");
return Task.CompletedTask;
}
}
public class DownstreamAgent
{
const int DegreeOfParallelism = 1;
static SemaphoreSlim threadSemaphore = new SemaphoreSlim(DegreeOfParallelism, DegreeOfParallelism);
public static async Task ProcessEventAsync(EventData message)
{
// Wait for a spot so this message can get processed.
await threadSemaphore.WaitAsync();
try
{
// Process the message here
var data = Encoding.UTF8.GetString(message.Body.Array);
Console.WriteLine(data);
}
finally
{
// Release the semaphore here so that next message waiting can be processed.
threadSemaphore.Release();
}
}
}
这篇关于当Azure事件中心中的一个分区处于活动状态时,如何禁用其他分区以停止接收消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!