From d192b624654bd8db65312f386091859080c2a93d Mon Sep 17 00:00:00 2001 From: Moritz Mack Date: Thu, 3 Feb 2022 12:28:19 +0100 Subject: [PATCH] [BEAM-13246] Add support for S3 Bucket Key at the object level (AWS Sdk v2). --- .../org/apache/beam/sdk/io/aws2/options/S3Options.java | 7 +++++++ .../beam/sdk/io/aws2/s3/S3FileSystemConfiguration.java | 9 +++++++++ .../beam/sdk/io/aws2/s3/S3WritableByteChannel.java | 1 + .../java/org/apache/beam/sdk/io/aws2/s3/S3TestUtils.java | 7 ++++++- .../beam/sdk/io/aws2/s3/S3WritableByteChannelTest.java | 5 +++++ 5 files changed, 28 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/S3Options.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/S3Options.java index dbed7fcbd539..e59a8ebca2e2 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/S3Options.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/S3Options.java @@ -69,6 +69,13 @@ public interface S3Options extends AwsOptions { void setSSEKMSKeyId(String value); + @Description( + "Enable to use an S3 Bucket Key for object encryption with server-side encryption using AWS KMS (SSE-KMS)") + @Default.Boolean(false) + boolean getBucketKeyEnabled(); + + void setBucketKeyEnabled(boolean value); + @Description( "Factory class that should be created and used to create a builder of S3client." + "Override the default value if you need a S3 client with custom properties, like path style access, etc.") diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemConfiguration.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemConfiguration.java index c5d25d724be7..0a52399e92cf 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemConfiguration.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystemConfiguration.java @@ -58,6 +58,12 @@ public abstract class S3FileSystemConfiguration { /** KMS key id for SSE-KMS encyrption, e.g. "arn:aws:kms:..." */ public abstract @Nullable String getSSEKMSKeyId(); + /** + * Whether to use an S3 Bucket Key for object encryption with server-side encryption using AWS KMS + * (SSE-KMS) or not. + */ + public abstract boolean getBucketKeyEnabled(); + /** Builder used to create the {@code S3Client}. */ public abstract S3ClientBuilder getS3ClientBuilder(); @@ -81,6 +87,7 @@ public static Builder builderFrom(S3Options s3Options) { .setSSEAlgorithm(s3Options.getSSEAlgorithm()) .setSSECustomerKey(s3Options.getSSECustomerKey()) .setSSEKMSKeyId(s3Options.getSSEKMSKeyId()) + .setBucketKeyEnabled(s3Options.getBucketKeyEnabled()) .setS3ClientBuilder(getBuilder(s3Options)); } @@ -112,6 +119,8 @@ public abstract static class Builder { public abstract Builder setSSEKMSKeyId(@Nullable String value); + public abstract Builder setBucketKeyEnabled(boolean value); + public abstract Builder setS3ClientBuilder(S3ClientBuilder value); public abstract S3FileSystemConfiguration build(); diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3WritableByteChannel.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3WritableByteChannel.java index cc61980838b0..a86b529439ac 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3WritableByteChannel.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3WritableByteChannel.java @@ -97,6 +97,7 @@ class S3WritableByteChannel implements WritableByteChannel { .sseCustomerAlgorithm(config.getSSECustomerKey().getAlgorithm()) .ssekmsKeyId(config.getSSEKMSKeyId()) .sseCustomerKeyMD5(config.getSSECustomerKey().getMD5()) + .bucketKeyEnabled(config.getBucketKeyEnabled()) .build(); CreateMultipartUploadResponse response; try { diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3TestUtils.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3TestUtils.java index feb9fc401235..f90c5bb8d576 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3TestUtils.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3TestUtils.java @@ -97,7 +97,11 @@ static S3Options s3OptionsWithSSECustomerKey() { static S3FileSystemConfiguration s3ConfigWithSSEKMSKeyId(String scheme) { String ssekmsKeyId = "arn:aws:kms:eu-west-1:123456789012:key/dc123456-7890-ABCD-EF01-234567890ABC"; - return configBuilder(scheme).setSSEAlgorithm("aws:kms").setSSEKMSKeyId(ssekmsKeyId).build(); + return configBuilder(scheme) + .setSSEAlgorithm("aws:kms") + .setSSEKMSKeyId(ssekmsKeyId) + .setBucketKeyEnabled(true) + .build(); } static S3Options s3OptionsWithSSEKMSKeyId() { @@ -106,6 +110,7 @@ static S3Options s3OptionsWithSSEKMSKeyId() { "arn:aws:kms:eu-west-1:123456789012:key/dc123456-7890-ABCD-EF01-234567890ABC"; options.setSSEKMSKeyId(ssekmsKeyId); options.setSSEAlgorithm("aws:kms"); + options.setBucketKeyEnabled(true); return options; } diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3WritableByteChannelTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3WritableByteChannelTest.java index a8b4361b8958..4f2b03361f1a 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3WritableByteChannelTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/s3/S3WritableByteChannelTest.java @@ -111,6 +111,7 @@ private void writeFromOptions(S3Options options, boolean writeReadOnlyBuffer) th toMd5(options.getSSECustomerKey()), options.getSSEKMSKeyId(), options.getS3UploadBufferSizeBytes(), + options.getBucketKeyEnabled(), writeReadOnlyBuffer); } @@ -127,6 +128,7 @@ private void writeFromConfig(S3FileSystemConfiguration config, boolean writeRead toMd5(config.getSSECustomerKey()), config.getSSEKMSKeyId(), config.getS3UploadBufferSizeBytes(), + config.getBucketKeyEnabled(), writeReadOnlyBuffer); } @@ -138,6 +140,7 @@ private void write( String sseCustomerKeyMd5, String ssekmsKeyId, long s3UploadBufferSizeBytes, + boolean bucketKeyEnabled, boolean writeReadOnlyBuffer) throws IOException { CreateMultipartUploadResponse.Builder builder = @@ -154,6 +157,7 @@ private void write( sseAlgorithm = ServerSideEncryption.AWS_KMS; builder.serverSideEncryption(sseAlgorithm); } + builder.bucketKeyEnabled(bucketKeyEnabled); CreateMultipartUploadResponse createMultipartUploadResponse = builder.build(); doReturn(createMultipartUploadResponse) .when(mockS3Client) @@ -165,6 +169,7 @@ private void write( mockS3Client.createMultipartUpload(createMultipartUploadRequest); assertEquals(sseAlgorithm, mockCreateMultipartUploadResponse1.serverSideEncryption()); assertEquals(sseCustomerKeyMd5, mockCreateMultipartUploadResponse1.sseCustomerKeyMD5()); + assertEquals(bucketKeyEnabled, mockCreateMultipartUploadResponse1.bucketKeyEnabled()); UploadPartResponse.Builder uploadPartResponseBuilder = UploadPartResponse.builder().eTag("etag");