Azure服务总线-使用OnMessage()方法接收消息 [英] Azure Service Bus - Receive Messages with OnMessage() Method

查看:92
本文介绍了Azure服务总线-使用OnMessage()方法接收消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

遵循 MS文档,从订阅中接收消息并不困难.但是,如果我希望我的应用程序每次发布新消息时都收到一条消息-持续轮询.因此,是 SubscriptionClient 类的 OnMessage()方法.

Following the MS documentation it was not difficult to receive message(s) from subscription. However, if I'd like my application to receive a message every time new message is posted - a constant polling. Hence the OnMessage() method of the SubscriptionClient class.

MS文档说:" ......,当调用OnMessage时,客户端启动一个内部消息泵,该泵不断轮询队列或订阅.此消息泵由一个无限循环组成,该无限循环发出Receive()调用.呼叫超时,它将发出下一个Receive()呼叫.... "

但是在应用程序运行时,仅在接收到最新消息时调用 OnMessage()方法.当发布新消息时,似乎无法进行持续轮询.在尝试了许多不同的方法之后,我可以使这项工作并使应用程序在收到新消息时做出反应的唯一方法是将代码放入具有无限循环的单独任务中.在许多级别上,这似乎是完全错误的!(请参见下面的代码).

But when the application is running, the moment OnMessage() method is called only latest message(s) is received. When new messages are posted the constant polling does not seem to be working. After trying many different approaches the only way I could make this work and have the application react the moment new message is received is to place the code into a separate task with infinite loop. This seems totally wrong on so many levels! (see code below).

有人可以帮助我更正我的代码或发布工作示例以完成相同功能而无需循环吗?谢谢!

Can anyone help me to correct my code or post a working sample to accomplish the same functionality without the loop? Thank you!

 public void ReceiveMessageFromSubscription(string topicName, string subscriptionFilter)
        {
            var newMessage = new MessageQueue();
            int i = 0;

            Task listener = Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    SubscriptionClient Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);

                    Dictionary<string, string> retrievedMessage = new Dictionary<string, string>();

                    OnMessageOptions options = new OnMessageOptions();
                                     options.AutoComplete = false;
                                     options.AutoRenewTimeout = TimeSpan.FromMinutes(1);

                    Client.OnMessage((message) =>
                    {
                        try
                        {
                            retrievedMessage.Add("messageGuid", message.Properties["MessageGuid"].ToString());
                            retrievedMessage.Add("instanceId", message.Properties["InstanceId"].ToString());
                            retrievedMessage.Add("pId", message.Properties["ProcessId"].ToString());
                            retrievedMessage.Add("processKey", message.Properties["ProcessKey"].ToString());
                            retrievedMessage.Add("message", message.Properties["Message"].ToString());

                            newMessage.AnnounceNewMessage(retrievedMessage); // event ->

                            message.Complete(); // Remove message from subscription.
                        }
                        catch (Exception ex)
                        {
                            string exmes = ex.Message;
                            message.Abandon();
                        }

                    }, options);

                    retrievedMessage.Clear();

                    i++;

                    Thread.Sleep(3000);
                }

            });
        }

推荐答案

您的代码有一些问题需要解决-

Your code has a few issues to iron out -

  • 失败了,我假设您的应用程序然后退出-或 至少正在侦听消息的线程终止.
  • 您的while循环不断重复代码以连接消息处理程序, 您只需要执行一次.
  • 您需要一种使调用堆栈保持活动状态并防止应用程序垃圾回收对象的方法.
  • It falls through and I assume your application then exits - or at least the thread that is listening for the messages terminates.
  • Your while loop keeps repeating the code to hook up the message handler, you only need to do this once.
  • You need a way to keep the call stack alive and prevent your app from garbage collecting your object.

以下内容将帮助您成功走向成功.祝你好运.

The below should see you on your way to success. Good luck.

 ManualResetEvent CompletedResetEvent = new ManualResetEvent(false);
    SubscriptionClient Client;

    public void ReceiveMessagesFromSubscription(string topicName, string subscriptionFilter, string connectionString)
    {
        Task listener = Task.Factory.StartNew(() =>
        {
            // You only need to set up the below once. 
            Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);

            OnMessageOptions options = new OnMessageOptions();
            options.AutoComplete = false;
            options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
            options.ExceptionReceived += LogErrors;

            Client.OnMessage((message) =>
            {
                try
                {
                    Trace.WriteLine("Got the message with ID {0}", message.MessageId);
                    message.Complete(); // Remove message from subscription.
                }
                catch (Exception ex)
                {
                    Trace.WriteLine("Exception occurred receiving a message: {0}" + ex.ToString());
                    message.Abandon(); // Failed. Leave the message for retry or max deliveries is exceeded and it dead letters.
                }

            }, options);

            CompletedResetEvent.WaitOne();
        });
    }

    /// <summary>
    /// Added in rudimentary exception handling .
    /// </summary>
    /// <param name="sender">The sender.</param>
    /// <param name="ex">The <see cref="ExceptionReceivedEventArgs"/> instance containing the event data.</param>
    private void LogErrors(object sender, ExceptionReceivedEventArgs ex)
    {
        Trace.WriteLine("Exception occurred in OnMessage: {0}" + ex.ToString());
    }

    /// <summary>
    /// Call this to stop the messages arriving from subscription.
    /// </summary>
    public void StopMessagesFromSubscription()
    {
        Client.Close(); // Close the message pump down gracefully
        CompletedResetEvent.Set(); // Let the execution of the listener complete and terminate gracefully 
    }

或者,您可以使用ReceiveBatch自己以更传统的方式将消息分块:

Alternatively you can chunk the message off in a more traditional fashion yourself using ReceiveBatch:

var messages = await queueClient.ReceiveBatchAsync(10, TimeSpan.FromSeconds(30),
                                                       cancellationToken);

这篇关于Azure服务总线-使用OnMessage()方法接收消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆