Stream records from a database using Akka Streams
Problem description
I have a system using Akka which currently handles incoming streaming data over message queues. When a record arrives, it is processed, the MQ message is acked, and the record is passed on for further handling within the system.
Now I would like to add support for using databases as input.
What would be a good way to build an input source that can handle a database? It should stream in more than 100M records at the pace the receiver can handle, so I presume reactive streams / Akka Streams?
Recommended answer
Slick Library
Slick streaming is how this is usually done.
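Extending the Slick documentation a bit to include Akka Streams:
```scala
import akka.stream.scaladsl.Source
import slick.basic.DatabasePublisher
import slick.jdbc.PostgresProfile.api._ // use the profile for your database

// `coffees` (a TableQuery) and `db` (a Database) are set up as in the Slick docs
// SELECT Name from Coffees
val q = for (c <- coffees) yield c.name
val action = q.result
type Name = String
// db.stream exposes the query result as a Reactive Streams Publisher
val databasePublisher : DatabasePublisher[Name] = db stream action
val akkaSourceFromSlick : Source[Name, _] = Source fromPublisher databasePublisher
```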
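With some databases, `db.stream` on its own still lets the JDBC driver buffer the whole result on the client. The streaming section of the Slick manual notes that PostgreSQL needs two things for rows to be fetched page by page. The first is `withStatementParameters` with a forward-only, read-only cursor and a fetch size. The second is `.transactionally`. The sketch below makes several assumptions: the database is PostgreSQL, the `Coffees` mapping is hypothetical, and the fetch size of 1000 is an arbitrary illustrative value.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import slick.jdbc.PostgresProfile.api._
import slick.jdbc.{ResultSetConcurrency, ResultSetType}
import slick.lifted.ProvenShape

// Hypothetical single-column mapping of the COFFEES table.
class Coffees(tag: Tag) extends Table[String](tag, "COFFEES") {
  def name: Rep[String] = column[String]("NAME")
  def * : ProvenShape[String] = name
}
val coffees = TableQuery[Coffees]

// Forward-only, read-only cursor with a page size, run inside a transaction
// so that autocommit is off; PostgreSQL then returns rows page by page.
val streamingAction =
  coffees.map(_.name).result
    .withStatementParameters(
      rsType = ResultSetType.ForwardOnly,
      rsConcurrency = ResultSetConcurrency.ReadOnly,
      fetchSize = 1000) // illustrative page size
    .transactionally

// Each materialization of the returned Source runs the query once.
def coffeeNames(db: Database): Source[String, NotUsed] =
  Source.fromPublisher(db.stream(streamingAction))
```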
Now `akkaSourceFromSlick` is like any other Akka Streams `Source`.
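Because it is an ordinary `Source`, the consumer sets the pace. The publisher only emits rows that downstream stages have requested. The minimal sketch below assumes Akka 2.6, where the implicit `ActorSystem` also provides the materializer. `handleRecord` is a hypothetical stand-in for the existing per-record processing. An in-memory `Source` replaces `akkaSourceFromSlick` so the example runs without a database.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

implicit val system: ActorSystem = ActorSystem("db-ingest")
import system.dispatcher // ExecutionContext for the futures below

// Hypothetical handler standing in for the existing per-record processing;
// it completes after a 10 ms delay to simulate a slow receiver.
def handleRecord(name: String): Future[String] =
  after(10.millis, system.scheduler)(Future.successful(name.toUpperCase))

// Stand-in for akkaSourceFromSlick so the sketch runs without a database.
val names: Source[String, _] = Source(1 to 20).map(n => s"coffee-$n")

// At most 4 records are in flight; upstream demand follows the rate at which they complete.
val done: Future[Seq[String]] =
  names.mapAsync(parallelism = 4)(handleRecord).runWith(Sink.seq)

val processed = Await.result(done, 10.seconds)
system.terminate()
```

`mapAsync` keeps at most `parallelism` records in flight and preserves their order. If order does not matter, `mapAsyncUnordered` is the alternative.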
Old School ResultSet
It is also possible to use a plain `ResultSet`, without Slick, as the "engine" for an Akka stream. We will use the fact that a stream `Source` can be instantiated from an `Iterator`.
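First create the ResultSet using standard JDBC techniques:
```scala
import java.sql._
import scala.util.Try

// a thunk, so that every materialization of the stream runs a fresh query
val resultSetGenerator : () => Try[ResultSet] = () => Try {
  val statement : Statement = ??? // e.g. from connection.createStatement()
  statement executeQuery "SELECT Name from Coffees"
}
```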
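The plain JDBC route only streams if the driver is told to fetch incrementally. The sketch below is a generator with the same `() => Try[ResultSet]` shape as `resultSetGenerator`. It assumes you pass in a `java.sql.Connection` that you manage yourself. The helper name `streamingResultSetGenerator` and the default fetch size of 1000 are illustrative. Turning autocommit off is what the PostgreSQL driver requires for cursor-based fetching. Other drivers have their own rules; MySQL Connector/J, for example, streams with a fetch size of `Integer.MIN_VALUE` or with `useCursorFetch=true`.

```scala
import java.sql.{Connection, ResultSet}
import scala.util.Try

// Hypothetical helper: same () => Try[ResultSet] shape as resultSetGenerator,
// but with the statement configured so the driver can fetch rows in pages.
def streamingResultSetGenerator(connection: Connection, sql: String, fetchSize: Int = 1000): () => Try[ResultSet] =
  () => Try {
    // PostgreSQL only uses a server-side cursor when autocommit is off
    connection.setAutoCommit(false)
    val statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    statement.setFetchSize(fetchSize) // a hint for rows per round trip
    statement.executeQuery(sql)
  }
```

Its result can be passed to `resultSetGenToNameIterator` in place of `resultSetGenerator`.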
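A freshly executed `ResultSet` already has its cursor positioned before the first row. `beforeFirst()` only matters for a scrollable `ResultSet` that may have been advanced already. It throws an `SQLException` on a `TYPE_FORWARD_ONLY` result set, which is the default for `createStatement()`, so the call has to be guarded:
```scala
val adjustResultSetBeforeFirst : (ResultSet) => Try[ResultSet] = (resultSet) => Try {
  if (resultSet.getType != ResultSet.TYPE_FORWARD_ONLY) resultSet.beforeFirst() // forward-only sets already start before row 1
  resultSet
}
```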
Once we start iterating through rows, we'll have to pull the value from the correct column:
```scala
val getNameFromResultSet : ResultSet => Name = _ getString "Name"
```
And now we can implement the `Iterator` interface to create an `Iterator[Name]` from a `ResultSet`:
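```scala
val convertResultSetToNameIterator : ResultSet => Iterator[Name] =
  (resultSet) => new Iterator[Try[Name]] {
    // cache the result of resultSet.next() so repeated hasNext calls don't skip rows
    private var lookahead : Option[Boolean] = None
    override def hasNext : Boolean = lookahead.getOrElse { val r = resultSet.next(); lookahead = Some(r); r }
    override def next() : Try[Name] =
      if (!hasNext) throw new NoSuchElementException("no more rows")
      else { lookahead = None; Try(getNameFromResultSet(resultSet)) }
  } flatMap (_.toOption) // rows whose column cannot be read are silently skipped
```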
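A hand-written `Iterator` over a cursor has to make sure `hasNext` advances the cursor at most once per row. The standard library's `Iterator.continually(...).takeWhile(...)` already caches its look-ahead. That gives a compact alternative, which is not the answer's code: `Iterator.continually(resultSet.next()).takeWhile(identity).map(_ => getNameFromResultSet(resultSet))`. The sketch below runs it against a hypothetical in-memory `FakeCursor` that mimics `ResultSet.next()` and `getString`, so no database is needed. It shows that the pattern neither skips nor repeats rows.

```scala
// Hypothetical stand-in for java.sql.ResultSet: next() moves the cursor,
// getString reads the current row, and the cursor starts before the first row.
final class FakeCursor(rows: Vector[String]) {
  private var pos = -1
  def next(): Boolean = { pos += 1; pos < rows.length }
  def getString(column: String): String = rows(pos) // column is ignored in this fake
}

// takeWhile buffers the Boolean it pulled, so repeated hasNext calls do not call
// cursor.next() again; map reads the row only when the element is consumed.
def rowsOf(cursor: FakeCursor): Iterator[String] =
  Iterator.continually(cursor.next()).takeWhile(identity).map(_ => cursor.getString("Name"))

val names = rowsOf(new FakeCursor(Vector("Colombian", "Espresso", "French_Roast"))).toList
```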
And finally, glue all the pieces together to create the function we'll need to pass to `Source.fromIterator`:
```scala
// Function0 has no andThen, so the () => Iterator[Name] thunk is built explicitly
val resultSetGenToNameIterator : (() => Try[ResultSet]) => () => Iterator[Name] =
  (generator) => () =>
    generator()
      .flatMap(adjustResultSetBeforeFirst)
      .map(convertResultSetToNameIterator)
      .getOrElse(Iterator.empty) // a failed query yields an empty stream
```
This `Iterator` can now feed a `Source`:
```scala
val akkaSourceFromResultSet : Source[Name, _] =
  Source fromIterator resultSetGenToNameIterator(resultSetGenerator)
```
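The iterator-based version has two gaps. It never closes the `ResultSet`, its `Statement` or the `Connection`. And because it falls back to `Iterator.empty`, a failing query looks like an empty, successfully completed stream. Akka Streams' `Source.unfoldResource` covers both points:

- It opens the resource when the stream starts.
- It reads one element per pull.
- It calls a close function when the stream completes, fails or is cancelled.
- By default it runs on Akka's blocking-IO dispatcher, which suits blocking JDBC calls.

The sketch below uses that alternative, which is a different technique from the answer's. `coffeeNamesSource`, `QueryCursor` and the caller-supplied `openConnection` are hypothetical names.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import java.sql.{Connection, ResultSet, Statement}

// Bundles everything that must be closed once the stream ends.
final case class QueryCursor(connection: Connection, statement: Statement, resultSet: ResultSet)

// openConnection is supplied by the caller (connection pool, DriverManager, ...).
def coffeeNamesSource(openConnection: () => Connection): Source[String, NotUsed] =
  Source.unfoldResource[String, QueryCursor](
    // create: runs once per materialization; an exception here fails the stream
    () => {
      val connection = openConnection()
      val statement = connection.createStatement()
      QueryCursor(connection, statement, statement.executeQuery("SELECT Name from Coffees"))
    },
    // read: called on each downstream pull; None completes the stream
    cursor =>
      if (cursor.resultSet.next()) Some(cursor.resultSet.getString("Name")) else None,
    // close: runs on completion, failure or cancellation
    cursor => {
      cursor.resultSet.close()
      cursor.statement.close()
      cursor.connection.close()
    }
  )
```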
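This implementation is reactive all the way down to the database. Since the `ResultSet` pre-fetches a limited number of rows at a time (its fetch size), data only comes off the disk through the database as the stream `Sink` signals demand. Whether the fetch is really incremental depends on the JDBC driver. The PostgreSQL driver, for example, loads the entire result into memory unless three conditions hold: autocommit is off, the statement is forward-only, and a non-zero fetch size has been set with `setFetchSize`.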