Stream records from a database using Akka Streams

Question

I have a system using Akka which currently handles incoming streaming data over message queues. When a record arrives, it is processed, the MQ message is acked, and the record is passed on for further handling within the system.

Now I would like to add support for using databases as input.
What would be a good way for the input source to read from a database? It should stream in more than 100M records at the pace the receiver can handle, so I presume reactive streams / Akka Streams?

Recommended answer

Slick Library

Slick streaming is how this is usually done.

Extending the Slick documentation a bit to include Akka Streams:

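// Assumes your Slick profile's api._ import plus `db` (a Database) and
// `coffees` (a TableQuery), set up as in the Slick documentation
import slick.basic.DatabasePublisher
import akka.stream.scaladsl.Source

//SELECT Name from Coffees
val q = for (c <- coffees) yield c.name

val action = q.result

type Name = String

// db.stream runs the query and exposes the rows as a Reactive Streams Publisher
val databasePublisher : DatabasePublisher[Name] = db stream action

val akkaSourceFromSlick : Source[Name, _] = Source fromPublisher databasePublisher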

Now akkaSourceFromSlick can be used like any other Akka Streams Source.
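
Whether db.stream really streams depends on the JDBC driver. The Slick documentation notes that PostgreSQL, for example, only streams without caching everything client-side when the action uses `withStatementParameters` (forward-only, read-only, with a fetch size) together with `transactionally`. The sketch below assumes Slick 3.2+ with PostgresProfile, Akka 2.6+ (so an implicit ActorSystem supplies the Materializer), a hypothetical single-column `COFFEES` table, and a hypothetical `handle` function standing in for the system's existing per-record processing; the fetch size of 1000 and parallelism of 4 are arbitrary.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import slick.jdbc.{ResultSetConcurrency, ResultSetType}
import slick.jdbc.PostgresProfile.api._
import slick.lifted.ProvenShape

import scala.concurrent.Future

object SlickStreamingSketch {
  // Hypothetical single-column table modelled on the Slick docs' Coffees example
  class Coffees(tag: Tag) extends Table[String](tag, "COFFEES") {
    def name = column[String]("NAME")
    def * : ProvenShape[String] = name
  }
  val coffees = TableQuery[Coffees]

  // Forward-only, read-only cursor fetching 1000 rows per round trip; PostgreSQL
  // additionally needs a transaction (autocommit off) before it streams
  def namesSource(db: Database): Source[String, NotUsed] =
    Source.fromPublisher(
      db.stream(
        coffees.map(_.name).result
          .withStatementParameters(
            rsType = ResultSetType.ForwardOnly,
            rsConcurrency = ResultSetConcurrency.ReadOnly,
            fetchSize = 1000)
          .transactionally))

  // At most 4 records are processed concurrently; further rows are only
  // requested upstream as these slots free up
  def processAll(names: Source[String, _], handle: String => Future[Done])
                (implicit system: ActorSystem): Future[Done] =
    names.mapAsync(parallelism = 4)(handle).runWith(Sink.ignore)
}
```

Because demand from mapAsync is bounded, a slow `handle` slows down how fast rows are requested from the publisher rather than letting them pile up in memory.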

"Old School" ResultSet

It is also possible to use a plain JDBC ResultSet, without Slick, as the "engine" for an Akka stream. We will use the fact that a Source can be created from an Iterator (Source.fromIterator).

First create the ResultSet using standard JDBC techniques:

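import java.sql.{ResultSet, Statement}
import scala.util.Try

// A function, so that a fresh query runs each time the stream is materialized;
// `???` stands for however your application obtains a Statement
val resultSetGenerator : () => Try[ResultSet] = () => Try {
  val statement : Statement = ???
  statement executeQuery "SELECT Name from Coffees"
}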
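
The `???` above is where the cursor behaviour for a very large table is decided. One hypothetical way to fill it using only java.sql: request a forward-only, read-only result set, set a fetch size as a hint to the driver, and disable autocommit, which some drivers (PostgreSQL among them) require before they fetch rows in batches. The URL, credentials and fetch size are placeholders, and, like the original, nothing here closes the connection once iteration ends.

```scala
import java.sql.{Connection, DriverManager, ResultSet, Statement}

import scala.util.Try

object JdbcCursorSketch {
  // Hypothetical connection details; substitute your own
  val url = "jdbc:postgresql://localhost:5432/shop"
  val user = "app"
  val password = "secret"

  // Forward-only, read-only cursor; fetchSize is only a hint the driver may honour
  def openNames(conn: Connection, fetchSize: Int): ResultSet = {
    conn.setAutoCommit(false) // e.g. PostgreSQL fetches in batches only outside autocommit
    val statement: Statement =
      conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
    statement.setFetchSize(fetchSize)
    statement.executeQuery("SELECT Name FROM Coffees")
  }

  // Same shape as resultSetGenerator: a thunk returning Try[ResultSet]
  val resultSetGenerator: () => Try[ResultSet] = () =>
    Try(DriverManager.getConnection(url, user, password)).flatMap { conn =>
      val rs = Try(openNames(conn, 1000))
      if (rs.isFailure) conn.close() // don't leak the connection when the query fails
      rs
    }
}
```

Connection and query errors end up inside the returned Try instead of being thrown.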

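A freshly returned ResultSet already has its cursor positioned before the first row. Moving it there explicitly with beforeFirst() only works on scrollable result sets; on the default TYPE_FORWARD_ONLY type it throws an SQLException.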

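val adjustResultSetBeforeFirst : (ResultSet) => Try[ResultSet] =
  (resultSet) => Try {
    // beforeFirst() throws on TYPE_FORWARD_ONLY result sets, whose cursor already starts before row 1
    if (resultSet.getType != ResultSet.TYPE_FORWARD_ONLY) resultSet.beforeFirst()
    resultSet
  }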

Once we start iterating through rows we'll have to pull the value from the correct column:

val getNameFromResultSet : ResultSet => Name = _ getString "Name"

Now we can implement the Iterator interface to create an Iterator[Name] from a ResultSet:

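val convertResultSetToNameIterator : ResultSet => Iterator[Name] =
  (resultSet) => new Iterator[Try[Name]] {
    private var pending : Option[Boolean] = None // cached resultSet.next(), so repeated hasNext calls don't skip rows
    override def hasNext : Boolean = pending.getOrElse { val more = resultSet.next(); pending = Some(more); more }
    override def next() : Try[Name] = {
      if (!hasNext) throw new NoSuchElementException("ResultSet exhausted")
      pending = None
      Try(getNameFromResultSet(resultSet))
    }
  } flatMap (_.toOption)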

And finally, glue all the pieces together to create the function we'll need to pass to Source.fromIterator:

val resultSetGenToNameIterator : (() => Try[ResultSet]) => () => Iterator[Name] =
  // Function0 has no andThen, so compose the steps inside a new thunk
  (generator) => () =>
    generator()
      .flatMap(adjustResultSetBeforeFirst)
      .map(convertResultSetToNameIterator)
      .getOrElse(Iterator.empty)

This Iterator can now feed a Source:

val akkaSourceFromResultSet : Source[Name, _] = 
  Source fromIterator resultSetGenToNameIterator(resultSetGenerator)
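
To see that Source.fromIterator only pulls elements when downstream asks for them, one can substitute a counting Iterator for the ResultSet-backed one and take just a few elements. This sketch assumes Akka 2.6+; the million-element range and the `pulled` counter are purely illustrative stand-ins for database rows.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object DemandDrivenSketch {
  // Stand-in for the ResultSet-backed iterator: counts how many elements
  // have actually been pulled out of it
  val pulled = new AtomicInteger(0)

  def rows(): Iterator[String] =
    Iterator.range(0, 1000000).map { i => pulled.incrementAndGet(); s"name-$i" }

  // take(5) cancels upstream once five elements have passed
  def firstFive()(implicit system: ActorSystem): Seq[String] =
    Await.result(Source.fromIterator(() => rows()).take(5).runWith(Sink.seq), 10.seconds)
}
```

After `firstFive()` returns, `pulled` is a handful rather than a million; with a ResultSet behind the iterator, the same cancellation stops further rows from being read.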

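This implementation is reactive all the way down to the database: rows are only pulled from the ResultSet as the stream Sink signals demand. Provided the JDBC driver honours a bounded fetch size, the ResultSet pre-fetches only a limited number of rows at a time, so data only comes off the hard drive through the database as demand arrives. Not every driver behaves this way by default; the PostgreSQL driver, for example, reads the whole result into memory unless autocommit is off and a fetch size is set.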
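
The Iterator-based Source never closes the ResultSet, Statement or Connection, and a failed query silently becomes an empty stream because of `getOrElse Iterator.empty`. A different technique from the one above is Akka Streams' Source.unfoldResource, which takes create, read and close functions, calls close when the stream completes, fails or is cancelled, and by default runs these functions on Akka's dispatcher for blocking I/O. A sketch assuming Akka 2.6+, with a hypothetical query, placeholder connection parameters and an arbitrary fetch size:

```scala
import java.sql.{Connection, DriverManager, ResultSet, Statement}

import akka.NotUsed
import akka.stream.scaladsl.Source

import scala.util.control.NonFatal

object UnfoldResourceSketch {
  // Everything that must be released when the stream stops
  final case class Cursor(conn: Connection, statement: Statement, rows: ResultSet)

  // Hypothetical query; the fetch size of 1000 is an arbitrary example
  def openCursor(url: String, user: String, password: String): Cursor = {
    val conn = DriverManager.getConnection(url, user, password)
    try {
      conn.setAutoCommit(false) // needed by some drivers (e.g. PostgreSQL) for batched fetching
      val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
      statement.setFetchSize(1000)
      Cursor(conn, statement, statement.executeQuery("SELECT Name FROM Coffees"))
    } catch {
      case NonFatal(e) => conn.close(); throw e // don't leak the connection if setup fails
    }
  }

  // Close in reverse order of creation, even if an earlier close throws
  def closeCursor(cursor: Cursor): Unit =
    try cursor.rows.close()
    finally {
      try cursor.statement.close()
      finally cursor.conn.close()
    }

  def namesSource(url: String, user: String, password: String): Source[String, NotUsed] =
    Source.unfoldResource[String, Cursor](
      () => openCursor(url, user, password), // create: runs when the stream is materialized
      cursor => if (cursor.rows.next()) Some(cursor.rows.getString("Name")) else None, // read: one row per pull, None completes
      closeCursor // close: runs on completion, failure or cancellation
    )
}
```

Exceptions thrown in create or read fail the stream, so a bad URL or a missing driver shows up as a failed Future instead of zero records.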
