如何在.net Core项目中使用SignalR Core监听Postgresql数据库? [英] How can I listen postgresql database with SignalR Core in .net core project?

查看:157
本文介绍了如何在.net Core项目中使用SignalR Core监听Postgresql数据库?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发.net核心网络应用程序。我想听我的PostgreSQL数据库。

I'm working on .net core web application. I want to listen my PostgreSQL database. And if there are any changes on table, I have to got it.

因此,根据我的研究,我必须使用SignalR Core。我用SignalR做过一些示例应用程序,例如聊天应用程序,但是它们都没有监听数据库。我找不到任何示例。

So according to my research, I have to use SignalR Core. I did some example application with SignalR like chat app but none of them listen database. I couldn't find any example for this.

-是否必须在PostgreSQL数据库上触发?

-是否必须在代码侧监听?

-如何使用SignalR核心?

请告诉我一种方法。

非常感谢。

推荐答案

此示例是asp.net core 3.0+的工作版本。完整代码如下。

This example is work asp.net core 3.0+. Full code is below.

第1步。在PostgreSql上创建触发器以进行监听操作

Step 1. Create a trigger on PostgreSql for listening actions

 create trigger any_after_alarm_speed after
 insert
 or
 delete
 or
 update
 on
 public.alarm_speed for each row execute procedure alarm_speedf();

步骤2。在Postgresql上创建程序

Step 2. Create Procedur on Postgresql

CREATE OR REPLACE FUNCTION public.alarm_speedf()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
IF TG_OP = 'INSERT' then
PERFORM pg_notify('notifyalarmspeed', format('INSERT %s %s', NEW.alarm_speed_id, 
NEW.alarm_speed_date));
ELSIF TG_OP = 'UPDATE' then
PERFORM pg_notify('notifyalarmspeed', format('UPDATE %s %s', OLD.alarm_speed_id, 
OLD.alarm_speed_date));
ELSIF TG_OP = 'DELETE' then
PERFORM pg_notify('notifyalarmspeed', format('DELETE %s %s', OLD.alarm_speed_id, 
OLD.alarm_speed_date));
END IF;
RETURN NULL;
END;
$function$;

第3步。创建中心

  public class speedalarmhub : Hub
    {

        private IMemoryCache _cache;
       `private IHubContext<speedalarmhub> _hubContext;
         public speedalarmhub(IMemoryCache cache, IHubContext<speedalarmhub> hubContext)
        {
            _cache = cache;
            _hubContext = hubContext; 
        }

        public async Task SendMessage()
        {
            if (!_cache.TryGetValue("SpeedAlarm", out string response))
            {
                SpeedListener speedlist = new SpeedListener(_hubContext,_cache);
                speedlist.ListenForAlarmNotifications();
                string jsonspeedalarm = speedlist.GetAlarmList();
                _cache.Set("SpeedAlarm", jsonspeedalarm);
                await Clients.All.SendAsync("ReceiveMessage", _cache.Get("SpeedAlarm").ToString());
            }
            else
            {
                await Clients.All.SendAsync("ReceiveMessage", _cache.Get("SpeedAlarm").ToString());
            }
        }

    }

步骤4创建监听器控制器

Step 4. Create Listener Controller

 public class SpeedListener :Controller
{
    private IHubContext<speedalarmhub> _hubContext;
    private IMemoryCache _cache;
    public SpeedListener(IHubContext<speedalarmhub> hubContext,IMemoryCache cache)
    {
        _hubContext = hubContext;
        _cache = cache; 
    }
    static string GetConnectionString()
    {
        var csb = new NpgsqlConnectionStringBuilder
        {
            Host = "yourip",
            Database = "yourdatabase",
            Username = "yourusername",
            Password = "yourpassword",
            Port = 5432,
            KeepAlive = 30
        };
        return csb.ConnectionString;
    }
    public void ListenForAlarmNotifications()
    {
        NpgsqlConnection conn = new NpgsqlConnection(GetConnectionString());
        conn.StateChange += conn_StateChange;
        conn.Open();
        var listenCommand = conn.CreateCommand();
        listenCommand.CommandText = $"listen notifyalarmspeed;";
        listenCommand.ExecuteNonQuery();
        conn.Notification += PostgresNotificationReceived;
        _hubContext.Clients.All.SendAsync(this.GetAlarmList());
        while (true)
        {
            conn.Wait();
        }
    }
    private void PostgresNotificationReceived(object sender, NpgsqlNotificationEventArgs e)
    {

        string actionName = e.Payload.ToString();
        string actionType = "";
        if (actionName.Contains("DELETE"))
        {
            actionType = "Delete";
        }
        if (actionName.Contains("UPDATE"))
        {
            actionType = "Update";
        }
        if (actionName.Contains("INSERT"))
        {
            actionType = "Insert";
        }
        _hubContext.Clients.All.SendAsync("ReceiveMessage", this.GetAlarmList());
    }
    public string GetAlarmList()
    {
        var AlarmList = new List<AlarmSpeedViewModel>();
        using (NpgsqlCommand sqlCmd = new NpgsqlCommand())
        {
            sqlCmd.CommandType = CommandType.StoredProcedure;
            sqlCmd.CommandText = "sp_alarm_speed_process_get";
            NpgsqlConnection conn = new NpgsqlConnection(GetConnectionString());
            conn.Open();
            sqlCmd.Connection = conn;
            using (NpgsqlDataReader reader = sqlCmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    AlarmSpeedViewModel model = new AlarmSpeedViewModel();
                    model.alarm_speed_id = reader.GetInt32(0);
                  // you must fill  your model items
                    AlarmList.Add(model);
                }
                reader.Close();
                conn.Close();
            }



        }
        _cache.Set("SpeedAlarm", SerializeObjectToJson(AlarmList));
        return _cache.Get("SpeedAlarm").ToString();
    }
    public String SerializeObjectToJson(Object alarmspeed)
    {
        try
        {
            var jss = new JavaScriptSerializer();
            return  jss.Serialize(alarmspeed);
        }
        catch (Exception) { return null; }
    }
    private void conn_StateChange(object sender, System.Data.StateChangeEventArgs e)
    {

        _hubContext.Clients.All.SendAsync("Current State: " + e.CurrentState.ToString() + " Original State: " + e.OriginalState.ToString(), "connection state changed");
    }
}

第5步呼叫中心

<script src="~/lib/signalr.js"></script>

<script type="text/javascript">
// Start the connection.
var connection = new signalR.HubConnectionBuilder()
    .withUrl('/speedalarmhub')
    .build();


connection.on('ReceiveMessage', function (message) {

            var encodedMsg = message;
            // Add the message to the page.

});
// Transport fallback functionality is now built into start.
connection.start()
    .then(function () {

        console.log('connection started');
        connection.invoke('SendMessage');
    })
    .catch(error => {
        console.error(error.message);
    });

步骤6。添加以下代码启动时配置服务

Step 6. Add below code Configuration Services at Startup

public void ConfigureServices(IServiceCollection services)
    {
        services.AddControllersWithViews();
        services.AddSignalR();
        services.AddMemoryCache();
    }

步骤7。在Configure方法中添加以下代码

Step 7. add below code in Configure method

app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllerRoute(
                name: "default",
                pattern: "{controller=Home}/{action=Index}/{id?}");
              endpoints.MapHub<speedalarmhub>("/speedalarmhub");
        });

这篇关于如何在.net Core项目中使用SignalR Core监听Postgresql数据库?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆