How to use Iteratees and Enumerators in Play project to stream data to S3?


Problem Description

I am building a Play Framework application in Scala where I would like to stream an array of bytes to S3. I am using the Play-S3 library to do this. The "Multipart file upload" section of the documentation is what's relevant here:

// Retrieve an upload ticket
val uploadTicketResult: Future[BucketFileUploadTicket] =
  bucket initiateMultipartUpload BucketFile(fileName, mimeType)

// Upload the parts and save the tickets
val partUploadTicketResult: Future[BucketFilePartUploadTicket] =
  bucket uploadPart (uploadTicket, BucketFilePart(partNumber, content))

// Complete the upload using both the upload ticket and the part upload tickets
val completeResult: Future[Unit] =
  bucket completeMultipartUpload (uploadTicket, partUploadTickets)
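
For orientation, here is a minimal, untested sketch (my own, not from the library docs) of how these three calls chain together for a single in-memory part; the part number 1 and the Seq wrapper around the ticket are assumptions:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical one-part upload chaining the three documented calls.
// bucket, fileName, mimeType and content are assumed to be in scope.
val fullUpload: Future[Unit] =
  for {
    uploadTicket <- bucket.initiateMultipartUpload(BucketFile(fileName, mimeType))
    partTicket   <- bucket.uploadPart(uploadTicket, BucketFilePart(1, content))
    _            <- bucket.completeMultipartUpload(uploadTicket, Seq(partTicket))
  } yield ()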

I am trying to do the same thing in my application, but with Iteratees and Enumerators.

The streams and asynchronicity make things a little complicated, but here is what I have so far (Note uploadTicket is defined earlier in the code):

val partNumberStream = Stream.iterate(1)(_ + 1).iterator
val partUploadTicketsIteratee =
  Iteratee.fold[Array[Byte], Future[Vector[BucketFilePartUploadTicket]]](
    Future.successful(Vector.empty[BucketFilePartUploadTicket])
  ) { (partUploadTickets, bytes) =>
    bucket
      .uploadPart(uploadTicket, BucketFilePart(partNumberStream.next(), bytes))
      .flatMap(partUploadTicket => partUploadTickets.map(_ :+ partUploadTicket))
  }
(body |>>> partUploadTicketsIteratee).andThen {
  case result =>
    result.map(_.map(partUploadTickets => bucket.completeMultipartUpload(uploadTicket, partUploadTickets))) match {
      case Success(x) => x.map(d => println("Success"))
      case Failure(t) => throw t
    }
}

Everything compiles and runs without incident. In fact, "Success" gets printed out. The problem is that no file ever shows up on S3.

Recommended Answer

There might be multiple problems with your code. It's a bit unreadable because of the nested map method calls. You might have a problem with your Future composition. Another problem might be caused by the fact that all chunks (except for the last) should be at least 5MB.
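
To illustrate the Future composition point, here is a minimal, untested sketch that keeps the accumulator as a plain Vector by using Iteratee.foldM, which waits for each upload Future before consuming the next chunk; it reuses bucket and uploadTicket from the question and still ignores the 5MB chunking issue:

import play.api.libs.iteratee.Iteratee
import scala.concurrent.ExecutionContext.Implicits.global

// foldM sequences the Futures for us: the next chunk is only consumed
// once the previous part upload has completed. The part number is derived
// from the size of the accumulated ticket vector instead of a shared iterator.
val sequencedUploads =
  Iteratee.foldM[Array[Byte], Vector[BucketFilePartUploadTicket]](Vector.empty) {
    (tickets, bytes) =>
      bucket
        .uploadPart(uploadTicket, BucketFilePart(tickets.size + 1, bytes))
        .map(tickets :+ _)
  }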

The code below has not been tested, but shows a different approach. The iteratee approach is one where you can create small building blocks and compose them into a pipe of operations.

To make the code compile I added a trait and a few methods:

import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee, Traversable}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

trait BucketFilePartUploadTicket
val uploadPart: (Int, Array[Byte]) => Future[BucketFilePartUploadTicket] = ???
val completeUpload: Seq[BucketFilePartUploadTicket] => Future[Unit] = ???
val body: Enumerator[Array[Byte]] = ???

Here we create a few parts:

// Create 5MB chunks
val chunked = {
  val take5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5)
  Enumeratee.grouped(take5MB transform Iteratee.consume())
}

// Add a counter, used as part number later on
val zipWithIndex = Enumeratee.scanLeft[Array[Byte]](0 -> Array.empty[Byte]) {
  case ((counter, _), bytes) => (counter + 1) -> bytes
}

// Map the (Int, Array[Byte]) tuple to a BucketFilePartUploadTicket
val uploadPartTickets = Enumeratee.mapM[(Int, Array[Byte])](uploadPart.tupled)

// Construct the pipe to connect to the enumerator.
// The ><> operator is an alias for compose; it is more intuitive because of
// its arrow-like structure.
val pipe = chunked ><> zipWithIndex ><> uploadPartTickets

// Create a consumer that ends by finishing the upload
val consumeAndComplete = 
  Iteratee.getChunks[BucketFilePartUploadTicket] mapM completeUpload

Running it is done by simply connecting the parts:

// This is the result, a Future[Unit]
val result = body through pipe run consumeAndComplete 
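
As a usage sketch (again untested), you could observe the outcome of the resulting Future; in a Play controller you would more likely map it into an HTTP result than print:

import scala.util.{Failure, Success}

// Log the outcome of the Future[Unit] produced by the pipeline above.
result.onComplete {
  case Success(_) => println("Upload completed")
  case Failure(t) => t.printStackTrace()
}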

Note that I did not test any of this code and might have made some mistakes in my approach. It does, however, show a different way of dealing with the problem and should help you find a good solution.

Note that this approach waits for one part to complete uploading before it takes on the next part. If the connection from your server to Amazon is slower than the connection from the browser to your server, this mechanism will slow down the input.

You could take another approach where you do not wait for the Future of the part upload to complete. This would add another step where you use Future.sequence to convert the sequence of upload futures into a single future containing a sequence of the results. The result would be a mechanism that sends a part to Amazon as soon as you have enough data.
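
A hedged sketch of that parallel variant (untested, reusing the stubs and enumeratees defined above; the names startUploads, consumeAndCompleteParallel and parallelResult are mine):

// Start each part upload immediately and pass the pending Future downstream.
val startUploads = Enumeratee.map[(Int, Array[Byte])](uploadPart.tupled)

// Collect all pending uploads, wait for them together, then complete.
val consumeAndCompleteParallel =
  Iteratee.getChunks[Future[BucketFilePartUploadTicket]] mapM { futures =>
    Future.sequence(futures).flatMap(completeUpload)
  }

val parallelResult: Future[Unit] =
  body through (chunked ><> zipWithIndex ><> startUploads) run consumeAndCompleteParallel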
