From 0bf574b511ef30df515fa97524a8d69b94e5c32d Mon Sep 17 00:00:00 2001 From: Alexey Romanenko Date: Fri, 13 Aug 2021 19:11:05 +0200 Subject: [PATCH] [BEAM-12429] Add support for S3 Bucket Key at the object level --- .../org/apache/beam/sdk/io/aws/options/S3Options.java | 8 ++++++++ .../beam/sdk/io/aws/s3/S3FileSystemConfiguration.java | 9 +++++++++ .../beam/sdk/io/aws/s3/S3WritableByteChannel.java | 1 + .../org/apache/beam/sdk/io/aws/s3/S3TestUtils.java | 6 +++++- .../beam/sdk/io/aws/s3/S3WritableByteChannelTest.java | 11 ++++++++--- 5 files changed, 31 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/S3Options.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/S3Options.java index 41b07b128e90..e9979b5c99ea 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/S3Options.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/S3Options.java @@ -73,6 +73,14 @@ public interface S3Options extends AwsOptions { void setSSEAwsKeyManagementParams(SSEAwsKeyManagementParams value); + @Description( + "Set to true to use an S3 Bucket Key for object encryption with server-side " + + "encryption using AWS KMS (SSE-KMS)") + @Default.Boolean(false) + boolean getBucketKeyEnabled(); + + void setBucketKeyEnabled(boolean value); + @Description( "Factory class that should be created and used to create a builder of AmazonS3 client." + "Override the default value if you need a S3 client with custom properties, like path style access, etc.") diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemConfiguration.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemConfiguration.java index 9677529443fe..d5f2327b5ff9 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemConfiguration.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystemConfiguration.java @@ -61,6 +61,12 @@ public abstract class S3FileSystemConfiguration { /** KMS key id for SSE-KMS encryption, e.g. "arn:aws:kms:...". */ public abstract @Nullable SSEAwsKeyManagementParams getSSEAwsKeyManagementParams(); + /** + * Whether to ose an S3 Bucket Key for object encryption with server-side encryption using AWS KMS + * (SSE-KMS) or not. + */ + public abstract boolean getBucketKeyEnabled(); + /** Builder used to create the {@code AmazonS3Client}. */ public abstract AmazonS3ClientBuilder getS3ClientBuilder(); @@ -84,6 +90,7 @@ public static Builder fromS3Options(S3Options s3Options) { .setSSEAlgorithm(s3Options.getSSEAlgorithm()) .setSSECustomerKey(s3Options.getSSECustomerKey()) .setSSEAwsKeyManagementParams(s3Options.getSSEAwsKeyManagementParams()) + .setBucketKeyEnabled(s3Options.getBucketKeyEnabled()) .setS3ClientBuilder(getBuilder(s3Options)); } @@ -111,6 +118,8 @@ public abstract static class Builder { public abstract Builder setSSEAwsKeyManagementParams(@Nullable SSEAwsKeyManagementParams value); + public abstract Builder setBucketKeyEnabled(boolean value); + public abstract Builder setS3ClientBuilder(AmazonS3ClientBuilder value); public abstract S3FileSystemConfiguration build(); diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java index 93f5b1371bb0..9e00e93c0be2 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannel.java @@ -92,6 +92,7 @@ class S3WritableByteChannel implements WritableByteChannel { .withObjectMetadata(objectMetadata); request.setSSECustomerKey(config.getSSECustomerKey()); request.setSSEAwsKeyManagementParams(config.getSSEAwsKeyManagementParams()); + request.setBucketKeyEnabled(config.getBucketKeyEnabled()); InitiateMultipartUploadResult result; try { result = amazonS3.initiateMultipartUpload(request); diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java index 83f5a870b45a..3df2f10f9c82 100644 --- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java +++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3TestUtils.java @@ -95,7 +95,10 @@ static S3FileSystemConfiguration s3ConfigWithSSEAwsKeyManagementParams(String sc "arn:aws:kms:eu-west-1:123456789012:key/dc123456-7890-ABCD-EF01-234567890ABC"; SSEAwsKeyManagementParams sseAwsKeyManagementParams = new SSEAwsKeyManagementParams(awsKmsKeyId); - return configBuilder(scheme).setSSEAwsKeyManagementParams(sseAwsKeyManagementParams).build(); + return configBuilder(scheme) + .setSSEAwsKeyManagementParams(sseAwsKeyManagementParams) + .setBucketKeyEnabled(true) + .build(); } static S3Options s3OptionsWithSSEAwsKeyManagementParams() { @@ -105,6 +108,7 @@ static S3Options s3OptionsWithSSEAwsKeyManagementParams() { SSEAwsKeyManagementParams sseAwsKeyManagementParams = new SSEAwsKeyManagementParams(awsKmsKeyId); options.setSSEAwsKeyManagementParams(sseAwsKeyManagementParams); + options.setBucketKeyEnabled(true); return options; } diff --git a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannelTest.java b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannelTest.java index 35e6682534d2..00f9cffda5ff 100644 --- a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannelTest.java +++ b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/s3/S3WritableByteChannelTest.java @@ -106,7 +106,8 @@ private void writeFromOptions(S3Options options) throws IOException { options.getSSEAlgorithm(), toMd5(options.getSSECustomerKey()), options.getSSEAwsKeyManagementParams(), - options.getS3UploadBufferSizeBytes()); + options.getS3UploadBufferSizeBytes(), + options.getBucketKeyEnabled()); } private void writeFromConfig(S3FileSystemConfiguration config) throws IOException { @@ -120,7 +121,8 @@ private void writeFromConfig(S3FileSystemConfiguration config) throws IOExceptio config.getSSEAlgorithm(), toMd5(config.getSSECustomerKey()), config.getSSEAwsKeyManagementParams(), - config.getS3UploadBufferSizeBytes()); + config.getS3UploadBufferSizeBytes(), + config.getBucketKeyEnabled()); } private void write( @@ -130,7 +132,8 @@ private void write( String sseAlgorithm, String sseCustomerKeyMd5, SSEAwsKeyManagementParams sseAwsKeyManagementParams, - long s3UploadBufferSizeBytes) + long s3UploadBufferSizeBytes, + boolean bucketKeyEnabled) throws IOException { InitiateMultipartUploadResult initiateMultipartUploadResult = new InitiateMultipartUploadResult(); @@ -145,6 +148,7 @@ private void write( sseAlgorithm = "aws:kms"; initiateMultipartUploadResult.setSSEAlgorithm(sseAlgorithm); } + initiateMultipartUploadResult.setBucketKeyEnabled(bucketKeyEnabled); doReturn(initiateMultipartUploadResult) .when(mockAmazonS3) .initiateMultipartUpload(any(InitiateMultipartUploadRequest.class)); @@ -153,6 +157,7 @@ private void write( mockAmazonS3.initiateMultipartUpload( new InitiateMultipartUploadRequest(path.getBucket(), path.getKey())); assertEquals(sseAlgorithm, mockInitiateMultipartUploadResult.getSSEAlgorithm()); + assertEquals(bucketKeyEnabled, mockInitiateMultipartUploadResult.getBucketKeyEnabled()); assertEquals(sseCustomerKeyMd5, mockInitiateMultipartUploadResult.getSSECustomerKeyMd5()); UploadPartResult result = new UploadPartResult();