读取 SQL Server Broker 消息并使用 NServiceBus 发布它们 [英] Read SQL Server Broker messages and publish them using NServiceBus

查看:44
本文介绍了读取 SQL Server Broker 消息并使用 NServiceBus 发布它们的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对 NServiceBus 非常陌生,在我们的一个项目中,我们希望完成以下任务 -

I am very new to NServiceBus, and in one of our project, we want to accomplish following -

  1. 每当在Sql server中修改表数据时,构造一条消息并插入sql server broker队列
  2. 使用 NServiceBus 读取代理队列消息
  3. 作为另一个事件再次发布消息,以便其他订阅者可以处理.

现在是第 2 点,我不知道如何完成它.

Now it is point 2, that I do not have much clue, how to get it done.

我参考了以下帖子,之后我能够在代理队列中输入消息,但无法在我们的项目中与 NServiceBus 集成,因为 NServiceBus 库是旧版本,而且许多使用的方法已被弃用.因此,在当前版本中使用它们会变得非常麻烦,或者如果我以不正确的方式使用它们.

I have referred the following posts, after which I was able to enter the message in broker queue, but unable to integrate with NServiceBus in our project, as the NServiceBus libraries are of older version and also many methods used are deprecated. So using them with current versions is getting very troublesome, or if I was doing it in improper way.

http://www.nullreference.se/2010/12/06/using-nservicebus-and-servicebroker-net-part-2https://github.com/jdaigle/servicebroker.net

任何有关正确执行此操作的帮助都是无价的.

Any help on the correct way of doing this would be invaluable.

谢谢.

推荐答案

我使用的是当前版本的 nServiceBus (5)、VS2013 和 SQL Server 2008.我使用 本教程,它使用 SQL Server 对象代理和 SQLDependency 来监视对特定桌子.(注意这在 SQL Server 的更高版本中可能会被弃用).

I'm using the current version of nServiceBus (5), VS2013 and SQL Server 2008. I created a Database Change Listener using this tutorial, which uses SQL Server object broker and SQLDependency to monitor the changes to a specific table. (NB This may be deprecated in later versions of SQL Server).

SQL Dependency 允许您使用所有基本 SQL 功能的广泛选择,尽管有 您需要注意的一些限制.我稍微修改了教程中的代码以提供更好的错误信息:

SQL Dependency allows you to use a broad selection of all the basic SQL functionality, although there are some restrictions that you need to be aware of. I modified the code from the tutorial slightly to provide better error information:

    void NotifyOnChange(object sender, SqlNotificationEventArgs e)
    {
        // Check for any errors
        if (@"Subscribe|Unknown".Contains(e.Type.ToString())) { throw _DisplayErrorDetails(e); }

        var dependency = sender as SqlDependency;
        if (dependency != null) dependency.OnChange -= NotifyOnChange;
        if (OnChange != null) { OnChange(); }
    }

    private Exception _DisplayErrorDetails(SqlNotificationEventArgs e)
    {
        var message = "useful error info";

        var messageInner = string.Format("Type:{0}, Source:{1}, Info:{2}", e.Type.ToString(), e.Source.ToString(), e.Info.ToString());

        if (@"Subscribe".Contains(e.Type.ToString()) && @"Invalid".Contains(e.Info.ToString()))
            messageInner += "\r\n\nThe subscriber says that the statement is invalid - check your SQL statement conforms to specified requirements (http://stackoverflow.com/questions/7588572/what-are-the-limitations-of-sqldependency/7588660#7588660).\n\n";

        return new Exception(messageMain, new Exception(messageInner));

    }

我还创建了一个具有数据库优先"实体框架数据模型的项目,以允许我对更改后的数据执行某些操作.

I also created a project with a "database first" Entity Framework data model to allow me do something with the changed data.

[相关部分] 我的 nServiceBus 项目包含两个作为主机运行"端点,其中一个发布事件消息.第二个端点处理消息.发布者已设置为 IWantToRunAtStartup,它实例化 DBListener 并将我想作为更改监视器运行的 SQL 语句传递给它.onChange() 函数传入一个匿名函数来读取更改的数据并发布消息:

[The relevant part of] My nServiceBus project comprises two "Run as Host" endpoints, one of which publishes event messages. The second endpoint handles the messages. The publisher has been setup to IWantToRunAtStartup, which instantiates the DBListener and passes it the SQL statement I want to run as my change monitor. The onChange() function is passed an anonymous function to read the changed data and publish a message:

using statements

namespace Sample4.TestItemRequest
{
    public partial class MyExampleSender : IWantToRunWhenBusStartsAndStops
    {
        private string NOTIFY_SQL = @"SELECT [id] FROM [dbo].[Test] WITH(NOLOCK) WHERE ISNULL([Status], 'N') = 'N'";
    public void Start() { _StartListening(); }
    public void Stop() { throw new NotImplementedException(); }

    private void _StartListening()
    {
        var db = new Models.TestEntities();

        // Instantiate a new DBListener with the specified connection string            
        var changeListener = new DatabaseChangeListener(ConfigurationManager.ConnectionStrings["TestConnection"].ConnectionString);

        // Assign the code within the braces to the DBListener's onChange event
        changeListener.OnChange += () =>
        {
            /* START OF EVENT HANDLING CODE  */

            //This uses LINQ against the EF data model to get the changed records
            IEnumerable<Models.TestItems> _NewTestItems = DataAccessLibrary.GetInitialDataSet(db);

            while (_NewTestItems.Count() > 0)
            {
                foreach (var qq in _NewTestItems)
                {
                    // Do some processing, if required

                    var newTestItem = new NewTestStarted() { ... set properties from qq object ... };
                    Bus.Publish(newTestItem);
                }

                // Because there might be a number of new rows added, I grab them in small batches until finished.
                // Probably better to use RX to do this, but this will do for proof of concept
                _NewTestItems = DataAccessLibrary.GetNextDataChunk(db);

            }

            changeListener.Start(string.Format(NOTIFY_SQL));

            /* END OF EVENT HANDLING CODE  */

        };

        // Now everything has been set up.... start it running.
        changeListener.Start(string.Format(NOTIFY_SQL));

        }
    }
}

重要 OnChange 事件触发会导致侦听器停止监视.它基本上是一个单一的事件通知程序.处理完事件后,最后要做的就是重新启动 DBListener.(您可以在 END OF EVENT HANDLING 注释之前的行中看到这一点).

Important The OnChange event firing causes the listener to stop monitoring. It basically is a single event notifier. After you have handled the event, the last thing to do is restart the DBListener. (You can see this in the line preceding the END OF EVENT HANDLING comment).

您需要添加对 System.Data 和可能的 System.Data.DataSetExtensions 的引用.

You need to add a reference to System.Data and possibly System.Data.DataSetExtensions.

该项目目前仍处于概念验证阶段,因此我很清楚上述内容可以有所改进.还请记住,我必须删除公司特定的代码,因此可能存在错误.将其视为模板,而不是工作示例.

The project at the moment is still proof of concept, so I'm well aware that the above can be somewhat improved. Also bear in mind I had to strip out company specific code, so there may be bugs. Treat it as a template, rather than a working example.

我也不知道这是否是放置代码的正确位置——这也是我今天在 StackOverflow 上的部分原因;寻找更好的 ServiceBus 主机代码示例.无论我的代码有什么问题,该解决方案都非常有效 - 到目前为止 - 也能满足您的目标.

I also don't know if this is the right place to put the code - that's partly why I'm on StackOverflow today; to look for better examples of ServiceBus host code. Whatever the failings of my code, the solution works pretty effectively - so far - and meets your goals, too.

不要太担心 ServiceBroker 方面的事情.按照教程进行设置后,SQLDependency 会为您处理详细信息.

Don't worry too much about the ServiceBroker side of things. Once you have set it up, per the tutorial, SQLDependency takes care of the details for you.

这篇关于读取 SQL Server Broker 消息并使用 NServiceBus 发布它们的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆