From c651a9c1231de26b054cc9ef0223d493e7496f26 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 7 Oct 2025 20:12:29 -0400 Subject: [PATCH 1/4] Use consistent encoding for GBEK across languages --- .../org/apache/beam/sdk/options/PipelineOptions.java | 11 ++++++++--- .../beam/sdk/transforms/GroupByEncryptedKey.java | 10 ++++++---- .../beam/sdk/transforms/GroupByEncryptedKeyTest.java | 7 +++++-- .../org/apache/beam/sdk/transforms/GroupByKeyIT.java | 6 +++++- .../apache/beam/sdk/transforms/GroupByKeyTest.java | 5 ++++- sdks/python/apache_beam/options/pipeline_options.py | 8 ++++++-- 6 files changed, 34 insertions(+), 13 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 62022b219c2a..989e3a1e3193 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -420,7 +420,7 @@ public Long create(PipelineOptions options) { *

Beam will infer the secret type and value based on the secret itself. This guarantees that * any data at rest during the GBK will be encrypted. Many runners only store data at rest when * performing a GBK, so this can be used to guarantee that data is not * unencrypted. Runners with this behavior include the Dataflow, Flink, and Spark runners. The - * option should be structured like: + * secret should be a url safe base64 encoded 32 byte value. The option should be structured like: * *


    * --gbek=type:<secret_type>;<secret_param>:<value>
@@ -432,14 +432,19 @@ public Long create(PipelineOptions options) {
    * --gbek=type:GcpSecret;version_name:my_secret/versions/latest
    * 
* - * All variables should use snake case to allow consistency across languages. + * All variables should use snake case to allow consistency across languages. For an example of + * generating a properly formatted secret, see + * https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java#L82 */ @Description( "When set, will replace all GroupByKey transforms in the pipeline the option. Beam will" + " infer the secret type and value based on the secret itself. This guarantees that" + " any data at rest during the performing a GBK, so this can be used to guarantee" + " that data is not unencrypted. Runners with this behavior include the Dataflow," - + " Flink, and Spark runners. The option should be structured like:" + + " Flink, and Spark runners. The secret should be a url safe base64 encoded 32 byte" + + " value. For an example of generating a properly formatted secret, see" + + " https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java#L82" + + " When passing in the gbek option, it should be structured like:" + " --gbek=type:;:, for example " + " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables " + " should use snake case to allow consistency across languages.") diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 6ed0a31b3b95..1f4b7535d89e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -39,8 +39,8 @@ * the output. This is useful when the keys contain sensitive data that should not be stored at rest * by the runner. * - *

The transform requires a {@link Secret} which returns a 32 byte secret which can be used to - * generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. + *

The transform requires a {@link Secret} which returns a base64 encoded 32 byte secret which + * can be used to generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. * *

Note the following caveats: 1) Runners can implement arbitrary materialization steps, so this * does not guarantee that the whole pipeline will not have unencrypted data at rest by itself. 2) @@ -153,7 +153,7 @@ private static class EncryptMessage extends DoFn, KV public void setup() { try { this.cipher = Cipher.getInstance("AES/GCM/NoPadding"); - this.secretKeySpec = new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES"); + this.secretKeySpec = + new SecretKeySpec( + java.util.Base64.getUrlDecoder().decode(this.hmacKey.getSecretBytes()), "AES"); } catch (Exception ex) { throw new RuntimeException( "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index ba4c50e5a41e..876483a7ddbb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -58,7 +58,7 @@ public class GroupByEncryptedKeyTest implements Serializable { private static class FakeSecret implements Secret { private final byte[] secret = - "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZc".getBytes(Charset.defaultCharset()); + "YUt3STJQbXFZRnQycDV0TktDeUJTNXFZV0hoSHNHWmM".getBytes(Charset.defaultCharset()); @Override public byte[] getSecretBytes() { @@ -123,7 +123,10 @@ public static void setup() throws IOException { byte[] secretBytes = new byte[32]; new SecureRandom().nextBytes(secretBytes); client.addSecretVersion( - secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + secretName, + SecretPayload.newBuilder() + .setData( + ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) } gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); } diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 60477a4c242f..3dd9975463be 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -82,7 +82,11 @@ public static void setup() throws IOException { byte[] secretBytes = new byte[32]; new SecureRandom().nextBytes(secretBytes); client.addSecretVersion( - secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + secretName, + SecretPayload.newBuilder() + .setData( + ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) + .build()); } gcpSecretVersionName = secretName.toString() + "/versions/latest"; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 326da99f1a81..c6e84fedf85a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -153,7 +153,10 @@ public static void setup() throws IOException { new SecureRandom().nextBytes(secretBytes); client.addSecretVersion( secretName, - SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + SecretPayload.newBuilder() + .setData( + ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) + .build()); } gcpSecretVersionName = secretName.toString() + "/versions/latest"; } diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 2d3b8b49d8d7..d4903cfb0092 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1726,8 +1726,12 @@ def _add_argparse_args(cls, 
parser): 'secret itself. This guarantees that any data at rest during the ' 'GBK will be encrypted. Many runners only store data at rest when ' 'performing a GBK, so this can be used to guarantee that data is ' - 'not unencrypted. Runners with this behavior include the ' - 'Dataflow, Flink, and Spark runners. The option should be ' + 'not unencrypted. The secret should be a url safe base64 encoded ' + '32 byte value. To generate a secret in this format, you can use ' + 'Secret.generate_secret_bytes(). For an example of this, see ' + 'https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/python/apache_beam/transforms/util_test.py#L356. ' + 'Runners with this behavior include the Dataflow, ' + 'Flink, and Spark runners. The option should be ' 'structured like: ' '--gbek=type:;:, for example ' '--gbek=type:GcpSecret;version_name:my_secret/versions/latest')) From c63acdca0acd426b67e48124ea2f05c831dc614c Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 7 Oct 2025 20:33:36 -0400 Subject: [PATCH 2/4] syntax --- .../org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 876483a7ddbb..f149b61b3593 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -126,7 +126,7 @@ public static void setup() throws IOException { secretName, SecretPayload.newBuilder() .setData( - ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) + ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes)))); } gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); } From 987fbb916d82a67dec6b32237ef195b0764b4b87 Mon Sep 
17 00:00:00 2001 From: Danny Mccormick Date: Tue, 7 Oct 2025 20:34:36 -0400 Subject: [PATCH 3/4] build --- .../apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index f149b61b3593..8db45c309ecc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -126,7 +126,8 @@ public static void setup() throws IOException { secretName, SecretPayload.newBuilder() .setData( - ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes)))); + ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) + .build()); } gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); } From ce5a918f9468cba9b67e5c23ce8486c53d70470e Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 7 Oct 2025 21:09:33 -0400 Subject: [PATCH 4/4] fmt --- .../apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java | 3 +-- .../test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java | 3 +-- .../java/org/apache/beam/sdk/transforms/GroupByKeyTest.java | 3 +-- sdks/python/apache_beam/options/pipeline_options.py | 2 +- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 8db45c309ecc..3a2fc2f08c04 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -125,8 +125,7 @@ public static void setup() throws IOException { client.addSecretVersion( 
secretName, SecretPayload.newBuilder() - .setData( - ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) + .setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) .build()); } gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 3dd9975463be..141d6dae64b2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -84,8 +84,7 @@ public static void setup() throws IOException { client.addSecretVersion( secretName, SecretPayload.newBuilder() - .setData( - ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) + .setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) .build()); } gcpSecretVersionName = secretName.toString() + "/versions/latest"; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index c6e84fedf85a..d9a3e3ed20d4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -154,8 +154,7 @@ public static void setup() throws IOException { client.addSecretVersion( secretName, SecretPayload.newBuilder() - .setData( - ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) + .setData(ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes))) .build()); } gcpSecretVersionName = secretName.toString() + "/versions/latest"; diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index d4903cfb0092..3fc5151156f1 100644 --- 
a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1729,7 +1729,7 @@ def _add_argparse_args(cls, parser): 'not unencrypted. The secret should be a url safe base64 encoded ' '32 byte value. To generate a secret in this format, you can use ' 'Secret.generate_secret_bytes(). For an example of this, see ' - 'https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/python/apache_beam/transforms/util_test.py#L356. ' + 'https://github.com/apache/beam/blob/c8df4da229da49d533491857e1bb4ab5dbf4fd37/sdks/python/apache_beam/transforms/util_test.py#L356. ' # pylint: disable=line-too-long 'Runners with this behavior include the Dataflow, ' 'Flink, and Spark runners. The option should be ' 'structured like: '