使用Akka流查看mongo集合 [英] using akka streams to go over mongo collection

查看:104
本文介绍了使用Akka流查看mongo集合的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在mongo中有一个人集合,我想以流的形式遍历集合中的每个人,并为每个人调用一个执行api调用,更改模型并插入新集合的方法

I have a collection of people in mongo, and I want to go over each person in the collection as a stream, and for each person call a method that is performing api call, changing the model, and inserting to a new collection in mongo.

看起来像这样:

  def processPeople()(implicit m: Materializer): Future[Unit] = {

    val peopleSource: Source[Person, Future[State]] = collection.find(json()).cursor[Person]().documentSource()

    peopleSource.runWith(Sink.seq[Person]).map(people => {
      people.foreach(person => {
        changeModelAndInsertToNewCollection(person)
      }) 
    })
  }

但是不起作用...更改模型的部分似乎正在起作用,但是对mongo的插入不起作用。

but this is not working...the part of changing the model seems like is working, but the insert to mongo is not working.

看起来方法也未启动立即,开始之前需要进行一分钟的处理。您看到问题了吗?

It looks like also the method is not starting right away, there some processing going behind before for a min before it starts....do you see the issue?

推荐答案

解决方案1:

def changeModelAndInsertToNewCollection(person:Person) : Future[Boolean] ={
//Todo : call mongo api to update the person
???
}

def processPeople()(implicit m: Materializer): Future[Done] = {
val numberOfConcurrentUpdate = 10

val peopleSource: Source[Person, Future[State]] =
  collection
    .find(json())
    .cursor[Person]()
    .documentSource()

peopleSource
  .mapAsync(numberOfConcurrentUpdate)(changeModelAndInsertToNewCollection)
  withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
  .runWith(Sink.ignore)}

解决方案2:
使用 Alpakka 作为mongo的akka​​流连接器

Solution 2 : using Alpakka as akka stream connector for mongo

val source: Source[Document, NotUsed] =
MongoSource(collection.find(json()).cursor[Person]().documentSource())

source.runWith(MongoSink.updateOne(2, collection))

这篇关于使用Akka流查看mongo集合的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆