Is it possible to install a callback after request processing is finished in Spray?
Question
I'm trying to serve large temporary files from Spray. I need to delete those files once the HTTP request is complete, but so far I have not found a way to do this.
I'm using code similar to this or this:
respondWithMediaType(`text/csv`) {
  path("somepath" / CsvObjectIdSegment) {
    id =>
      CsvExporter.export(id) { // loan pattern to provide temp file for this request
        file =>
          encodeResponse(Gzip) {
            getFromFile(file)
          }
      }
  }
}
So essentially it calls getFromFile, which completes the route in a Future. The problem is that even when that Future is complete, the web request is not complete yet. I tried to write a function similar to getFromFile that calls file.delete() in onComplete of that Future, but it has the same problem: if the file is large enough, the Future completes before the client has finished downloading the file.
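The timing problem can be reproduced without Spray. The following is a minimal sketch using only the Scala standard library; `withTempFile`, `LoanPitfall` and `fileStillExistsWhenRead` are hypothetical names, not part of Spray or of the code above. The promise stands in for a client that reads the file only after the loan block has already returned.

```scala
import java.io.File
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object LoanPitfall {
  /** Hypothetical loan helper: lends a temp file to `use` and deletes it as soon as `use` returns. */
  def withTempFile[A](use: File => A): A = {
    val file = File.createTempFile("export", ".csv")
    try use(file) finally file.delete()
  }

  /** Returns whether the file still exists at the moment the asynchronous consumer looks at it. */
  def fileStillExistsWhenRead(): Boolean = {
    // Completed only after the loan block has returned, like a slow client.
    val clientReady = Promise[Unit]()
    val observed: Future[Boolean] = withTempFile { file =>
      // The block returns a Future immediately; the real work happens later.
      clientReady.future.map(_ => file.exists)
    }
    clientReady.success(())
    Await.result(observed, 5.seconds)
  }
}
```

Here the cleanup is tied to the block returning rather than to the consumer finishing, so the consumer finds the file already gone. Deleting in onComplete of the route's Future suffers from the same kind of mismatch: the event that triggers the cleanup is not the end of the download.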
Here is getFromFile from Spray for reference:
/**
 * Completes GET requests with the content of the given file. The actual I/O operation is
 * running detached in a `Future`, so it doesn't block the current thread (but potentially
 * some other thread !). If the file cannot be found or read the request is rejected.
 */
def getFromFile(file: File)(implicit settings: RoutingSettings,
                            resolver: ContentTypeResolver,
                            refFactory: ActorRefFactory): Route =
  getFromFile(file, resolver(file.getName))
I can't use file.deleteOnExit() because the JVM might not be restarted for a while, and the temp files would be left lying around wasting disk space.
This is also a more general question: is there a way to install a callback in Spray so that, when request processing is complete, resources can be released, statistics or logs can be updated, and so on?
Recommended answer
Thanks to @VladimirPetrosyan for the pointer. Here is how I implemented it:
The route looks like this:
trait MyService extends HttpService ... with CustomMarshallers {
  override def routeSettings = implicitly[RoutingSettings]
  ...
  get {
    respondWithMediaType(`text/csv`) {
      path("somepath" / CsvObjectIdSegment) {
        filterInstanceId => // just an ObjectId
          val tempResultsFile = CsvExporter.saveCsvResultsToTempFile(filterInstanceId)
          respondWithLastModifiedHeader(tempResultsFile.lastModified) {
            encodeResponse(Gzip) {
              // picks up the implicit File marshaller from CustomMarshallers
              complete(tempResultsFile)
            }
          }
      }
    }
  }
}
and the trait that I mix in, which does the marshalling and produces a chunked response:
import akka.actor._
import spray.httpx.marshalling.{MarshallingContext, Marshaller}
import spray.http.{MessageChunk, ChunkedMessageEnd, HttpEntity, ContentType}
import spray.can.Http
import spray.http.MediaTypes._
import scala.Some
import java.io.{RandomAccessFile, File}
import spray.routing.directives.FileAndResourceDirectives
import spray.routing.RoutingSettings
import math._
trait CustomMarshallers extends FileAndResourceDirectives {
  implicit def actorRefFactory: ActorRefFactory
  implicit def routeSettings: RoutingSettings
  implicit val CsvMarshaller =
    Marshaller.of[File](`text/csv`) {
      (file: File, contentType: ContentType, ctx: MarshallingContext) =>
        // One short-lived actor per response: it streams the file chunk by chunk and then cleans up.
        actorRefFactory.actorOf {
          Props {
            new Actor with ActorLogging {
              val defaultChunkSize = min(routeSettings.fileChunkingChunkSize, routeSettings.fileChunkingThresholdSize).toInt
              private def getNumberOfChunks(file: File): Int = {
                val randomAccessFile = new RandomAccessFile(file, "r")
                try {
                  ceil(randomAccessFile.length.toDouble / defaultChunkSize).toInt
                } finally {
                  randomAccessFile.close()
                }
              }
              // Note: the chunk is decoded with the platform default charset.
              private def readChunk(file: File, chunkIndex: Int): String = {
                val randomAccessFile = new RandomAccessFile(file, "r")
                val byteBuffer = new Array[Byte](defaultChunkSize)
                try {
                  // Long arithmetic, so the offset does not overflow for files larger than 2 GB
                  val seek = chunkIndex.toLong * defaultChunkSize
                  randomAccessFile.seek(seek)
                  val nread = randomAccessFile.read(byteBuffer)
                  if (nread == -1) ""
                  else if (nread < byteBuffer.size) new String(byteBuffer.take(nread))
                  else new String(byteBuffer)
                } finally {
                  randomAccessFile.close()
                }
              }
              val chunkNum = getNumberOfChunks(file)
              // Start the chunked response; Ok(0) comes back as the ack that triggers the first chunk.
              val responder: ActorRef = ctx.startChunkedMessage(HttpEntity(contentType, ""), Some(Ok(0)))(self)
              sealed case class Ok(seq: Int)
              // Releases the temp file and the actor; used on normal completion and on disconnect.
              def cleanup() = {
                file.delete()
                context.stop(self)
              }
              def stop() = {
                log.debug("Stopped CSV download handler actor.")
                responder ! ChunkedMessageEnd
                cleanup()
              }
              // Sends chunk `seq` and asks for Ok(seq + 1) once it has been written out.
              def sendCSV(seq: Int) =
                if (seq < chunkNum)
                  responder ! MessageChunk(readChunk(file, seq)).withAck(Ok(seq + 1))
                else
                  stop()
              def receive = {
                case Ok(seq) =>
                  sendCSV(seq)
                case ev: Http.ConnectionClosed =>
                  log.debug("Stopping response streaming due to {}", ev)
                  // The client is gone: delete the temp file and stop instead of waiting for more acks.
                  cleanup()
              }
            }
          }
        }
    }
}
The temp file is created, and then the actor starts streaming chunks. It sends the next chunk each time the previous one is acknowledged. When the last chunk has been sent, or when the client disconnects, the temp file is deleted and the actor is shut down.
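The chunk arithmetic and the delete-after-streaming step can be tried in isolation. The sketch below is a simplification under stated assumptions: it uses only the standard library, replaces the ack-driven actor with a synchronous `send` callback, and works on raw bytes instead of strings. `ChunkedTempFile`, `numberOfChunks`, `readChunk` and `streamAndDelete` are hypothetical names, not Spray APIs.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.file.Files

object ChunkedTempFile {
  /** Number of chunks needed to cover `length` bytes (ceiling division). */
  def numberOfChunks(length: Long, chunkSize: Int): Int =
    ((length + chunkSize - 1) / chunkSize).toInt

  /** Reads chunk `index` of `file`; the last chunk may be shorter than `chunkSize`. */
  def readChunk(file: File, index: Int, chunkSize: Int): Array[Byte] = {
    val raf = new RandomAccessFile(file, "r")
    try {
      val offset = index.toLong * chunkSize
      // Clamp to the bytes that remain, so readFully never runs past the end of the file.
      val len = math.max(0L, math.min(chunkSize.toLong, raf.length - offset)).toInt
      val buffer = new Array[Byte](len)
      raf.seek(offset)
      raf.readFully(buffer)
      buffer
    } finally raf.close()
  }

  /** Hands every chunk to `send` in order, then deletes the file, even if `send` throws. */
  def streamAndDelete(file: File, chunkSize: Int)(send: Array[Byte] => Unit): Unit =
    try {
      val total = numberOfChunks(file.length, chunkSize)
      (0 until total).foreach(i => send(readChunk(file, i, chunkSize)))
    } finally file.delete()
}
```

Working on bytes avoids splitting a multi-byte character at a chunk boundary, which can happen when each chunk is decoded into a String separately. In the actor version the loop is driven by acks rather than by `foreach`, so the next chunk is read only after the previous one has been written out.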
This requires you to run your app in spray-can, and I think it will not work if you run it in a container.
Some useful links: example1, example2, docs