Using Iteratees and Enumerators in Play Scala to Transfer Data to S3

I am building a Play Framework application in Scala in which I would like to stream an array of bytes to S3. I am using the Play-S3 library for this. The "Multipart file upload" section of its documentation is the relevant part:

    // Retrieve an upload ticket
    val result: Future[BucketFileUploadTicket] =
      bucket initiateMultipartUpload BucketFile(fileName, mimeType)

    // Upload the parts and save the tickets
    val result: Future[BucketFilePartUploadTicket] =
      bucket uploadPart (uploadTicket, BucketFilePart(partNumber, content))

    // Complete the upload using both the upload ticket and the part upload tickets
    val result: Future[Unit] =
      bucket completeMultipartUpload (uploadTicket, partUploadTickets)

I am trying to do the same in my application, but with Iteratees and Enumerators.

Streams and asynchronicity make things a little complicated, but here is what I have so far (note that uploadTicket is defined earlier in the code):

    val partNumberStream = Stream.iterate(1)(_ + 1).iterator

    val partUploadTicketsIteratee =
      Iteratee.fold[Array[Byte], Future[Vector[BucketFilePartUploadTicket]]](
        Future.successful(Vector.empty[BucketFilePartUploadTicket])
      ) { (partUploadTickets, bytes) =>
        bucket
          .uploadPart(uploadTicket, BucketFilePart(partNumberStream.next(), bytes))
          .flatMap(partUploadTicket => partUploadTickets.map(_ :+ partUploadTicket))
      }

    (body |>>> partUploadTicketsIteratee).andThen {
      case result =>
        result.map(_.map(partUploadTickets =>
          bucket.completeMultipartUpload(uploadTicket, partUploadTickets))) match {
          case Success(x) => x.map(d => println("Success"))
          case Failure(t) => throw t
        }
    }

Everything compiles and runs without incident. In fact, "Success" is printed, but not a single file appears on S3.

1 answer

There might be multiple problems with your code. It is hard to read because of the nested calls to map. You might have a problem with the way your futures are composed. Another problem might be that every part except the last must be at least 5 MB.
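As a general illustration of how future composition can go wrong (not a diagnosis of the code in the question, which was not run here), the sketch below uses only the Scala standard library. `complete` is a hypothetical stand-in for the completion call and is made to fail on purpose. Mapping with it yields a nested future whose outer layer still succeeds, while `flatMap` surfaces the failure.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FutureCompositionSketch {
  // Hypothetical stand-in for a completion call; it always fails here.
  def complete(tickets: Vector[Int]): Future[Unit] =
    Future.failed(new RuntimeException("completion rejected"))

  // map nests the futures: the outer one succeeds as soon as
  // `complete` has been called, whatever its own outcome is.
  def nested(tickets: Future[Vector[Int]]): Future[Future[Unit]] =
    tickets.map(complete)

  // flatMap chains the futures: the result fails when `complete` fails.
  def chained(tickets: Future[Vector[Int]]): Future[Unit] =
    tickets.flatMap(complete)
}
```

A success callback attached to the outer layer of `nested` would fire even though the inner call failed, so a printed success message alone does not show that the final step worked.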

The code below has not been tested, but it shows a different approach. With iteratees you can create small building blocks and compose them into a pipeline of operations.

To get the code to compile, I added a trait and a few stub values:

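    import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee, Traversable}
    import scala.concurrent.Future
    // Assumption: any implicit ExecutionContext will do; the global one is used here
    import scala.concurrent.ExecutionContext.Implicits.global

    // Stubs standing in for the Play-S3 types and calls
    trait BucketFilePartUploadTicket
    val uploadPart: (Int, Array[Byte]) => Future[BucketFilePartUploadTicket] = ???
    val completeUpload: Seq[BucketFilePartUploadTicket] => Future[Unit] = ???
    val body: Enumerator[Array[Byte]] = ???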

Here we create the building blocks:

    // Create 5MB chunks
    val chunked = {
      val take5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5)
      Enumeratee.grouped(take5MB transform Iteratee.consume())
    }

    // Add a counter, used as part number later on
    val zipWithIndex = Enumeratee.scanLeft[Array[Byte]](0 -> Array.empty[Byte]) {
      case ((counter, _), bytes) => (counter + 1) -> bytes
    }

    // Map the (Int, Array[Byte]) tuple to a BucketFilePartUploadTicket
    val uploadPartTickets = Enumeratee.mapM[(Int, Array[Byte])](uploadPart.tupled)

    // Construct the pipe to connect to the enumerator.
    // The ><> operator is an alias for compose; it is more intuitive because of
    // its arrow-like shape
    val pipe = chunked ><> zipWithIndex ><> uploadPartTickets

    // Create a consumer that ends by finishing the upload
    val consumeAndComplete =
      Iteratee.getChunks[BucketFilePartUploadTicket] mapM completeUpload

Putting it to work is just a matter of connecting the parts:

    // This is the result, a Future[Unit]
    val result = body through pipe run consumeAndComplete
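To make the data shape concrete, here is a rough plain-Scala sketch of what the chunked and zipWithIndex stages are meant to produce: input chunks of arbitrary size regrouped into fixed-size parts, numbered from 1. It works on an in-memory sequence and does not stream, so it only illustrates the intended output, not the iteratee mechanics. `ChunkingSketch` and `numberedParts` are hypothetical names.

```scala
object ChunkingSketch {
  // Regroup arbitrarily sized input chunks into parts of `partSize` bytes
  // (only the last part may be smaller) and number the parts from 1.
  def numberedParts(chunks: Seq[Array[Byte]], partSize: Int): Vector[(Int, Array[Byte])] =
    chunks.iterator
      .flatMap(_.iterator)   // view the input as one run of bytes
      .grouped(partSize)     // cut it into parts of partSize bytes
      .map(_.toArray)
      .zipWithIndex
      .map { case (bytes, index) => (index + 1, bytes) }
      .toVector
}
```

For a real upload the part size would be `5 * 1024 * 1024`; a small value such as 4 makes the regrouping easy to inspect by hand.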

Please note that I have not tested the code and might have made some mistakes in my approach. It does, however, show a different way of dealing with the problem and should help you find a good solution.

Note that this approach waits for one part to finish uploading before it takes on the next part. If the connection from your server to Amazon is slower than the connection from the browser to your server, this mechanism will slow down the input.

You could take another approach in which you do not wait for the Future of each part upload to complete. This requires an extra step in which you use Future.sequence to convert the sequence of upload futures into a single future containing the sequence of results. The result would be a mechanism that sends a part to Amazon as soon as you have enough data.
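A minimal sketch of that Future.sequence step, using only the Scala standard library. `PartTicket`, `uploadPart` and `completeUpload` are hypothetical stand-ins for the Play-S3 types and calls, and the parts are taken from an in-memory sequence rather than from an Enumerator. The stub `completeUpload` returns the part numbers only so that the result can be inspected.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object ParallelUploadSketch {
  // Hypothetical ticket type standing in for BucketFilePartUploadTicket.
  final case class PartTicket(partNumber: Int, size: Int)

  // Hypothetical stand-in for the part upload call.
  def uploadPart(partNumber: Int, bytes: Array[Byte]): Future[PartTicket] =
    Future(PartTicket(partNumber, bytes.length))

  // Hypothetical stand-in for the completion call.
  def completeUpload(tickets: Seq[PartTicket]): Future[Seq[Int]] =
    Future.successful(tickets.map(_.partNumber))

  // Start every part upload immediately, then combine the futures
  // and complete the upload once all tickets are available.
  def uploadAll(parts: Seq[Array[Byte]]): Future[Seq[Int]] = {
    val started: Seq[Future[PartTicket]] =
      parts.zipWithIndex.map { case (bytes, index) => uploadPart(index + 1, bytes) }
    Future.sequence(started).flatMap(completeUpload)
  }
}
```

Future.sequence keeps the tickets in the order of the input sequence, whichever upload finishes first. The trade-off in this sketch is that nothing limits the number of uploads in flight, so a fast client could make the server buffer many parts at once.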


Source: https://habr.com/ru/post/1209636/

