Upload Files directly to S3 chunk-by-chunk using Play Scala and Iteratees


Problem Description


I have tried in vain to upload files directly to S3 using Iteratees. I am still new to functional programming and am finding it hard to piece together some working code.

I have written an iteratee that processes chunks of the uploaded file and sends them to S3. The upload fails at the end with an error.

Please help me fix this.

Below is the code I came up with.

Controller Handler

  def uploadFile = Action.async(BodyParser(rh => S3UploadHelper("bucket-name").s3Iteratee() ))  { implicit request =>
    Future {
      if(uploadLogger.isInfoEnabled) uploadLogger.info(s"Contents of Uploaded file :: \n " + request.body)
      Ok(views.html.index("File uploaded"))
    }
  }

Helper Class

case class S3UploadHelper(bucket: String, key: String = UUID.generate()) {

  private val AWS_ACCESS_KEY = ""
  private val AWS_SECRET_KEY = ""
  private val yourAWSCredentials = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY)
  val amazonS3Client = new AmazonS3Client(yourAWSCredentials)


  private val initRequest = new InitiateMultipartUploadRequest(bucket, key)
  private val initResponse = amazonS3Client.initiateMultipartUpload(initRequest)
  val uploadId = initResponse.getUploadId

  val uploadLogger = Logger("upload")

  def s3Iteratee(etags: Seq[PartETag] = Seq.empty[PartETag]): Iteratee[Array[Byte], Either[Result, CompleteMultipartUploadResult]] = Cont {
    case in: El[Array[Byte]] =>
      // Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk
      val uploadRequest = new UploadPartRequest()
        .withBucketName(bucket)
        .withKey(key)
        .withPartNumber(etags.length + 1)
        .withUploadId(uploadId)
        .withInputStream(new ByteArrayInputStream(in.e))
        .withPartSize(in.e.length)
      if(uploadLogger.isDebugEnabled) uploadLogger.debug(">> " + String.valueOf(in.e))
      val etag = Future { amazonS3Client.uploadPart(uploadRequest).getPartETag }
      etag.map(etags :+ _)
      Await.result(etag, 1.seconds)
      s3Iteratee(etags)
    case in @ Empty => s3Iteratee(etags)
    case in @ EOF =>
      import scala.collection.JavaConversions._
      val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toList)
      val result = amazonS3Client.completeMultipartUpload(compRequest)
      Done(Right(result), in)
    case in => s3Iteratee(etags)
  }

}

Although the Iteratee seems to work and I am able to process the file chunk by chunk, the upload fails with a weird error. Here are the logs:

[debug] upload - >> [B@1df9048d
[debug] upload - >> [B@152dcf59
[debug] upload - >> [B@7cfeb0d8
[debug] upload - >> [B@136844c5
[debug] upload - >> [B@16f41590
[debug] upload - >> [B@6dd85710
[debug] upload - >> [B@64294203
[debug] upload - >> [B@35366c2f
[debug] upload - >> [B@358a78c
[debug] upload - >> [B@2c171020
[debug] upload - >> [B@20076fb
[debug] upload - >> [B@4d13580
[debug] upload - >> [B@42738651
[debug] upload - >> [B@5671082f
[debug] upload - >> [B@57c70bb4
[debug] upload - >> [B@4154394f
[debug] upload - >> [B@4f93cf15
[debug] upload - >> [B@4bac523f
[debug] upload - >> [B@eaec52e
[debug] upload - >> [B@6ed00bf5
[debug] upload - >> [B@3f6a8a5d
[debug] upload - >> [B@16fe1a25
[debug] upload - >> [B@6e813a61
[debug] upload - >> [B@e01be7
[debug] upload - >> [B@6bb351c4
[debug] upload - >> [B@dfa51a5
[debug] upload - >> [B@6acf2049
[debug] upload - >> [B@6a7021d4
[debug] upload - >> [B@1b3c602f
[debug] upload - >> [B@44146d94
[debug] upload - >> [B@574ac037
[debug] upload - >> [B@3cdf258b
[debug] upload - >> [B@441a0727
[debug] upload - >> [B@2385aafd
[debug] upload - >> [B@224f9dc2
[debug] upload - >> [B@6779077d
[debug] upload - >> [B@734e178a
[debug] upload - >> [B@7d92895c
[debug] upload - >> [B@23edaaa1
[debug] upload - >> [B@c00134e
[debug] upload - >> [B@ff1a703
[error] play - Cannot invoke the action, eventually got an error: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 98h72s0EBA7653AD, AWS Error Code: MalformedXML, AWS Error Message: The XML you provided was not well-formed or did not validate against our published schema, S3 Extended Request ID: R7e44g8oRy5b4xd7MU++atibwrBSRFezeMxNCXE38gyzcwci5Zf
[error] application - 

! @6k2maob49 - Internal server error, for (POST) [/v1/file_upload] ->

play.api.Application$$anon$1: Execution exception[[AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema]]
        at play.api.Application$class.handleError(Application.scala:296) ~[play_2.10-2.3.2.jar:2.3.2]
        at play.api.DefaultApplication.handleError(Application.scala:402) [play_2.10-2.3.2.jar:2.3.2]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2]
        at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$3$$anonfun$applyOrElse$4.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.10-2.3.2.jar:2.3.2]
        at scala.Option.map(Option.scala:145) [scala-library.jar:na]
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The XML you provided was not well-formed or did not validate against our published schema
        at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:556) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:289) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:170) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:2723) ~[aws-java-sdk-1.3.11.jar:na]
        at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:1964) ~[aws-java-sdk-1.3.11.jar:na]
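
A likely cause of the MalformedXML error above is that the part ETags are never actually collected: etag.map(etags :+ _) builds a new Future whose result is discarded, and the recursion continues with the original, still-empty etags, so completeMultipartUpload is eventually called with an empty part list. The sketch below is only an illustration of the ETag-threading fix; it reuses the bucket, key, uploadId and amazonS3Client fields of the S3UploadHelper shown above, and it still does not address the 5 MB minimum part size that the accepted solution below handles by regrouping chunks.

  def s3Iteratee(etags: Seq[PartETag] = Seq.empty[PartETag]): Iteratee[Array[Byte], Either[Result, CompleteMultipartUploadResult]] = Cont {
    case in: El[Array[Byte]] =>
      val uploadRequest = new UploadPartRequest()
        .withBucketName(bucket)
        .withKey(key)
        .withPartNumber(etags.length + 1)
        .withUploadId(uploadId)
        .withInputStream(new ByteArrayInputStream(in.e))
        .withPartSize(in.e.length)
      // Upload the part and keep its ETag before recursing, so the completion
      // request below actually lists every uploaded part.
      val partETag = amazonS3Client.uploadPart(uploadRequest).getPartETag
      s3Iteratee(etags :+ partETag)
    case in @ EOF =>
      import scala.collection.JavaConversions._
      val compRequest = new CompleteMultipartUploadRequest(bucket, key, uploadId, etags.toList)
      Done(Right(amazonS3Client.completeMultipartUpload(compRequest)), in)
    case _ => s3Iteratee(etags)
  }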

Solution

I have done this in the past. Amazon S3 needs 5 MB chunks, so the parser below regroups the body into 5 MB parts before uploading. I return a tuple at the end, which you can change as per your requirements.

val client = new AmazonS3Client(new BasicAWSCredentials(access_key, secret_key))

def my_parser = BodyParser {

  // Regroup the incoming byte stream into 5 MB chunks: S3 rejects multipart
  // parts smaller than 5 MB (except the last one).
  val consume_5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5) &>> Iteratee.consume()
  val rechunkAdapter: Enumeratee[Array[Byte], Array[Byte]] = Enumeratee.grouped(consume_5MB)

  multipartFormData(Multipart.handleFilePart({

    case Multipart.FileInfo(partName, file_name, content_type) => {

      // Key the object by a random UUID, keeping the file extension unless the
      // content type is video or audio.
      val object_id = java.util.UUID.randomUUID().toString().replaceAll("-", "")
      val object_id_key =
        if (content_type.getOrElse("").contains("video") || content_type.getOrElse("").contains("audio")) object_id
        else object_id + file_name.substring(file_name.lastIndexOf('.'))
      var position = 0
      val etags = new java.util.ArrayList[PartETag]()

      val initRequest = new InitiateMultipartUploadRequest(bucket, object_id_key)
      val initResponse = client.initiateMultipartUpload(initRequest)
      println("fileName = " + file_name)
      println("contentType = " + content_type)

      // Upload each 5 MB chunk as one part, numbered by the fold counter,
      // collecting the part ETags for the completion request.
      (rechunkAdapter &>> Iteratee.foldM[Array[Byte], Int](1) { (c, bytes) =>
        Future {
          println("got a chunk!  :" + c + " size in KB: " + (bytes.length / 1024))
          val is = new java.io.ByteArrayInputStream(bytes)

          val uploadRequest = new UploadPartRequest()
            .withBucketName(bucket).withKey(object_id_key)
            .withUploadId(initResponse.getUploadId())
            .withPartNumber(c)
            .withFileOffset(position)
            .withInputStream(is)
            .withPartSize(bytes.length)

          etags.add(client.uploadPart(uploadRequest).getPartETag)
          position = position + bytes.length

          c + 1
        }
      }).map { v =>
        try {
          // All chunks consumed: complete the multipart upload and make the
          // object publicly readable.
          val compRequest = new CompleteMultipartUploadRequest(
            bucket,
            object_id_key,
            initResponse.getUploadId(),
            etags)
          client.completeMultipartUpload(compRequest)
          println("Finished uploading " + file_name)
          client.setObjectAcl(bucket, object_id_key, com.amazonaws.services.s3.model.CannedAccessControlList.PublicRead)
          (object_id_key, file_name, content_type.getOrElse("application/octet-stream"))
        } catch {
          case e: Exception => {
            // On failure, abort the multipart upload (same bucket and key as the
            // initiate request) so S3 does not keep the orphaned parts around.
            println("S3 upload Ex " + e.getMessage())
            val abortMPURequest = new AbortMultipartUploadRequest(bucket, object_id_key, initResponse.getUploadId())
            client.abortMultipartUpload(abortMPURequest)
            ("error", file_name, content_type.getOrElse("application/octet-stream"))
          }
        }
      }
    }
  }))

}
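
For completeness, here is a minimal sketch of how a controller might consume this body parser. The controller name, route, and JSON response are illustrative assumptions, not part of the original answer; it assumes my_parser is in scope, for example defined in the same controller object.

  import play.api.libs.json.Json
  import play.api.mvc._

  object UploadController extends Controller {

    def uploadFile = Action(my_parser) { request =>
      // Each file part's ref is the (object_id_key, file_name, content_type)
      // tuple produced by the parser above.
      val uploaded = request.body.files.map { part =>
        val (key, name, contentType) = part.ref
        Json.obj("key" -> key, "fileName" -> name, "contentType" -> contentType)
      }
      Ok(Json.toJson(uploaded))
    }
  }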
