Technical: AWS S3 bucket pre-signed paths for entire prefixes

2017-10-08 22:30 ( aws s3 cloud scala )

AWS S3 provides a very efficient storage solution for a wide range of use cases, but it comes with limitations. One of them stems from the underlying key-value store: the entire name of a file, together with its path (despite appearing otherwise, e.g. in the S3 console), is just a single key, and there is no notion of directories.
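
For illustration (the key names here are made up), a bucket may contain the keys

invoices/2017/09.pdf
invoices/2017/10.pdf

and while the console renders an invoices/2017 "folder", no such object exists: there are merely two keys sharing a prefix.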

Another useful feature of S3 is its security model: files are private by default, but access to them can be granted temporarily with pre-signed URLs. AWS even provides examples of how to do this with the Java SDK. However, combined with the limitations of the key-value abstraction, this means a pre-signed URL can grant access to a single object only, never to an entire directory.

The alternative is to create a single archive out of a number of files and share a URL to that archive, but how to do so efficiently? Akka Streams is one fine option: a key design motivation of Akka Streams is to offer an intuitive and safe way to formulate stream processing setups that can then be executed efficiently and with bounded resource usage.

Alpakka: a streaming AWS S3 connector

With the right tools, downloading an entire file with text contents in a streaming fashion becomes as simple as:

streamingS3Client.download("bucket", "key") // a Source of the object's bytes as ByteString chunks
  .runReduce(_ ++ _) // concatenate all the chunks
  .map(result => result.utf8String) // a Future of the whole content as a String

Similarly, uploading:

val contentSource = Source.single(ByteString(value)) // creates a Source from a single string value
val s3Sink = streamingS3Client.multipartUpload(bucketName, key) // creates a Sink backed by an S3 multipart upload
contentSource.runWith(s3Sink) // runs the graph by wiring the source up to the sink

The "right tools" here are the Alpakka library of Akka Streams connectors to various technologies, protocols and libraries, and specifically its S3 connector. To include it in your Scala project, add it to build.sbt:

libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.13"
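
The last step of this walkthrough also uses the plain AWS SDK for Java to generate the pre-signed URL. If it is not already on your classpath, add it too; the version below is merely an example from the 1.11.x line current at the time of writing:

libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.200"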

Configure the streaming S3 client:

import akka.stream.alpakka.s3.scaladsl.S3Client
import akka.stream.alpakka.s3.{ MemoryBufferType, S3Settings }
import com.amazonaws.auth.{ AWSStaticCredentialsProvider, BasicAWSCredentials }

val awsCredentials = new AWSStaticCredentialsProvider(
  new BasicAWSCredentials(ACCESS_KEY_ID, SECRET_KEY)
)
val settings = new S3Settings(MemoryBufferType, None, awsCredentials, AWS_REGION, false)
val streamingS3Client = new S3Client(settings)(system, materializer)

At this point you need to already have:

- an ActorSystem (system) and an ActorMaterializer (materializer) to run the streams with,
- the ACCESS_KEY_ID and SECRET_KEY of an IAM user that may read and write the bucket,
- the AWS_REGION the bucket lives in.
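
For completeness, a minimal sketch of the Akka boilerplate; the names system and materializer match the snippets here, while the actor system's name is arbitrary:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

implicit val system: ActorSystem = ActorSystem("s3-zipper") // the name is arbitrary
implicit val materializer: ActorMaterializer = ActorMaterializer()
import system.dispatcher // the ExecutionContext used by the Future combinators below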

Zipping files with legacy APIs

To perform the compression we will use Zip, as provided by java.util.zip:

import java.util.zip.{ ZipEntry, ZipOutputStream }

Managing the Zip archive, however, happens through Java output streams: a very different style of API, built on mutable structures and not directly compatible with Akka Streams:

import java.io.FileOutputStream

val zip = new ZipOutputStream(new FileOutputStream(out)) // out names the destination file
zip.putNextEntry(new ZipEntry(name)) // start a new entry called `name`
zip.write(byteArrayContent) // write the entry's bytes
zip.closeEntry() // finish the current entry
zip.close() // finish the archive

Streaming to a Zip file on S3

The relatively little-documented StreamConverters come to the rescue:

val s3Sink = streamingS3Client.multipartUpload(bucketName, s"$prefix.zip")

val (zip, upload) = StreamConverters.asOutputStream() // a Source of ByteString which, when materialized, provides an OutputStream to write to
  .toMat(s3Sink)(Keep.both) // connects the ByteString source to s3Sink, keeping both materialized values
  .mapMaterializedValue(m => (new ZipOutputStream(m._1), m._2)) // wraps the materialized OutputStream in a ZipOutputStream
  .run() // runs the graph, yielding the ZipOutputStream and the Future of the multipart upload result

As per the documentation, asOutputStream() "creates a Source which when materialized will return an OutputStream which it is possible to write the ByteStrings to the stream this Source is attached to."

The returned values are therefore the ZipOutputStream one can write to, and the Future of the multipart upload result on AWS S3! Now the task of streaming a Zip archive up to S3 is as simple as writing the contents to the output stream and waiting for the Future to finish.
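
A minimal sketch of that, with a made-up single entry (zip and upload come from the snippet above, and location is a field of Alpakka's MultipartUploadResult):

zip.putNextEntry(new ZipEntry("hello.txt")) // a hypothetical entry name
zip.write("hello, world".getBytes("UTF-8")) // its content
zip.closeEntry()
zip.close() // closing the stream completes the upload to S3

upload.foreach(result => println(s"Uploaded to ${result.location}"))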

Connecting the streaming of files from S3 to streaming to Zip on S3

The slightly more complex part is how to stream the contents of an entire S3 sub-path all the way back up into a Zip archive:

val downloaded = streamingS3Client.listBucket(bucketName, Some(prefix + "/"))
  .map { contents =>
    streamingS3Client.download(bucketName, contents.key)
      .prepend(Source.single { // Prepend a step creating a new entry before writing the content
        zip.putNextEntry(new ZipEntry(contents.key)) // Create a new entry in the Zip Output Stream
        ByteString("") // Return empty content to prepend to the stream of content
      })
      .map(_.toArray) // Convert ByteString to an array of bytes
      .map(zip.write) // Write the array of bytes to the Zip Output Stream
  }
  .flatMapMerge(1, identity) // Flatten the streams, 1 at a time, as writes to the Zip Output Stream must happen sequentially
  .reduce((_, _) => Done) // Reduce all results to one, emitting when the last is done
  .runForeach(_ => zip.close()) // Close the Zip Output Stream, which also completes the upload

And the final step is obtaining the pre-signed URL itself, though that is done using the standard S3 client, with no streaming involved:

import com.amazonaws.HttpMethod
import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest
import com.amazonaws.services.s3.{ AmazonS3, AmazonS3ClientBuilder }

val s3Client: AmazonS3 = AmazonS3ClientBuilder.standard() // use the standard S3 client builder (not streaming)
    .withRegion(AWS_REGION)
    .withCredentials(awsCredentials)
    .build()

val expiration = DateTime.now().plus(5.minutes) // how long the link stays valid; adjust to your download needs

val generatePresignedUrlRequest = new GeneratePresignedUrlRequest(bucketName, s"$prefix.zip") // the key of the Zip archive uploaded above
  .withMethod(HttpMethod.GET) // only allow a `GET` request
  .withExpiration(expiration.toDate)

val zipFileUrl = Future(s3Client.generatePresignedUrl(generatePresignedUrlRequest)) // generate the URL
  .map(_.toString)

Finally, we only need to make sure that all the Futures have finished before returning the URL:

for {
  _ <- downloaded // all files have been written to the Zip stream
  _ <- upload // the multipart upload has completed
  contentUrl <- zipFileUrl // Future of the pre-signed URL
} yield contentUrl

After the resulting Future completes successfully, you should have a pre-signed link to a Zip archive of all the files under the given prefix in your S3 bucket.
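
As a closing usage sketch, assuming the pieces above are wrapped in a hypothetical method zipAndPresign(bucketName, prefix) returning that Future[String]:

import scala.util.{ Failure, Success }

zipAndPresign("my-bucket", "reports/2017-10").onComplete {
  case Success(contentUrl) => println(s"Share this link: $contentUrl") // valid for the 5 minutes configured above
  case Failure(e) => println(s"Zipping or upload failed: $e")
}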