在Slick中反应式流如何用于插入数据 [英] How are reactive streams used in Slick for inserting data

查看:138
本文介绍了在Slick中反应式流如何用于插入数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Slick的文档中,提供了使用响应流的示例.仅用于读取数据,作为DatabasePublisher的一种方式.但是,当您要根据插入率将数据库用作接收器和后备保护时,会发生什么情况?

In Slick's documentation examples for using Reactive Streams are presented just for reading data as a means of a DatabasePublisher. But what happens when you want to use your database as a Sink and backpreasure based on your insertion rate?

我一直在寻找等效的 DatabaseSubscriber ,但它不存在.所以问题是,如果我有资料来源,请说:

I've looked for equivalent DatabaseSubscriber but it doesn't exist. So the question is, if I have a Source, say:

val source = Source(0 to 100)

val source = Source(0 to 100)

我该如何使用Slick创建一个将这些值写入具有模式的表的接收器:

how can I crete a Sink with Slick that writes those values into a table with schema:

create table NumberTable (value INT)

create table NumberTable (value INT)

推荐答案

序列插入

最简单的方法是在内部插入 Sink.foreach .

The easiest way would be to do inserts within a Sink.foreach.

假设您已使用模式代码生成并进一步假设您的表名为"NumberTable"

Assuming you've used the schema code generation and further assuming your table is named "NumberTable"

//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow} 

val numberTableDB = Database forConfig "NumberTableConfig"

我们可以编写一个执行插入的功能

We can write a function that does the insertion

def insertIntoDb(num : Int) = 
  numberTableDB run (Numbertable += NumbertableRow(num))

该功能可以放在接收器中

And that function can be placed in the Sink

val insertSink = Sink[Int] foreach insertIntoDb

Source(0 to 100) runWith insertSink

批量插入

您可以通过一次批处理N个插入来进一步扩展Sink方法:

You could further extend the Sink methodology by batching N inserts at a time:

def batchInsertIntoDb(nums : Seq[Int]) = 
  numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))

val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb

此批处理的接收器可以由Flow进行批处理分组:

This batched Sink can be fed by a Flow which does the batch grouping:

val batchSize = 10

Source(0 to 100).via(Flow[Int].grouped(batchSize))
                .runWith(batchInsertSink)

这篇关于在Slick中反应式流如何用于插入数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆