Using Iteratees and Enumerators in Play Scala to Stream Data to S3

Problem description

I am building a Play Framework application in Scala where I would like to stream an array of bytes to S3. I am using the Play-S3 library to do this. The "Multipart file upload" section of the documentation is what's relevant here:

// Retrieve an upload ticket
val result:Future[BucketFileUploadTicket] =
  bucket initiateMultipartUpload BucketFile(fileName, mimeType)

// Upload the parts and save the tickets
val result:Future[BucketFilePartUploadTicket] =
  bucket uploadPart (uploadTicket, BucketFilePart(partNumber, content))

// Complete the upload using both the upload ticket and the part upload tickets
val result:Future[Unit] =
  bucket completeMultipartUpload (uploadTicket, partUploadTickets)

I am trying to do the same thing in my application but with Iteratees and Enumerators.

The streams and asynchronicity make things a little complicated, but here is what I have so far (Note uploadTicket is defined earlier in the code):

val partNumberStream = Stream.iterate(1)(_ + 1).iterator
val partUploadTicketsIteratee =
  Iteratee.fold[Array[Byte], Future[Vector[BucketFilePartUploadTicket]]](
    Future.successful(Vector.empty[BucketFilePartUploadTicket])
  ) { (partUploadTickets, bytes) =>
    bucket
      .uploadPart(uploadTicket, BucketFilePart(partNumberStream.next(), bytes))
      .flatMap(partUploadTicket => partUploadTickets.map(_ :+ partUploadTicket))
  }
(body |>>> partUploadTicketsIteratee).andThen {
  case result =>
    result.map(_.map(partUploadTickets => bucket.completeMultipartUpload(uploadTicket, partUploadTickets))) match {
      case Success(x) => x.map(d => println("Success"))
      case Failure(t) => throw t
    }
}

Everything compiles and runs without incident. In fact, "Success" gets printed, but no file ever shows up on S3.

Answer

There might be multiple problems with your code. It's a bit unreadable because of the nested map method calls. You might have a problem with your future composition. Another problem might be that all chunks (except for the last) should be at least 5 MB, a limit S3 imposes on multipart upload parts.
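
As an illustration, here is a minimal, untested sketch of how the future composition in your fold could be sequenced correctly, reusing bucket, uploadTicket, and the Play-S3 types from your code (it does not address the 5 MB chunk size):

import play.api.libs.iteratee.Iteratee
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.concurrent.Future

// Sketch only: flatMap on the accumulated Future first, so each part upload
// starts after the previous one has finished and its ticket is appended in
// order. The part number is derived from the ticket count, so no external
// iterator is needed.
val sequencedUploads =
  Iteratee.fold[Array[Byte], Future[Vector[BucketFilePartUploadTicket]]](
    Future.successful(Vector.empty[BucketFilePartUploadTicket])
  ) { (accFuture, bytes) =>
    accFuture.flatMap { tickets =>
      bucket
        .uploadPart(uploadTicket, BucketFilePart(tickets.size + 1, bytes))
        .map(tickets :+ _)
    }
  }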

The code below has not been tested, but shows a different approach. The iteratee approach is one where you can create small building blocks and compose them into a pipe of operations.

To make the code compile, I added a trait and a few method stubs (plus the relevant imports):

import play.api.libs.iteratee.{ Enumeratee, Enumerator, Iteratee, Traversable }
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import scala.concurrent.Future

trait BucketFilePartUploadTicket
val uploadPart: (Int, Array[Byte]) => Future[BucketFilePartUploadTicket] = ???
val completeUpload: Seq[BucketFilePartUploadTicket] => Future[Unit] = ???
val body: Enumerator[Array[Byte]] = ???

Here we create a few parts:

// Create 5MB chunks
val chunked = {
  val take5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5)
  Enumeratee.grouped(take5MB transform Iteratee.consume())
}

// Add a counter, used as part number later on
val zipWithIndex = Enumeratee.scanLeft[Array[Byte]](0 -> Array.empty[Byte]) {
  case ((counter, _), bytes) => (counter + 1) -> bytes
}

// Map the (Int, Array[Byte]) tuple to a BucketFilePartUploadTicket
val uploadPartTickets = Enumeratee.mapM[(Int, Array[Byte])](uploadPart.tupled)

// Construct the pipe to connect to the enumerator
// the ><> operator is an alias for compose; it is more intuitive because of
// its arrow-like structure
val pipe = chunked ><> zipWithIndex ><> uploadPartTickets

// Create a consumer that ends by finishing the upload
val consumeAndComplete = 
  Iteratee.getChunks[BucketFilePartUploadTicket] mapM completeUpload

Running it is done by simply connecting the parts:

// This is the result, a Future[Unit]
val result = body through pipe run consumeAndComplete 
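
For example, you could observe the outcome of that Future[Unit] like this (again untested):

import scala.util.{ Failure, Success }

// Log success or failure once the whole multipart upload has finished
// (the execution context imported earlier is assumed to be in scope).
result.onComplete {
  case Success(_) => println("Upload completed")
  case Failure(t) => println(s"Upload failed: ${t.getMessage}")
}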

Note that I did not test any of this code and might have made some mistakes in my approach. However, it shows a different way of dealing with the problem and should help you find a good solution.

Note that this approach waits for one part to complete uploading before it takes on the next part. If the connection from your server to Amazon is slower than the connection from the browser to your server, this mechanism will slow the input.

You could take another approach where you do not wait for the Future of the part upload to complete. This would add another step where you use Future.sequence to convert the sequence of upload futures into a single future containing a sequence of the results. The result would be a mechanism that sends a part to Amazon as soon as you have enough data.
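
A rough, untested sketch of that variant, reusing the building blocks above (chunked, zipWithIndex, uploadPart, completeUpload, body), might look like this:

// Fire each part upload as soon as its chunk is available. Enumeratee.map
// (instead of mapM) keeps the futures as plain values, so the uploads run
// concurrently rather than one after another.
val fireUploads = Enumeratee.map[(Int, Array[Byte])](uploadPart.tupled)

val parallelPipe = chunked ><> zipWithIndex ><> fireUploads

val parallelResult: Future[Unit] =
  (body through parallelPipe run Iteratee.getChunks[Future[BucketFilePartUploadTicket]])
    .flatMap(Future.sequence(_)) // wait for all part uploads to finish
    .flatMap(completeUpload)     // then complete the multipart upload

Keep in mind that this keeps all outstanding parts in flight at the same time, so it trades memory and concurrent connections for throughput.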
