@Tailable(spring-data-reactive-mongodb) 相当于 spring-data-r2dbc [英] @Tailable(spring-data-reactive-mongodb) equivalent in spring-data-r2dbc

查看:114
本文介绍了@Tailable(spring-data-reactive-mongodb) 相当于 spring-data-r2dbc的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 spring-data-r2dbc.我在 Postgresql 上试试这个.我之前尝试过 spring-data-mongodb-reactive.我忍不住比较两者.

I am trying my hands on spring-data-r2dbc. I am try this on Postgresql. I have tried spring-data-mongodb-reactive before. I couldn't help but to compare both.

我发现尚不支持查询派生.但我想知道 @Tailable 是否有等价物.通过这种方式,我会实时收到数据库更改的通知.任何人都可以分享与此相关的任何代码示例.

I see that Query Derivation is not yet supported. But I was wondering if there is an equivalent for @Tailable. This way I would be notified of the database changes in real time. Ca anyone share any code samples with respect to this.

我知道底层数据库应该支持这一点.我相信 Postgresql 确实支持这种使用逻辑解码的东西(如果我错了,请纠正我).

I understand that the underlying database should support this. I believe Postgresql does support this kinda thing using Logical Decoding(Correct me if I am wrong here).

在 spring-data-r2dbc 中是否有 @Tailable 等价物?

Is there a @Tailable equivalent in spring-data-r2dbc ?

推荐答案

我遇到了同样的问题,不确定您是否找到了解决方案,但我能够通过执行以下操作来完成类似的事情.首先,我在表中添加了触发器

I was on the same issue not sure if you found a solution or not but I was able to accomplish something similar by doing the following. First, I added trigger to my table

CREATE TRIGGER trigger_name
    AFTER INSERT OR DELETE OR UPDATE 
    ON table_name
    FOR EACH ROW
    EXECUTE PROCEDURE trigger_function_name;

每当更新、删除或插入行时,这将在表上设置触发器.然后它会调用我设置的触发器函数,它看起来像这样:

This will set a trigger on the table whenever a row, is updated, deleted, or inserted. Then it will call the trigger function I have set up which looked something like this:

CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS 
$BODY$
DECLARE
    payload JSON;
BEGIN
    payload = row_to_json(NEW);
    PERFORM pg_notify('notification_name', payload::text);
    RETURN NULL;
END;
$BODY$;

这将允许我监听"来自我的 Spring Boot 项目的这些更新中的任何一个,它会将整行作为有效负载发送.接下来,在我的 Spring Boot 项目中,我配置了一个到我的数据库的连接.

This will allow me to 'listen' to the any of these updates from my spring boot project and it will send the entire row as a payload. Next, in my spring boot project I configured a connection to my db.

@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
    @Override
    @Bean
    public ConnectionFactory connectionFactory() {
        return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
                .host("host")
                .database("db")
                .port(port)
                .username("username")
                .password("password")
                .schema("schema")
                .connectTimeout(Duration.ofMinutes(2))
                .build());
    }
}

我自动装配(依赖注入)到我的服务类的构造函数中,然后将它转换为 r2dbc PostgressqlConnection 类,如下所示:

With that I Autowire (dependency injection) it into the constructor in my service class and cast it to a r2dbc PostgressqlConnection class like so:

this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();

现在我们想监听"我们的表并在对我们的表执行一些更新时得到通知.为此,我们设置了一个初始化方法,该方法在依赖注入后使用@PostContruct 注释执行

Now we want to 'listen' to our table and get the notified when perform some update to our table. To do that we set up an initialization method that is performed after dependency injection by using the @PostContruct annotation

@PostConstruct
private void postConstruct() {
    postgresqlConnection.createStatement("LISTEN notification_name").execute()
            .flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}

请注意,我们会监听我们在 pg_notify 方法中放入的任何名称.我们还想设置一个方法来在 bean 即将被扔掉时关闭连接,如下所示:

Notice that we listen to whatever name we put inside the pg_notify method. Also we want to set up a method to close the the connection when the bean is about to be tossed away, like so:

@PreDestroy
private void preDestroy() {
    postgresqlConnection.close().subscribe();
}

现在我只是创建一个方法来返回当前表中任何内容的 Flux,并且我还将它与我的通知合并,正如我在通知作为 json 出现之前所说的那样,所以我不得不反序列化它,然后我决定使用 ObjectMapper.所以,它看起来像这样:

Now I simply create a method that returns a Flux of whatever is currently in my table, and I also merge it with my notifications, as I said before the notifications come in as a json, so I had to deserialize it and I decided to use ObjectMapper. So, it will look something like this:

private Flux<YourClass> getUpdatedRows() {
    return postgresqlConnection.getNotifications().map(notification -> {
        try {
            //deserialize json
            return objectMapper.readValue(notification.getParameter(), YourClass.class);
        } catch (IOException e) {
            //handle exception
        }
    });
}

public Flux<YourClass> getDocuments() {
    return documentRepository.findAll().share().concatWith(getUpdatedRows());
}

希望这会有所帮助.干杯!

Hope this helps. Cheers!

这篇关于@Tailable(spring-data-reactive-mongodb) 相当于 spring-data-r2dbc的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆