Batching requests with play.api.libs.ws
Problem description
I have a script that makes a lot of web requests (~300,000). It looks something like this:
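import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import com.ning.http.client.AsyncHttpClientConfig
import play.api.libs.ws.DefaultWSClientConfig
import play.api.libs.ws.ning.{NingAsyncHttpClientConfigBuilder, NingWSClient}

// Set up a new wsClient
val config = new NingAsyncHttpClientConfigBuilder(DefaultWSClientConfig()).build
val builder = new AsyncHttpClientConfig.Builder(config)
val wsClient = new NingWSClient(builder.build)

// Each of these uses the wsClient
def getAs: Future[Seq[A]] = { ... }
def getBs: Future[Seq[B]] = { ... }
def getCs: Future[Seq[C]] = { ... }
def getDs: Future[Seq[D]] = { ... }

(for {
  as <- getAs
  bs <- getBs
  cs <- getCs
  ds <- getDs
} yield (as, bs, cs, ds)).map(tuple => println("done"))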
The problem is that I run into a "Too many open files" exception, because each function asynchronously fires thousands of requests and each request uses a file descriptor.
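To see why every request holds a descriptor at the same time, here is a minimal sketch that runs without Play. fakeRequest is a hypothetical stand-in for a non-blocking WS call, and the gate promise (also hypothetical) holds back every response so the effect is easy to observe. Because a Scala Future starts running as soon as it is created, mapping over the whole collection starts all 1000 fake requests before the first one finishes, which is what happens with thousands of real connections.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Bookkeeping for how many fake requests are "open" at once, guarded by `lock`
val lock = new Object
var inFlight = 0
var peakInFlight = 0

// Hypothetical gate: no fake response arrives until it is completed
val gate = Promise[Unit]()

// Hypothetical stand-in for a non-blocking WS call. It counts as open
// (think: one file descriptor) from creation until its response arrives.
def fakeRequest(i: Int): Future[Int] = {
  lock.synchronized {
    inFlight += 1
    if (inFlight > peakInFlight) peakInFlight = inFlight
  }
  gate.future.map { _ =>
    lock.synchronized { inFlight -= 1 }
    i * 2
  }
}

// Same shape as getAs in the question: one Future per element, created eagerly
val ids = (1 to 1000).toList
val all: Future[List[Int]] = Future.sequence(ids.map(fakeRequest))

// Every request was already started before any response came back
val peakBeforeRelease = lock.synchronized(peakInFlight)
gate.success(())
val results = Await.result(all, 10.seconds)
println(s"peak in flight: $peakBeforeRelease of ${ids.size}")
```

With a real client the limit is not a thread pool but the operating system's file-descriptor limit, since every open connection needs its own socket.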
I tried reorganizing my functions so that each one sends its requests in batches, with a new client for each batch:
def getAs: Future[Seq[A]] = {
  someCollection.grouped(1000).map(batch => {
    val client = new NingWSClient(builder.build) // Make a new client for every batch
    Future.sequence(batch.map(thing => {
      client.url(...).get().map(...)
    })).map(things => {
      client.close() // Close the client
      things
    })
  })
}
But this causes the for-comprehension to end early (without any error messages or exceptions):
(for {
  as <- getAs
  bs <- getBs // This doesn't happen
  cs <- getCs // Or any of the following ones
  ds <- getDs
} yield (as, bs, cs, ds)).map(tuple => println("done"))
I'm looking for the right way to make a large number of HTTP requests without opening too many file descriptors.
Recommended answer
I had a similar problem: too many requests (~500+) to one web service.
Your code example with grouping is almost correct. However, you will get an Iterator[Future[List[Int]]], or, if you Future.sequence it, a Future[Iterator[List[Int]]]. Either way, all the batches still run at the same time, because a Future starts running as soon as it is created. You need to fire the first batch, flatMap on it (i.e. wait until it has finished), and only then fire the next batch. This is what I managed to write, following this answer:
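import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Start each batch only after the previous one has completed
val futureResult = list.grouped(50).foldLeft(Future.successful[List[Int]](Nil)) {
  (fItems, items) =>
    fItems flatMap { processed =>
      println("PROCESSED: " + processed)
      println("SPAWNED: " + items)
      // Fire all requests of this batch concurrently, then append their results
      Future.traverse(items)(getFuture) map (res => processed ::: res)
    }
}
println(Await.result(futureResult, Duration.Inf))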
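The same idea can be checked without Play. The sketch below is self-contained: fakeRequest is a hypothetical stand-in for getFuture (a timer completes each fake request after 10 ms), and inBatches is a hypothetical helper that wraps the foldLeft/flatMap pattern above. Because each batch is started inside the flatMap callback of the previous one, at most one batch (here 50 requests) is ever in flight, while the requests inside a batch still run concurrently.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Timer thread that "answers" each fake request after a short delay
val timer = Executors.newSingleThreadScheduledExecutor()

// Bookkeeping for how many fake requests are open at once, guarded by `lock`
val lock = new Object
var inFlight = 0
var peakInFlight = 0

// Hypothetical stand-in for a non-blocking WS call (plays the role of getFuture)
def fakeRequest(i: Int): Future[Int] = {
  lock.synchronized {
    inFlight += 1
    if (inFlight > peakInFlight) peakInFlight = inFlight
  }
  val p = Promise[Int]()
  timer.schedule(new Runnable {
    def run(): Unit = {
      lock.synchronized { inFlight -= 1 }
      p.success(i * 2)
    }
  }, 10, TimeUnit.MILLISECONDS)
  p.future
}

// Same foldLeft/flatMap technique as the answer, wrapped in a generic helper:
// batch n+1 is only started inside the flatMap callback of batch n.
def inBatches[A, B](xs: List[A], size: Int)(f: A => Future[B]): Future[List[B]] =
  xs.grouped(size).foldLeft(Future.successful(List.empty[B])) { (acc, batch) =>
    acc.flatMap { done =>
      Future.traverse(batch)(f).map(res => done ::: res)
    }
  }

val ids = (1 to 1000).toList
val results = Await.result(inBatches(ids, 50)(fakeRequest), 30.seconds)
timer.shutdown()
println(s"results: ${results.size}, peak in flight: ${lock.synchronized(peakInFlight)}")
```

With a real client, f would be something like id => wsClient.url(...).get().map(...). The batch size bounds the number of simultaneously open connections, but it is a trade-off: one slow request delays the start of the whole next batch.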
Hope this helps!