Using Iteratees and Enumerators in Play Scala to Stream Data to S3


Question


I am building a Play Framework application in Scala where I would like to stream an array of bytes to S3. I am using the Play-S3 library to do this. The "Multipart file upload" of the documentation section is what's relevant here:

// Retrieve an upload ticket
val result:Future[BucketFileUploadTicket] =
  bucket initiateMultipartUpload BucketFile(fileName, mimeType)

// Upload the parts and save the tickets
val result:Future[BucketFilePartUploadTicket] =
  bucket uploadPart (uploadTicket, BucketFilePart(partNumber, content))

// Complete the upload using both the upload ticket and the part upload tickets
val result:Future[Unit] =
  bucket completeMultipartUpload (uploadTicket, partUploadTickets)
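
For orientation, the three documented steps compose naturally in a for-comprehension. This is only a sketch assuming the Play-S3 types quoted above; `fileName`, `mimeType`, and `content` are placeholders:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Sketch: chain the three documented steps for a trivial single-part upload
val upload: Future[Unit] =
  for {
    uploadTicket <- bucket initiateMultipartUpload BucketFile(fileName, mimeType)
    partTicket   <- bucket uploadPart (uploadTicket, BucketFilePart(1, content))
    _            <- bucket completeMultipartUpload (uploadTicket, Seq(partTicket))
  } yield ()
```

The streaming version below has to do the same thing, except that the middle step runs once per chunk and the part tickets must be collected along the way.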


I am trying to do the same thing in my application but with Iteratees and Enumerators.


The streams and asynchronicity make things a little complicated, but here is what I have so far (Note uploadTicket is defined earlier in the code):

val partNumberStream = Stream.iterate(1)(_ + 1).iterator
val partUploadTicketsIteratee =
  Iteratee.fold[Array[Byte], Future[Vector[BucketFilePartUploadTicket]]](
    Future.successful(Vector.empty[BucketFilePartUploadTicket])
  ) { (partUploadTickets, bytes) =>
    bucket
      .uploadPart(uploadTicket, BucketFilePart(partNumberStream.next(), bytes))
      .flatMap(partUploadTicket => partUploadTickets.map(_ :+ partUploadTicket))
  }
(body |>>> partUploadTicketsIteratee).andThen {
  case result =>
    result.map(_.map(partUploadTickets => bucket.completeMultipartUpload(uploadTicket, partUploadTickets))) match {
      case Success(x) => x.map(d => println("Success"))
      case Failure(t) => throw t
    }
}


Everything compiles and runs without incident. In fact, "Success" gets printed, but no file ever shows up on S3.

Answer


There might be multiple problems with your code. It's a bit unreadable because of the nested map method calls. You might have a problem with your future composition. Another problem might be caused by the fact that all chunks (except for the last) must be at least 5MB.
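
To make the future-composition problem concrete: in an `Iteratee.fold` whose accumulator is itself a `Future`, each `uploadPart` call fires as soon as its chunk arrives; the accumulator only sequences the *results*, not the uploads themselves. A fold that `flatMap`s through the accumulated `Future` keeps uploads strictly sequential. A sketch with plain Scala futures, where `uploadChunk` is a hypothetical stand-in for `bucket.uploadPart`:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical stand-in for bucket.uploadPart, returning some ticket type
def uploadChunk(partNumber: Int, bytes: Array[Byte]): Future[String] = ???

// Sequential composition: each upload starts only after the previous completes
def uploadAll(chunks: Seq[Array[Byte]]): Future[Vector[String]] =
  chunks.zipWithIndex.foldLeft(Future.successful(Vector.empty[String])) {
    case (acc, (bytes, index)) =>
      acc.flatMap { tickets =>
        uploadChunk(index + 1, bytes).map(tickets :+ _)
      }
  }
```

The original fold, by contrast, starts the next upload before the previous one's future has resolved, and any failure is buried inside the accumulated future rather than failing the stream.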


The code below has not been tested, but shows a different approach. The iteratee approach is one where you can create small building blocks and compose them into a pipe of operations.


To make the code compile I added a trait and a few methods

trait BucketFilePartUploadTicket
val uploadPart: (Int, Array[Byte]) => Future[BucketFilePartUploadTicket] = ???
val completeUpload: Seq[BucketFilePartUploadTicket] => Future[Unit] = ???
val body: Enumerator[Array[Byte]] = ???


Here we create a few parts

// Create 5MB chunks
val chunked = {
  val take5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5)
  Enumeratee.grouped(take5MB transform Iteratee.consume())
}

// Add a counter, used as part number later on
val zipWithIndex = Enumeratee.scanLeft[Array[Byte]](0 -> Array.empty[Byte]) {
  case ((counter, _), bytes) => (counter + 1) -> bytes
}

// Map the (Int, Array[Byte]) tuple to a BucketFilePartUploadTicket
val uploadPartTickets = Enumeratee.mapM[(Int, Array[Byte])](uploadPart.tupled)

// Construct the pipe to connect to the enumerator
// the ><> operator is an alias for compose; it is more intuitive because of
// its arrow-like structure
val pipe = chunked ><> zipWithIndex ><> uploadPartTickets

// Create a consumer that ends by finishing the upload
val consumeAndComplete = 
  Iteratee.getChunks[BucketFilePartUploadTicket] mapM completeUpload

Simply connect the parts to run everything

// This is the result, a Future[Unit]
val result = body through pipe run consumeAndComplete 
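
To convince yourself that the grouping step behaves as intended, the same `Traversable.takeUpTo`/`Iteratee.consume` combination can be exercised against a small in-memory enumerator. A sketch assuming Play's iteratee library is on the classpath; a 5-byte limit stands in for the real 5MB one:

```scala
import play.api.libs.iteratee._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Group incoming byte arrays into chunks of at most 5 bytes (stand-in for 5MB)
val take5 = Traversable.takeUpTo[Array[Byte]](5)
val chunked5 = Enumeratee.grouped(take5 transform Iteratee.consume[Array[Byte]]())

// Feed a few small arrays through the chunking enumeratee and collect the output
val source = Enumerator(Array[Byte](1, 2, 3), Array[Byte](4, 5, 6, 7))
val chunks = Await.result(
  source through chunked5 run Iteratee.getChunks[Array[Byte]],
  5.seconds
)
// Each element of `chunks` is a re-grouped array of at most 5 bytes
```

Note that only the last emitted chunk may be smaller than the limit, which matches S3's multipart constraint.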


Note that I did not test any of this code and may have made some mistakes in my approach. It does, however, show a different way of dealing with the problem and should help you find a good solution.


Note that this approach waits for one part to finish uploading before it takes on the next part. If the connection from your server to Amazon is slower than the connection from the browser to your server, this mechanism will slow down the input.


You could take another approach where you do not wait for the `Future` of the part upload to complete. This would add another step where you use `Future.sequence` to convert the sequence of upload futures into a single future containing a sequence of results. The result would be a mechanism that sends a part to Amazon as soon as you have enough data.
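
A sketch of that parallel variant, under the same assumed helpers defined earlier (`uploadPart`, `completeUpload`, `body`) and reusing the `chunked` and `zipWithIndex` enumeratees from above: instead of `mapM`, map each part to its upload `Future` immediately, then combine them at the end with `Future.sequence`:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Start each part upload as soon as its chunk is available; do not wait for it
val uploadPartTicketFutures =
  Enumeratee.map[(Int, Array[Byte])](uploadPart.tupled)

// Collect the in-flight futures, then sequence them and complete the upload
val parallelConsume =
  Iteratee.getChunks[Future[BucketFilePartUploadTicket]]
    .mapM(futures => Future.sequence(futures).flatMap(completeUpload))

val parallelResult: Future[Unit] =
  body through (chunked ><> zipWithIndex ><> uploadPartTicketFutures) run parallelConsume
```

The trade-off is memory and connection pressure: every part upload may be in flight at once, so you may want to bound the concurrency in a real implementation.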
