@Tailable(spring-data-reactive-mongodb) 相当于 spring-data-r2dbc [英] @Tailable(spring-data-reactive-mongodb) equivalent in spring-data-r2dbc
问题描述
我正在尝试使用 spring-data-r2dbc.我在 Postgresql 上试试这个.我之前尝试过 spring-data-mongodb-reactive.我忍不住比较两者.
I am trying my hands on spring-data-r2dbc. I am try this on Postgresql. I have tried spring-data-mongodb-reactive before. I couldn't help but to compare both.
我发现尚不支持查询派生.但我想知道 @Tailable
是否有等价物.通过这种方式,我会实时收到数据库更改的通知.任何人都可以分享与此相关的任何代码示例.
I see that Query Derivation is not yet supported. But I was wondering if there is an equivalent for @Tailable
. This way I would be notified of the database changes in real time. Ca anyone share any code samples with respect to this.
我知道底层数据库应该支持这一点.我相信 Postgresql 确实支持这种使用逻辑解码的东西(如果我错了,请纠正我).
I understand that the underlying database should support this. I believe Postgresql does support this kinda thing using Logical Decoding(Correct me if I am wrong here).
在 spring-data-r2dbc 中是否有 @Tailable
等价物?
Is there a @Tailable
equivalent in spring-data-r2dbc ?
推荐答案
我遇到了同样的问题,不确定您是否找到了解决方案,但我能够通过执行以下操作来完成类似的事情.首先,我在表中添加了触发器
I was on the same issue not sure if you found a solution or not but I was able to accomplish something similar by doing the following. First, I added trigger to my table
CREATE TRIGGER trigger_name
AFTER INSERT OR DELETE OR UPDATE
ON table_name
FOR EACH ROW
EXECUTE PROCEDURE trigger_function_name;
每当更新、删除或插入行时,这将在表上设置触发器.然后它会调用我设置的触发器函数,它看起来像这样:
This will set a trigger on the table whenever a row, is updated, deleted, or inserted. Then it will call the trigger function I have set up which looked something like this:
CREATE FUNCTION trigger_function_name
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS
$BODY$
DECLARE
payload JSON;
BEGIN
payload = row_to_json(NEW);
PERFORM pg_notify('notification_name', payload::text);
RETURN NULL;
END;
$BODY$;
这将允许我监听"来自我的 Spring Boot 项目的这些更新中的任何一个,它会将整行作为有效负载发送.接下来,在我的 Spring Boot 项目中,我配置了一个到我的数据库的连接.
This will allow me to 'listen' to the any of these updates from my spring boot project and it will send the entire row as a payload. Next, in my spring boot project I configured a connection to my db.
@Configuration
@EnableR2dbcRepositories("com.(point to wherever repository is)")
public class R2DBCConfig extends AbstractR2dbcConfiguration {
@Override
@Bean
public ConnectionFactory connectionFactory() {
return new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
.host("host")
.database("db")
.port(port)
.username("username")
.password("password")
.schema("schema")
.connectTimeout(Duration.ofMinutes(2))
.build());
}
}
我自动装配(依赖注入)到我的服务类的构造函数中,然后将它转换为 r2dbc PostgressqlConnection 类,如下所示:
With that I Autowire (dependency injection) it into the constructor in my service class and cast it to a r2dbc PostgressqlConnection class like so:
this.postgresqlConnection = Mono.from(connectionFactory.create()).cast(PostgresqlConnection.class).block();
现在我们想监听"我们的表并在对我们的表执行一些更新时得到通知.为此,我们设置了一个初始化方法,该方法在依赖注入后使用@PostContruct 注释执行
Now we want to 'listen' to our table and get the notified when perform some update to our table. To do that we set up an initialization method that is performed after dependency injection by using the @PostContruct annotation
@PostConstruct
private void postConstruct() {
postgresqlConnection.createStatement("LISTEN notification_name").execute()
.flatMap(PostgresqlResult::getRowsUpdated).subscribe();
}
请注意,我们会监听我们在 pg_notify 方法中放入的任何名称.我们还想设置一个方法来在 bean 即将被扔掉时关闭连接,如下所示:
Notice that we listen to whatever name we put inside the pg_notify method. Also we want to set up a method to close the the connection when the bean is about to be tossed away, like so:
@PreDestroy
private void preDestroy() {
postgresqlConnection.close().subscribe();
}
现在我只是创建一个方法来返回当前表中任何内容的 Flux,并且我还将它与我的通知合并,正如我在通知作为 json 出现之前所说的那样,所以我不得不反序列化它,然后我决定使用 ObjectMapper.所以,它看起来像这样:
Now I simply create a method that returns a Flux of whatever is currently in my table, and I also merge it with my notifications, as I said before the notifications come in as a json, so I had to deserialize it and I decided to use ObjectMapper. So, it will look something like this:
private Flux<YourClass> getUpdatedRows() {
return postgresqlConnection.getNotifications().map(notification -> {
try {
//deserialize json
return objectMapper.readValue(notification.getParameter(), YourClass.class);
} catch (IOException e) {
//handle exception
}
});
}
public Flux<YourClass> getDocuments() {
return documentRepository.findAll().share().concatWith(getUpdatedRows());
}
希望这会有所帮助.干杯!
Hope this helps. Cheers!
这篇关于@Tailable(spring-data-reactive-mongodb) 相当于 spring-data-r2dbc的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!