From b98663c2fa8644364c639b9e08de94fc7fae47ec Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 19 Sep 2025 14:03:00 -0400 Subject: [PATCH 01/24] First pass at Java GBEK (AI generated) --- sdks/java/core/build.gradle | 1 + .../apache/beam/sdk/transforms/GcpSecret.java | 52 ++++++ .../sdk/transforms/GroupByEncryptedKey.java | 138 ++++++++++++++ .../apache/beam/sdk/transforms/Secret.java | 31 ++++ .../transforms/GroupByEncryptedKeyTest.java | 169 ++++++++++++++++++ 5 files changed, 391 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index e849ae597791..8ca227a6ae8f 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -100,6 +100,7 @@ dependencies { shadow library.java.snappy_java shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) + implementation 'com.google.cloud:google-cloud-secretmanager' permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java new file mode 100644 index 000000000000..00271dd7ad86 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretVersionName; +import java.io.IOException; + +/** + * A secret manager implementation that retrieves secrets from Google Cloud Secret Manager. + */ +public class GcpSecret implements Secret { + private final String version_name; + + /** + * Initializes a GcpSecret object. + * + * @param version_name The full version name of the secret in Google Cloud Secret Manager. For + * example: projects//secrets//versions/1. For more info, see + * https://cloud.google.com/python/docs/reference/secretmanager/latest/google.cloud.secretmanager_v1beta1.services.secret_manager_service.SecretManagerServiceClient#google_cloud_secretmanager_v1beta1_services_secret_manager_service_SecretManagerServiceClient_access_secret_version + */ + public GcpSecret(String version_name) { + this.version_name = version_name; + } + + @Override + public byte[] getSecretBytes() { + try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { + SecretVersionName secretVersionName = SecretVersionName.parse(version_name); + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + return response.getPayload().getData().toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Failed to retrieve secret bytes", e); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java new file mode 100644 index 000000000000..29fe57e5d856 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import javax.crypto.Cipher; +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * A {@link PTransform} that provides a secure alternative to {@link GroupByKey}. + * + *

This transform encrypts the keys of the input {@link PCollection}, performs a {@link + * GroupByKey} on the encrypted keys, and then decrypts the keys in the output. This is useful when + * the keys contain sensitive data that should not be stored at rest by the runner. + */ +public class GroupByEncryptedKey + extends PTransform>, PCollection>>> { + + private final Secret hmacKey; + + private GroupByEncryptedKey(Secret hmacKey) { + this.hmacKey = hmacKey; + } + + public static GroupByEncryptedKey create(Secret hmacKey) { + return new GroupByEncryptedKey<>(hmacKey); + } + + @Override + public PCollection>> expand(PCollection> input) { + return input + .apply("EncryptMessage", ParDo.of(new _EncryptMessage<>())) + .apply(GroupByKey.create()) + .apply("DecryptMessage", ParDo.of(new _DecryptMessage<>())); + } + + private static class _EncryptMessage extends DoFn, KV>> { + private final Secret hmacKey; + private transient Mac mac; + private transient Cipher cipher; + + _EncryptMessage(Secret hmacKey) { + this.hmacKey = hmacKey; + } + + @Setup + public void setup() throws Exception { + mac = Mac.getInstance("HmacSHA256"); + mac.init(new SecretKeySpec(hmacKey.getSecretBytes(), "HmacSHA256")); + cipher = Cipher.getInstance("AES"); + cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Coder keyCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getKeyCoder(); + Coder valueCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getValueCoder(); + + byte[] encodedKey = encode(keyCoder, c.element().getKey()); + byte[] encodedValue = encode(valueCoder, c.element().getValue()); + + byte[] hmac = mac.doFinal(encodedKey); + byte[] encryptedKey = cipher.doFinal(encodedKey); + byte[] encryptedValue = cipher.doFinal(encodedValue); + + c.output(KV.of(hmac, KV.of(encryptedKey, encryptedValue))); + } + + private byte[] encode(Coder coder, T value) throws Exception { + java.io.ByteArrayOutputStream os = new java.io.ByteArrayOutputStream(); + coder.encode(value, os); + return os.toByteArray(); + } + } + + private static class _DecryptMessage + extends DoFn>>, KV>> { + private final Secret hmacKey; + private transient Cipher cipher; + + _DecryptMessage(Secret hmacKey) { + this.hmacKey = hmacKey; + } + + @Setup + public void setup() throws Exception { + cipher = Cipher.getInstance("AES"); + cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Coder keyCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getKeyCoder(); + Coder valueCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getValueCoder(); + + java.util.Map> decryptedKvs = new java.util.HashMap<>(); + for (KV encryptedKv : c.element().getValue()) { + byte[] decryptedKeyBytes = cipher.doFinal(encryptedKv.getKey()); + K key = decode(keyCoder, decryptedKeyBytes); + + if (!decryptedKvs.containsKey(key)) { + decryptedKvs.put(key, new java.util.ArrayList<>()); + } + byte[] decryptedValueBytes = cipher.doFinal(encryptedKv.getValue()); + V value = decode(valueCoder, decryptedValueBytes); + decryptedKvs.get(key).add(value); + } + + for (java.util.Map.Entry> entry : decryptedKvs.entrySet()) { + c.output(KV.of(entry.getKey(), entry.getValue())); + } + } + + private T decode(Coder coder, byte[] bytes) throws Exception { + java.io.ByteArrayInputStream is = new java.io.ByteArrayInputStream(bytes); + return coder.decode(is); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java new file mode 100644 index 000000000000..ae34fb6ee630 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; + +/** + * A secret management interface used for handling sensitive data. + * + *

This interface provides a generic way to handle secrets. Implementations of this interface + * should handle fetching secrets from a secret management system. + */ +public interface Secret extends Serializable { + /** Returns the secret as a byte array. */ + byte[] getSecretBytes(); +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java new file mode 100644 index 000000000000..5860448140fd --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.security.SecureRandom; +import org.junit.After; +import org.junit.Before; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link GroupByEncryptedKey}. */ +@RunWith(JUnit4.class) +public class GroupByEncryptedKeyTest implements Serializable { + + @Rule public transient TestPipeline p = TestPipeline.create(); + + private static class FakeSecret implements Secret { + private final byte[] secret = "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZcyZrnZQiQ-uE=".getBytes(); + + @Override + public byte[] getSecretBytes() { + return secret; + } + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyFakeSecret() { + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = + input.apply(GroupByEncryptedKey.create(new FakeSecret())); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + private Secret gcpSecret; + + @Before + public void setup() throws IOException { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + client.createSecret(projectName, SECRET_ID, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); + } + + @After + public void tearDown() throws IOException { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + client.deleteSecret(secretName); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyGcpSecret() { + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = + input.apply(GroupByEncryptedKey.create(gcpSecret)); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyGcpSecretThrows() { + Secret gcpSecret = new GcpSecret("bad_path/versions/latest"); + p.apply(Create.of(KV.of("k1", 1))) + .apply(GroupByEncryptedKey.create(gcpSecret)); + assertThrows(RuntimeException.class, () -> p.run()); + } +} From c5aacfb2556367bb2261c49f6b414ff91d46fb4a Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 19 Sep 2025 16:39:30 -0400 Subject: [PATCH 02/24] Compile --- .../apache/beam/sdk/transforms/GcpSecret.java | 4 +- .../sdk/transforms/GroupByEncryptedKey.java | 103 +++++++++++------- .../transforms/GroupByEncryptedKeyTest.java | 21 ++-- 3 files changed, 74 insertions(+), 54 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java index 00271dd7ad86..b67431b5c00f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java @@ -22,9 +22,7 @@ import com.google.cloud.secretmanager.v1.SecretVersionName; import java.io.IOException; -/** - * A secret manager implementation that retrieves secrets from Google Cloud Secret Manager. - */ +/** A secret manager implementation that retrieves secrets from Google Cloud Secret Manager. */ public class GcpSecret implements Secret { private final String version_name; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 29fe57e5d856..204cddda4a84 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms; +import java.util.Arrays; import javax.crypto.Cipher; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; @@ -26,11 +27,13 @@ import org.apache.beam.sdk.values.PCollection; /** - * A {@link PTransform} that provides a secure alternative to {@link GroupByKey}. + * A {@link PTransform} that provides a secure alternative to {@link + * org.apache.beam.sdk.transforms.GroupByKey}. * *

This transform encrypts the keys of the input {@link PCollection}, performs a {@link - * GroupByKey} on the encrypted keys, and then decrypts the keys in the output. This is useful when - * the keys contain sensitive data that should not be stored at rest by the runner. + * org.apache.beam.sdk.transforms.GroupByKey} on the encrypted keys, and then decrypts the keys in + * the output. This is useful when the keys contain sensitive data that should not be stored at rest + * by the runner. */ public class GroupByEncryptedKey extends PTransform>, PCollection>>> { @@ -47,36 +50,48 @@ public static GroupByEncryptedKey create(Secret hmacKey) { @Override public PCollection>> expand(PCollection> input) { + Coder> inputCoder = input.getCoder(); + if (!(inputCoder instanceof KvCoder)) { + throw new IllegalStateException("GroupByEncryptedKey requires its input to use KvCoder"); + } + KvCoder inputKvCoder = (KvCoder) inputCoder; + Coder keyCoder = inputKvCoder.getKeyCoder(); + Coder valueCoder = inputKvCoder.getValueCoder(); + return input - .apply("EncryptMessage", ParDo.of(new _EncryptMessage<>())) + .apply("EncryptMessage", ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) .apply(GroupByKey.create()) - .apply("DecryptMessage", ParDo.of(new _DecryptMessage<>())); + .apply( + "DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))); } - private static class _EncryptMessage extends DoFn, KV>> { + private static class EncryptMessage extends DoFn, KV>> { private final Secret hmacKey; - private transient Mac mac; - private transient Cipher cipher; + private final Coder keyCoder; + private final Coder valueCoder; + private final Mac mac; + private final Cipher cipher; - _EncryptMessage(Secret hmacKey) { + EncryptMessage(Secret hmacKey, Coder keyCoder, Coder valueCoder) { this.hmacKey = hmacKey; - } - - @Setup - public void setup() throws Exception { - mac = Mac.getInstance("HmacSHA256"); - mac.init(new SecretKeySpec(hmacKey.getSecretBytes(), "HmacSHA256")); - cipher = Cipher.getInstance("AES"); - cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + + try { + this.mac = Mac.getInstance("HmacSHA256"); + this.mac.init(new SecretKeySpec(hmacKey.getSecretBytes(), "HmacSHA256")); + this.cipher = Cipher.getInstance("AES"); + this.cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + } catch (Exception ex) { + throw new RuntimeException( + "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); + } } @ProcessElement public void processElement(ProcessContext c) throws Exception { - Coder keyCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getKeyCoder(); - Coder valueCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getValueCoder(); - - byte[] encodedKey = encode(keyCoder, c.element().getKey()); - byte[] encodedValue = encode(valueCoder, c.element().getValue()); + byte[] encodedKey = encode(this.keyCoder, c.element().getKey()); + byte[] encodedValue = encode(this.valueCoder, c.element().getValue()); byte[] hmac = mac.doFinal(encodedKey); byte[] encryptedKey = cipher.doFinal(encodedKey); @@ -92,37 +107,45 @@ private byte[] encode(Coder coder, T value) throws Exception { } } - private static class _DecryptMessage + private static class DecryptMessage extends DoFn>>, KV>> { private final Secret hmacKey; + private final Coder keyCoder; + private final Coder valueCoder; private transient Cipher cipher; - _DecryptMessage(Secret hmacKey) { + DecryptMessage(Secret hmacKey, Coder keyCoder, Coder valueCoder) { this.hmacKey = hmacKey; - } - - @Setup - public void setup() throws Exception { - cipher = Cipher.getInstance("AES"); - cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + + try { + this.cipher = Cipher.getInstance("AES"); + this.cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + } catch (Exception ex) { + throw new RuntimeException( + "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); + } } @ProcessElement public void processElement(ProcessContext c) throws Exception { - Coder keyCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getKeyCoder(); - Coder valueCoder = ((KvCoder) c.getPipeline().getCoderRegistry().getCoder(c.element().getClass())).getValueCoder(); - java.util.Map> decryptedKvs = new java.util.HashMap<>(); for (KV encryptedKv : c.element().getValue()) { byte[] decryptedKeyBytes = cipher.doFinal(encryptedKv.getKey()); - K key = decode(keyCoder, decryptedKeyBytes); - - if (!decryptedKvs.containsKey(key)) { - decryptedKvs.put(key, new java.util.ArrayList<>()); + K key = decode(this.keyCoder, decryptedKeyBytes); + + if (key != null) { + if (!decryptedKvs.containsKey(key)) { + decryptedKvs.put(key, new java.util.ArrayList<>()); + } + byte[] decryptedValueBytes = cipher.doFinal(encryptedKv.getValue()); + V value = decode(this.valueCoder, decryptedValueBytes); + decryptedKvs.get(key).add(value); + } else { + throw new RuntimeException( + "Found null key when decoding " + Arrays.toString(decryptedKeyBytes)); } - byte[] decryptedValueBytes = cipher.doFinal(encryptedKv.getValue()); - V value = decode(valueCoder, decryptedValueBytes); - decryptedKvs.get(key).add(value); } for (java.util.Map.Entry> entry : decryptedKvs.entrySet()) { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 5860448140fd..456f9d770da7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -17,7 +17,14 @@ */ package org.apache.beam.sdk.transforms; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; +import java.io.IOException; import java.io.Serializable; +import java.security.SecureRandom; import java.util.Arrays; import java.util.List; import org.apache.beam.sdk.coders.KvCoder; @@ -28,18 +35,11 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.junit.After; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import com.google.cloud.secretmanager.v1.ProjectName; -import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; -import com.google.cloud.secretmanager.v1.SecretName; -import com.google.cloud.secretmanager.v1.SecretPayload; -import com.google.protobuf.ByteString; -import java.io.IOException; -import java.security.SecureRandom; -import org.junit.After; -import org.junit.Before; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -162,8 +162,7 @@ public void testGroupByKeyGcpSecret() { @Category(NeedsRunner.class) public void testGroupByKeyGcpSecretThrows() { Secret gcpSecret = new GcpSecret("bad_path/versions/latest"); - p.apply(Create.of(KV.of("k1", 1))) - .apply(GroupByEncryptedKey.create(gcpSecret)); + p.apply(Create.of(KV.of("k1", 1))).apply(GroupByEncryptedKey.create(gcpSecret)); assertThrows(RuntimeException.class, () -> p.run()); } } From 45ed98fd22c446e956569fe78a6b1cb6c3e94e5a Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 19 Sep 2025 16:54:49 -0400 Subject: [PATCH 03/24] Compiletest --- .../apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 456f9d770da7..a093e4d4a5fb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.transforms; +import static org.junit.Assert.assertThrows; + import com.google.cloud.secretmanager.v1.ProjectName; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretName; @@ -24,6 +26,7 @@ import com.google.protobuf.ByteString; import java.io.IOException; import java.io.Serializable; +import java.nio.charset.Charset; import java.security.SecureRandom; import java.util.Arrays; import java.util.List; @@ -50,7 +53,8 @@ public class GroupByEncryptedKeyTest implements Serializable { @Rule public transient TestPipeline p = TestPipeline.create(); private static class FakeSecret implements Secret { - private final byte[] secret = "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZcyZrnZQiQ-uE=".getBytes(); + private final byte[] secret = + "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZcyZrnZQiQ-uE=".getBytes(Charset.defaultCharset()); @Override public byte[] getSecretBytes() { From 4bf3c9a58169547a7e233de0670386245cb898e3 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 19 Sep 2025 17:28:28 -0400 Subject: [PATCH 04/24] checkstyle --- .../main/resources/beam/checkstyle/suppressions.xml | 1 + .../java/org/apache/beam/sdk/transforms/GcpSecret.java | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index e8d4e8888da1..5ee8872d006e 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -56,6 +56,7 @@ + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java index b67431b5c00f..d7d04be23a83 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java @@ -24,23 +24,23 @@ /** A secret manager implementation that retrieves secrets from Google Cloud Secret Manager. */ public class GcpSecret implements Secret { - private final String version_name; + private final String versionName; /** * Initializes a GcpSecret object. * - * @param version_name The full version name of the secret in Google Cloud Secret Manager. For + * @param versionName The full version name of the secret in Google Cloud Secret Manager. For * example: projects//secrets//versions/1. For more info, see * https://cloud.google.com/python/docs/reference/secretmanager/latest/google.cloud.secretmanager_v1beta1.services.secret_manager_service.SecretManagerServiceClient#google_cloud_secretmanager_v1beta1_services_secret_manager_service_SecretManagerServiceClient_access_secret_version */ - public GcpSecret(String version_name) { - this.version_name = version_name; + public GcpSecret(String versionName) { + this.versionName = versionName; } @Override public byte[] getSecretBytes() { try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { - SecretVersionName secretVersionName = SecretVersionName.parse(version_name); + SecretVersionName secretVersionName = SecretVersionName.parse(versionName); AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); return response.getPayload().getData().toByteArray(); } catch (IOException e) { From c62d8e0d68acbba733bada273636f9af294b3952 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 26 Sep 2025 11:47:40 -0400 Subject: [PATCH 05/24] tests passing --- sdks/java/core/build.gradle | 3 +- .../sdk/transforms/GroupByEncryptedKey.java | 46 +++++++++++------ .../transforms/GroupByEncryptedKeyTest.java | 49 ++++++++++++------- 3 files changed, 65 insertions(+), 33 deletions(-) diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 8ca227a6ae8f..9fbd9a70385d 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -100,7 +100,7 @@ dependencies { shadow library.java.snappy_java shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) - implementation 'com.google.cloud:google-cloud-secretmanager' + implementation 'com.google.cloud:google-cloud-secretmanager:2.75.0' permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema @@ -124,6 +124,7 @@ dependencies { shadowTest library.java.log4j shadowTest library.java.log4j2_api shadowTest library.java.jamm + shadowTest 'com.google.cloud:google-cloud-secretmanager:2.75.0' testRuntimeOnly library.java.slf4j_jdk14 } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 204cddda4a84..ef7a21ee7e43 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -22,6 +22,7 @@ import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -58,30 +59,40 @@ public PCollection>> expand(PCollection> input) { Coder keyCoder = inputKvCoder.getKeyCoder(); Coder valueCoder = inputKvCoder.getValueCoder(); - return input - .apply("EncryptMessage", ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) - .apply(GroupByKey.create()) - .apply( - "DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))); + PCollection>>> grouped = + input + .apply( + "EncryptMessage", + ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) + .apply(GroupByKey.create()); + + return grouped + .apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))) + .setCoder(KvCoder.of(keyCoder, IterableCoder.of(valueCoder))); } + @SuppressWarnings("initialization.fields.uninitialized") private static class EncryptMessage extends DoFn, KV>> { private final Secret hmacKey; private final Coder keyCoder; private final Coder valueCoder; - private final Mac mac; - private final Cipher cipher; + private transient Mac mac; + private transient Cipher cipher; EncryptMessage(Secret hmacKey, Coder keyCoder, Coder valueCoder) { this.hmacKey = hmacKey; this.keyCoder = keyCoder; this.valueCoder = valueCoder; + } + @Setup + public void setup() { try { this.mac = Mac.getInstance("HmacSHA256"); - this.mac.init(new SecretKeySpec(hmacKey.getSecretBytes(), "HmacSHA256")); + this.mac.init(new SecretKeySpec(this.hmacKey.getSecretBytes(), "HmacSHA256")); this.cipher = Cipher.getInstance("AES"); - this.cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + this.cipher.init( + Cipher.ENCRYPT_MODE, new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES")); } catch (Exception ex) { throw new RuntimeException( "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); @@ -93,9 +104,9 @@ public void processElement(ProcessContext c) throws Exception { byte[] encodedKey = encode(this.keyCoder, c.element().getKey()); byte[] encodedValue = encode(this.valueCoder, c.element().getValue()); - byte[] hmac = mac.doFinal(encodedKey); - byte[] encryptedKey = cipher.doFinal(encodedKey); - byte[] encryptedValue = cipher.doFinal(encodedValue); + byte[] hmac = this.mac.doFinal(encodedKey); + byte[] encryptedKey = this.cipher.doFinal(encodedKey); + byte[] encryptedValue = this.cipher.doFinal(encodedValue); c.output(KV.of(hmac, KV.of(encryptedKey, encryptedValue))); } @@ -107,6 +118,7 @@ private byte[] encode(Coder coder, T value) throws Exception { } } + @SuppressWarnings("initialization.fields.uninitialized") private static class DecryptMessage extends DoFn>>, KV>> { private final Secret hmacKey; @@ -118,10 +130,14 @@ private static class DecryptMessage this.hmacKey = hmacKey; this.keyCoder = keyCoder; this.valueCoder = valueCoder; + } + @Setup + public void setup() { try { this.cipher = Cipher.getInstance("AES"); - this.cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(hmacKey.getSecretBytes(), "AES")); + this.cipher.init( + Cipher.DECRYPT_MODE, new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES")); } catch (Exception ex) { throw new RuntimeException( "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); @@ -132,14 +148,14 @@ private static class DecryptMessage public void processElement(ProcessContext c) throws Exception { java.util.Map> decryptedKvs = new java.util.HashMap<>(); for (KV encryptedKv : c.element().getValue()) { - byte[] decryptedKeyBytes = cipher.doFinal(encryptedKv.getKey()); + byte[] decryptedKeyBytes = this.cipher.doFinal(encryptedKv.getKey()); K key = decode(this.keyCoder, decryptedKeyBytes); if (key != null) { if (!decryptedKvs.containsKey(key)) { decryptedKvs.put(key, new java.util.ArrayList<>()); } - byte[] decryptedValueBytes = cipher.doFinal(encryptedKv.getValue()); + byte[] decryptedValueBytes = this.cipher.doFinal(encryptedKv.getValue()); V value = decode(this.valueCoder, decryptedValueBytes); decryptedKvs.get(key).add(value); } else { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index a093e4d4a5fb..5ccb1ec41cc5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -30,6 +30,8 @@ import java.security.SecureRandom; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -38,8 +40,8 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -54,7 +56,7 @@ public class GroupByEncryptedKeyTest implements Serializable { private static class FakeSecret implements Secret { private final byte[] secret = - "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZcyZrnZQiQ-uE=".getBytes(Charset.defaultCharset()); + "aKwI2PmqYFt2p5tNKCyBS5qYmHhHsGZc".getBytes(Charset.defaultCharset()); @Override public byte[] getSecretBytes() { @@ -81,13 +83,13 @@ public void testGroupByKeyFakeSecret() { .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); PCollection>> output = - input.apply(GroupByEncryptedKey.create(new FakeSecret())); + input.apply(GroupByEncryptedKey.create(new FakeSecret())); - PAssert.that(output) + PAssert.that(output.apply("Sort", MapElements.via(new SortValues()))) .containsInAnyOrder( KV.of("k1", Arrays.asList(3, 4)), - KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), - KV.of("k2", Arrays.asList(66, -33)), + KV.of("k5", Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE)), + KV.of("k2", Arrays.asList(-33, 66)), KV.of("k3", Arrays.asList(0))); p.run(); @@ -95,10 +97,10 @@ public void testGroupByKeyFakeSecret() { private static final String PROJECT_ID = "apache-beam-testing"; private static final String SECRET_ID = "gbek-test"; - private Secret gcpSecret; + private static Secret gcpSecret; - @Before - public void setup() throws IOException { + @BeforeClass + public static void setup() throws IOException { SecretManagerServiceClient client = SecretManagerServiceClient.create(); ProjectName projectName = ProjectName.of(PROJECT_ID); SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); @@ -124,8 +126,8 @@ public void setup() throws IOException { gcpSecret = new GcpSecret(secretName.toString() + "/versions/latest"); } - @After - public void tearDown() throws IOException { + @AfterClass + public static void tearDown() throws IOException { SecretManagerServiceClient client = SecretManagerServiceClient.create(); SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); client.deleteSecret(secretName); @@ -150,13 +152,13 @@ public void testGroupByKeyGcpSecret() { .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); PCollection>> output = - input.apply(GroupByEncryptedKey.create(gcpSecret)); + input.apply(GroupByEncryptedKey.create(gcpSecret)); - PAssert.that(output) + PAssert.that(output.apply("Sort", MapElements.via(new SortValues()))) .containsInAnyOrder( KV.of("k1", Arrays.asList(3, 4)), - KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), - KV.of("k2", Arrays.asList(66, -33)), + KV.of("k5", Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE)), + KV.of("k2", Arrays.asList(-33, 66)), KV.of("k3", Arrays.asList(0))); p.run(); @@ -166,7 +168,20 @@ public void testGroupByKeyGcpSecret() { @Category(NeedsRunner.class) public void testGroupByKeyGcpSecretThrows() { Secret gcpSecret = new GcpSecret("bad_path/versions/latest"); - p.apply(Create.of(KV.of("k1", 1))).apply(GroupByEncryptedKey.create(gcpSecret)); + p.apply(Create.of(KV.of("k1", 1))) + .apply(GroupByEncryptedKey.create(gcpSecret)); assertThrows(RuntimeException.class, () -> p.run()); } + + private static class SortValues + extends SimpleFunction>, KV>> { + @Override + public KV> apply(KV> input) { + List sorted = + StreamSupport.stream(input.getValue().spliterator(), false) + .sorted() + .collect(Collectors.toList()); + return KV.of(input.getKey(), sorted); + } + } } From 9230d2e87fd96d9f1e7687cc5904b2f39394c4cb Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 26 Sep 2025 12:08:08 -0400 Subject: [PATCH 06/24] Move secret code into utils --- .../org/apache/beam/sdk/transforms/GroupByEncryptedKey.java | 1 + .../org/apache/beam/sdk/{transforms => util}/GcpSecret.java | 2 +- .../java/org/apache/beam/sdk/{transforms => util}/Secret.java | 2 +- .../org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java | 2 ++ 4 files changed, 5 insertions(+), 2 deletions(-) rename sdks/java/core/src/main/java/org/apache/beam/sdk/{transforms => util}/GcpSecret.java (98%) rename sdks/java/core/src/main/java/org/apache/beam/sdk/{transforms => util}/Secret.java (96%) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index ef7a21ee7e43..de50626596fd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java similarity index 98% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java index d7d04be23a83..8a33f19d33d9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.transforms; +package org.apache.beam.sdk.util; import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java similarity index 96% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index ae34fb6ee630..b9c974cee352 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.transforms; +package org.apache.beam.sdk.util; import java.io.Serializable; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java index 5ccb1ec41cc5..ba4c50e5a41e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByEncryptedKeyTest.java @@ -38,6 +38,8 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.util.GcpSecret; +import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.junit.AfterClass; From 56b7130a083807452823385376654dffabc1af0b Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 26 Sep 2025 13:24:54 -0400 Subject: [PATCH 07/24] Use secret manager from bom --- sdks/java/core/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 9fbd9a70385d..5319b5e3f82c 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -100,7 +100,7 @@ dependencies { shadow library.java.snappy_java shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) - implementation 'com.google.cloud:google-cloud-secretmanager:2.75.0' + implementation library.java.google_cloud_secret_manager permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema From 9f2204fca4b71a5e21783a9bfe2930396e8765a6 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 26 Sep 2025 13:29:02 -0400 Subject: [PATCH 08/24] Docs --- .../apache/beam/sdk/transforms/GroupByEncryptedKey.java | 8 ++++++++ .../src/main/java/org/apache/beam/sdk/util/GcpSecret.java | 6 ++++-- .../src/main/java/org/apache/beam/sdk/util/Secret.java | 6 +++++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index de50626596fd..03e9ba41bbf7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -46,6 +46,14 @@ private GroupByEncryptedKey(Secret hmacKey) { this.hmacKey = hmacKey; } + /** + * Creates a {@link GroupByEncryptedKey} transform. + * + * @param hmacKey The secret key to use for HMAC. + * @param The type of the keys in the input PCollection. + * @param The type of the values in the input PCollection. + * @return A {@link GroupByEncryptedKey} transform. + */ public static GroupByEncryptedKey create(Secret hmacKey) { return new GroupByEncryptedKey<>(hmacKey); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java index 8a33f19d33d9..2fcea6f0619f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -22,12 +22,14 @@ import com.google.cloud.secretmanager.v1.SecretVersionName; import java.io.IOException; -/** A secret manager implementation that retrieves secrets from Google Cloud Secret Manager. */ +/** + * A {@link Secret} manager implementation that retrieves secrets from Google Cloud Secret Manager. + */ public class GcpSecret implements Secret { private final String versionName; /** - * Initializes a GcpSecret object. + * Initializes a {@link GcpSecret} object. * * @param versionName The full version name of the secret in Google Cloud Secret Manager. For * example: projects//secrets//versions/1. For more info, see diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index b9c974cee352..874fc4efd8b3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -26,6 +26,10 @@ * should handle fetching secrets from a secret management system. */ public interface Secret extends Serializable { - /** Returns the secret as a byte array. */ + /** + * Returns the secret as a byte array. + * + * @return The secret as a byte array. + */ byte[] getSecretBytes(); } From 25d96bd2b788ff15112c695d3320042fe7ceff60 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 26 Sep 2025 15:42:07 -0400 Subject: [PATCH 09/24] Better docs --- .../sdk/transforms/GroupByEncryptedKey.java | 31 ++++++++++++++++++- .../org/apache/beam/sdk/util/GcpSecret.java | 6 ++++ .../java/org/apache/beam/sdk/util/Secret.java | 3 +- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 03e9ba41bbf7..f7b2958de9c8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -36,6 +36,17 @@ * org.apache.beam.sdk.transforms.GroupByKey} on the encrypted keys, and then decrypts the keys in * the output. This is useful when the keys contain sensitive data that should not be stored at rest * by the runner. + * + * The transform requires a {@link Secret} which returns a 32 byte secret which can be used to + * generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. + * + * Note the following caveats: + * 1) Runners can implement arbitrary materialization steps, so this does not guarantee that the + * whole pipeline will not have unencrypted data at rest by itself. + * 2) If using this transform in streaming mode, this transform may not properly handle update + * compatibility checks around coders. This means that an improper update could lead to invalid + * coders, causing pipeline failure or data corruption. If you need to update, make sure that the + * input type passed into this transform does not change. */ public class GroupByEncryptedKey extends PTransform>, PCollection>>> { @@ -49,7 +60,7 @@ private GroupByEncryptedKey(Secret hmacKey) { /** * Creates a {@link GroupByEncryptedKey} transform. * - * @param hmacKey The secret key to use for HMAC. + * @param hmacKey The {@link Secret} key to use for encryption. * @param The type of the keys in the input PCollection. * @param The type of the values in the input PCollection. * @return A {@link GroupByEncryptedKey} transform. @@ -80,6 +91,12 @@ public PCollection>> expand(PCollection> input) { .setCoder(KvCoder.of(keyCoder, IterableCoder.of(valueCoder))); } + /** + * A {@link PTransform} that encrypts the key and value of an element. + * + * The resulting PCollection will be a KV pair with the key being the HMAC of the encoded key, + * and the value being a KV pair of the encrypted key and value. + */ @SuppressWarnings("initialization.fields.uninitialized") private static class EncryptMessage extends DoFn, KV>> { private final Secret hmacKey; @@ -127,6 +144,18 @@ private byte[] encode(Coder coder, T value) throws Exception { } } + /** + * A {@link PTransform} that decrypts the key and values of an element. + * + * The input PCollection will be a KV pair with the key being the HMAC of the encoded key, + * and the value being a list of KV pairs of the encrypted key and value. + * + * This will return a tuple containing the decrypted key and a list of decrypted values. + * + * Since there is some loss of precision in the HMAC encoding of the key (but not the key + * encryption), there is some extra work done here to ensure that all key/value pairs are + * mapped out appropriately. + */ @SuppressWarnings("initialization.fields.uninitialized") private static class DecryptMessage extends DoFn>>, KV>> { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java index 2fcea6f0619f..d19338b1afba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -39,6 +39,12 @@ public GcpSecret(String versionName) { this.versionName = versionName; } + /** + * Returns the secret as a byte array. Assumes that the current active service account + * has permissions to read the secret. + * + * @return The secret as a byte array. + */ @Override public byte[] getSecretBytes() { try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index 874fc4efd8b3..269a94886311 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -23,7 +23,8 @@ * A secret management interface used for handling sensitive data. * *

This interface provides a generic way to handle secrets. Implementations of this interface - * should handle fetching secrets from a secret management system. + * should handle fetching secrets from a secret management system. The underlying secret + * management system should be able to return a valid byte array representing the secret. */ public interface Secret extends Serializable { /** From cbd441abd64dfd0776f8104acf8d8f579be00087 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 29 Sep 2025 13:59:19 -0400 Subject: [PATCH 10/24] Updates --- sdks/java/core/build.gradle | 2 + .../sdk/transforms/GroupByEncryptedKey.java | 44 +++++++++++-------- .../org/apache/beam/sdk/util/GcpSecret.java | 4 +- .../java/org/apache/beam/sdk/util/Secret.java | 4 +- 4 files changed, 32 insertions(+), 22 deletions(-) diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 5319b5e3f82c..953caee27793 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -101,6 +101,8 @@ dependencies { shadow library.java.joda_time implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom) implementation library.java.google_cloud_secret_manager + implementation library.java.proto_google_cloud_secret_manager_v1 + implementation library.java.protobuf_java permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index f7b2958de9c8..0e5a873799a2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -22,6 +22,7 @@ import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.util.Secret; @@ -36,14 +37,13 @@ * org.apache.beam.sdk.transforms.GroupByKey} on the encrypted keys, and then decrypts the keys in * the output. This is useful when the keys contain sensitive data that should not be stored at rest * by the runner. - * - * The transform requires a {@link Secret} which returns a 32 byte secret which can be used to + * + *

The transform requires a {@link Secret} which returns a 32 byte secret which can be used to * generate a {@link SecretKeySpec} object using the HmacSHA256 algorithm. - * - * Note the following caveats: - * 1) Runners can implement arbitrary materialization steps, so this does not guarantee that the - * whole pipeline will not have unencrypted data at rest by itself. - * 2) If using this transform in streaming mode, this transform may not properly handle update + * + *

Note the following caveats: 1) Runners can implement arbitrary materialization steps, so this + * does not guarantee that the whole pipeline will not have unencrypted data at rest by itself. 2) + * If using this transform in streaming mode, this transform may not properly handle update * compatibility checks around coders. This means that an improper update could lead to invalid * coders, causing pipeline failure or data corruption. If you need to update, make sure that the * input type passed into this transform does not change. @@ -77,6 +77,14 @@ public PCollection>> expand(PCollection> input) { } KvCoder inputKvCoder = (KvCoder) inputCoder; Coder keyCoder = inputKvCoder.getKeyCoder(); + + try { + keyCoder.verifyDeterministic(); + } catch (NonDeterministicException e) { + throw new IllegalStateException( + "the keyCoder of a GroupByEncryptedKey must be deterministic", e); + } + Coder valueCoder = inputKvCoder.getValueCoder(); PCollection>>> grouped = @@ -93,8 +101,8 @@ public PCollection>> expand(PCollection> input) { /** * A {@link PTransform} that encrypts the key and value of an element. - * - * The resulting PCollection will be a KV pair with the key being the HMAC of the encoded key, + * + *

The resulting PCollection will be a KV pair with the key being the HMAC of the encoded key, * and the value being a KV pair of the encrypted key and value. */ @SuppressWarnings("initialization.fields.uninitialized") @@ -146,15 +154,15 @@ private byte[] encode(Coder coder, T value) throws Exception { /** * A {@link PTransform} that decrypts the key and values of an element. - * - * The input PCollection will be a KV pair with the key being the HMAC of the encoded key, - * and the value being a list of KV pairs of the encrypted key and value. - * - * This will return a tuple containing the decrypted key and a list of decrypted values. - * - * Since there is some loss of precision in the HMAC encoding of the key (but not the key - * encryption), there is some extra work done here to ensure that all key/value pairs are - * mapped out appropriately. + * + *

The input PCollection will be a KV pair with the key being the HMAC of the encoded key, and + * the value being a list of KV pairs of the encrypted key and value. + * + *

This will return a tuple containing the decrypted key and a list of decrypted values. + * + *

Since there is some loss of precision in the HMAC encoding of the key (but not the key + * encryption), there is some extra work done here to ensure that all key/value pairs are mapped + * out appropriately. */ @SuppressWarnings("initialization.fields.uninitialized") private static class DecryptMessage diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java index d19338b1afba..80bc3a54535e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -40,8 +40,8 @@ public GcpSecret(String versionName) { } /** - * Returns the secret as a byte array. Assumes that the current active service account - * has permissions to read the secret. + * Returns the secret as a byte array. Assumes that the current active service account has + * permissions to read the secret. * * @return The secret as a byte array. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index 269a94886311..fe476ef6cb1d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -23,8 +23,8 @@ * A secret management interface used for handling sensitive data. * *

This interface provides a generic way to handle secrets. Implementations of this interface - * should handle fetching secrets from a secret management system. The underlying secret - * management system should be able to return a valid byte array representing the secret. + * should handle fetching secrets from a secret management system. The underlying secret management + * system should be able to return a valid byte array representing the secret. */ public interface Secret extends Serializable { /** From 90fa3fd2887acfdf221d6c3ba9fa3a2a0c47db35 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 1 Oct 2025 09:57:27 -0400 Subject: [PATCH 11/24] [WIP] Add pipeline option to force GBEK (Java) --- .../dataflow/internal/DataflowGroupByKey.java | 49 ++++++++++++++ .../beam/sdk/options/PipelineOptions.java | 35 ++++++++++ .../sdk/transforms/GroupByEncryptedKey.java | 37 ++++++++-- .../beam/sdk/transforms/GroupByKey.java | 34 ++++++++++ .../org/apache/beam/sdk/util/GcpSecret.java | 9 +++ .../java/org/apache/beam/sdk/util/Secret.java | 49 ++++++++++++++ .../construction/GroupByKeyTranslation.java | 12 ++++ .../org/apache/beam/sdk/util/SecretTest.java | 67 +++++++++++++++++++ 8 files changed, 288 insertions(+), 4 deletions(-) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java index 89135641689e..82affe15f0c9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java @@ -25,10 +25,13 @@ import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.GroupByEncryptedKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; @@ -46,9 +49,13 @@ public class DataflowGroupByKey // Plumbed from Redistribute transform. private final boolean allowDuplicates; + private boolean insideGBEK; + private boolean surroundsGBEK; private DataflowGroupByKey(boolean allowDuplicates) { this.allowDuplicates = allowDuplicates; + this.insideGBEK = false; + this.surroundsGBEK = false; } /** @@ -79,6 +86,22 @@ public boolean allowDuplicates() { return allowDuplicates; } + /** + * For Beam internal use only. Tells runner that this is an inner GBK inside of a + * GroupByEncryptedKey + */ + public void setInsideGBEK() { + this.insideGBEK = true; + } + + /** + * For Beam internal use only. Tells runner that this is a GBK wrapped around of a + * GroupByEncryptedKey + */ + public boolean surroundsGBEK() { + return this.surroundsGBEK; + } + ///////////////////////////////////////////////////////////////////////////// public static void applicableTo(PCollection input) { @@ -117,6 +140,20 @@ public PCollection>> expand(PCollection> input) { "the keyCoder of a DataflowGroupByKey must be deterministic", e); } + PipelineOptions options = input.getPipeline().getOptions(); + String gbekOveride = options.getGBEK(); + if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) { + this.surroundsGBEK = true; + Secret hmacSecret = Secret.parseSecretOption(gbekOveride); + DataflowGroupByKey> gbk = DataflowGroupByKey.create(); + if (this.allowDuplicates) { + gbk = DataflowGroupByKey.createWithAllowDuplicates(); + } + gbk.setInsideGBEK(); + GroupByEncryptedKey gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk); + return input.apply(gbek); + } + // This primitive operation groups by the combination of key and window, // merging windows as needed, using the windows assigned to the // key/value input elements and the window merge operation of the @@ -171,10 +208,22 @@ public String getUrn() { return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; } + @Override + public String getUrn(DataflowGroupByKey transform) { + if (transform.surroundsGBEK()) { + return "beam:transform:group_by_key_wrapper:v1"; + } + return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; + } + @Override @SuppressWarnings("nullness") public RunnerApi.FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { + if (transform.getTransform().surroundsGBEK()) { + // Can use null for spec for empty composite. + return null; + } return RunnerApi.FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build(); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 2eba8c6ef68d..8d34345306ce 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; @@ -413,6 +414,40 @@ public Long create(PipelineOptions options) { void setUserAgent(String userAgent); + /** + * A string defining whether GroupByKey transforms should be replaced by GroupByEncryptedKey + * + *

Beam will infer the secret type and value based on the secret itself. This guarantees that + * any data at rest during the performing a GBK, so this can be used to guarantee that data is not + * unencrypted. Runners with this behavior include the Dataflow, Flink, and Spark runners. The + * option should be structured like: + * + *


+   * --encrypt=type:;:
+   * 
+ * + * for example: + * + *

+   * --encrypt=type:GcpSecret;version_name:my_secret/versions/latest"
+   * 
+ * + * All variables should use snake case to allow consistency across languages. + */ + @Description( + "When set, will replace all GroupByKey transforms in the pipeline the option. Beam will" + + " infer the secret type and value based on the secret itself. This guarantees that" + + " any data at rest during the performing a GBK, so this can be used to guarantee" + + " that data is not unencrypted. Runners with this behavior include the Dataflow," + + " Flink, and Spark runners. The option should be structured like:" + + " --encrypt=type:;:, for example " + + " --encrypt=type:GcpSecret;version_name:my_secret/versions/latest. All variables " + + " should use snake case to allow consistency across languages.") + @Nullable + String getGBEK(); + + void setGBEK(String gbek); + /** * Returns a user agent string constructed from {@link ReleaseInfo#getName()} and {@link * ReleaseInfo#getVersion()}, in the format {@code [name]/[version]}. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 0e5a873799a2..56806633c3ca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -52,9 +52,19 @@ public class GroupByEncryptedKey extends PTransform>, PCollection>>> { private final Secret hmacKey; - - private GroupByEncryptedKey(Secret hmacKey) { + private final PTransform< + PCollection>>, + PCollection>>>> + gbk; + + private GroupByEncryptedKey( + Secret hmacKey, + PTransform< + PCollection>>, + PCollection>>>> + gbk) { this.hmacKey = hmacKey; + this.gbk = gbk; } /** @@ -66,7 +76,26 @@ private GroupByEncryptedKey(Secret hmacKey) { * @return A {@link GroupByEncryptedKey} transform. */ public static GroupByEncryptedKey create(Secret hmacKey) { - return new GroupByEncryptedKey<>(hmacKey); + GroupByKey> gbk = GroupByKey.create(); + return new GroupByEncryptedKey<>(hmacKey, GroupByKey.create()); + } + + /** + * Creates a {@link GroupByEncryptedKey} transform with a custom GBK in the middle. + * + * @param hmacKey The {@link Secret} key to use for encryption. + * @param gbk The custom GBK transform to use in the middle of the GBEK. + * @param The type of the keys in the input PCollection. + * @param The type of the values in the input PCollection. + * @return A {@link GroupByEncryptedKey} transform. + */ + public static GroupByEncryptedKey createWithCustomGbk( + Secret hmacKey, + PTransform< + PCollection>>, + PCollection>>>> + gbk) { + return new GroupByEncryptedKey<>(hmacKey, gbk); } @Override @@ -92,7 +121,7 @@ public PCollection>> expand(PCollection> input) { .apply( "EncryptMessage", ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) - .apply(GroupByKey.create()); + .apply(this.gbk); return grouped .apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index d0b320a87654..da3fcd1de6af 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -115,9 +116,13 @@ public class GroupByKey extends PTransform>, PCollection>>> { private final boolean fewKeys; + private boolean insideGBEK; + private boolean surroundsGBEK; private GroupByKey(boolean fewKeys) { this.fewKeys = fewKeys; + this.insideGBEK = false; + surroundsGBEK = false; } /** @@ -148,6 +153,21 @@ public boolean fewKeys() { return fewKeys; } + /** + * For Beam internal use only. Tells runner that this is an inner GBK inside a GroupByEncryptedKey + */ + public void setInsideGBEK() { + this.insideGBEK = true; + } + + /** + * For Beam internal use only. Tells runner that this is a GBK wrapped around of a + * GroupByEncryptedKey + */ + public boolean surroundsGBEK() { + return this.surroundsGBEK; + } + ///////////////////////////////////////////////////////////////////////////// public static void applicableTo(PCollection input) { @@ -244,6 +264,20 @@ public PCollection>> expand(PCollection> input) { throw new IllegalStateException("the keyCoder of a GroupByKey must be deterministic", e); } + PipelineOptions options = input.getPipeline().getOptions(); + String gbekOveride = options.getGBEK(); + if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) { + this.surroundsGBEK = true; + Secret hmacSecret = Secret.parseSecretOption(gbekOveride); + GroupByKey> gbk = GroupByKey.create(); + if (this.fewKeys) { + gbk = GroupByKey.createWithFewKeys(); + } + gbk.setInsideGBEK(); + GroupByEncryptedKey gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk); + return input.apply(gbek); + } + // This primitive operation groups by the combination of key and window, // merging windows as needed, using the windows assigned to the // key/value input elements and the window merge operation of the diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java index 80bc3a54535e..8effae7f61cf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -55,4 +55,13 @@ public byte[] getSecretBytes() { throw new RuntimeException("Failed to retrieve secret bytes", e); } } + + /** + * Returns the version name of the secret. + * + * @return The version name as a String. + */ + public String getVersionName() { + return versionName; + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index fe476ef6cb1d..a75e01c9543f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -18,6 +18,11 @@ package org.apache.beam.sdk.util; import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; /** * A secret management interface used for handling sensitive data. @@ -33,4 +38,48 @@ public interface Secret extends Serializable { * @return The secret as a byte array. */ byte[] getSecretBytes(); + + static Secret parseSecretOption(String secretOption) { + Map paramMap = new HashMap<>(); + for (String param : secretOption.split(";", -1)) { + String[] parts = param.split(":", 2); + if (parts.length == 2) { + paramMap.put(parts[0], parts[1]); + } + } + + if (!paramMap.containsKey("type")) { + throw new RuntimeException("Secret string must contain a valid type parameter"); + } + + String secretType = paramMap.get("type"); + paramMap.remove("type"); + + if (secretType == null) { + throw new RuntimeException("Secret string must contain a valid value for type parameter"); + } + + switch (secretType.toLowerCase()) { + case "gcpsecret": + Set gcpSecretParams = new HashSet<>(Arrays.asList("version_name")); + for (String paramName : paramMap.keySet()) { + if (!gcpSecretParams.contains(paramName)) { + throw new RuntimeException( + String.format( + "Invalid secret parameter %s, GcpSecret only supports the following parameters: %s", + paramName, gcpSecretParams)); + } + } + String versionName = paramMap.get("version_name"); + if (versionName == null) { + throw new RuntimeException( + "version_name must contain a valid value for versionName parameter"); + } + return new GcpSecret(versionName); + default: + throw new RuntimeException( + String.format( + "Invalid secret type %s, currently only GcpSecret is supported", secretType)); + } + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java index d08a48d0e5e6..1471c790a686 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java @@ -43,9 +43,21 @@ public String getUrn() { return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; } + @Override + public String getUrn(GroupByKey transform) { + if (transform.surroundsGBEK()) { + return "beam:transform:group_by_key_wrapper:v1"; + } + return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; + } + @Override public FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { + if (transform.getTransform().surroundsGBEK()) { + // Can use null for spec for empty composite. + return null; + } return FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build(); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java new file mode 100644 index 000000000000..dd4b125d73fe --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link org.apache.beam.sdk.util.Secret}. */ +@RunWith(JUnit4.class) +public class SecretTest { + + @Test + public void testParseSecretOptionWithValidGcpSecret() { + String secretOption = "type:gcpsecret;version_name:my_secret/versions/latest"; + Secret secret = Secret.parseSecretOption(secretOption); + assertTrue(secret instanceof GcpSecret); + assertEquals("my_secret/versions/latest", ((GcpSecret) secret).getVersionName()); + } + + @Test + public void testParseSecretOptionWithMissingType() { + String secretOption = "version_name:my_secret/versions/latest"; + Exception exception = + assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); + assertEquals("Secret string must contain a valid type parameter", exception.getMessage()); + } + + @Test + public void testParseSecretOptionWithUnsupportedType() { + String secretOption = "type:unsupported;version_name:my_secret/versions/latest"; + Exception exception = + assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); + assertEquals( + "Invalid secret type unsupported, currently only GcpSecret is supported", + exception.getMessage()); + } + + @Test + public void testParseSecretOptionWithInvalidGcpSecretParameter() { + String secretOption = "type:gcpsecret;invalid_param:some_value"; + Exception exception = + assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); + assertEquals( + "Invalid secret parameter invalid_param, GcpSecret only supports the following parameters: [version_name]", + exception.getMessage()); + } +} From f34d8c5b7c731b806a1634d0a8f90ceb4080d71b Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 1 Oct 2025 10:00:07 -0400 Subject: [PATCH 12/24] Trigger some postcommits --- .github/trigger_files/beam_PostCommit_Java.json | 2 +- .github/trigger_files/beam_PostCommit_Java_DataflowV1.json | 2 +- .github/trigger_files/beam_PostCommit_Java_DataflowV2.json | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java.json b/.github/trigger_files/beam_PostCommit_Java.json index 920c8d132e4a..8784d0786c02 100644 --- a/.github/trigger_files/beam_PostCommit_Java.json +++ b/.github/trigger_files/beam_PostCommit_Java.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2 } \ No newline at end of file diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json index aaf5ab50160a..393fa764afdf 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json @@ -2,6 +2,6 @@ "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1, + "modification": 2, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json index d266aa094efa..59645fbcbe09 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -2,6 +2,6 @@ "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 3, + "modification": 4, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } From 8c25c5644addb26ea4c63ec65e18d1fe15792bcc Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 1 Oct 2025 10:29:16 -0400 Subject: [PATCH 13/24] Update triggers --- .github/trigger_files/beam_PostCommit_Java_DataflowV2.json | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json index 59645fbcbe09..3717f48ee492 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -1,7 +1,4 @@ { - "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling", - "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", - "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 4, + "modification": 4, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } From 6c8f959d360c92e72430cda47b2313985bf4a8a7 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 1 Oct 2025 11:38:51 -0400 Subject: [PATCH 14/24] Tests --- .../transforms/DataflowGroupByKeyTest.java | 95 +++++++++++++++++++ .../beam/checkstyle/suppressions.xml | 2 + .../beam/sdk/transforms/GroupByKeyTest.java | 88 +++++++++++++++++ 3 files changed, 185 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java index 915dc74a2f1b..fafaca2dc675 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java @@ -18,6 +18,15 @@ package org.apache.beam.runners.dataflow.transforms; import com.google.api.services.dataflow.Dataflow; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.security.SecureRandom; +import java.util.Arrays; +import java.util.List; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; @@ -26,15 +35,21 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.WindowingStrategy; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -48,11 +63,49 @@ public class DataflowGroupByKeyTest { @Mock private Dataflow dataflow; + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + private static String gcpSecretVersionName; + @Before public void setUp() { MockitoAnnotations.initMocks(this); } + @BeforeClass + public static void setup() throws IOException { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + client.createSecret(projectName, SECRET_ID, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecretVersionName = secretName.toString() + "/versions/latest"; + } + + @AfterClass + public static void tearDown() throws IOException { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + client.deleteSecret(secretName); + } + /** * Create a test pipeline that uses the {@link DataflowRunner} so that {@link GroupByKey} is not * expanded. This is used for verifying that even without expansion the proper errors show up. @@ -92,4 +145,46 @@ public PCollection> expand(PBegin input) { input.apply("GroupByKey", GroupByKey.create()); } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyWithValidGcpSecretOption() { + Pipeline p = createTestServiceRunner(); + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + p.getOptions().setGBEK(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + PCollection>> output = input.apply(new DataflowGroupByKey<>()); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyWithInvalidGcpSecretOption() { + Pipeline p = createTestServiceRunner(); + p.getOptions().setGBEK("type:gcpsecret;version_name:bad_path/versions/latest"); + p.apply(Create.of(KV.of("k1", 1))).apply(new DataflowGroupByKey<>()); + thrown.expect(RuntimeException.class); + p.run(); + } } diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index 5ee8872d006e..2b43b45f36c1 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -57,6 +57,8 @@ + + diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 5464838ad4db..608d53886433 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -26,12 +26,18 @@ import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertThrows; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -90,7 +96,9 @@ import org.hamcrest.Matcher; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -109,6 +117,45 @@ public class GroupByKeyTest implements Serializable { /** Shared test base class with setup/teardown helpers. */ public abstract static class SharedTestBase { @Rule public transient TestPipeline p = TestPipeline.create(); + + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + public static String gcpSecretVersionName; + + @BeforeClass + public static void setup() throws IOException { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + client.createSecret(projectName, SECRET_ID, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, + SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecretVersionName = secretName.toString() + "/versions/latest"; + } + + @AfterClass + public static void tearDown() throws IOException { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + client.deleteSecret(secretName); + } } /** Tests validating basic {@link GroupByKey} scenarios. */ @@ -614,6 +661,47 @@ public void testLargeKeys10MB() throws Exception { public void testLargeKeys100MB() throws Exception { runLargeKeysTest(p, 100 << 20); } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyWithValidGcpSecretOption() { + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); + + p.getOptions().setGBEK(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + PCollection>> output = input.apply(GroupByKey.create()); + + SerializableFunction>>, Void> checker = + containsKvs( + kv("k1", 3, 4), + kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), + kv("k2", 66, -33), + kv("k3", 0)); + PAssert.that(output).satisfies(checker); + PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyWithInvalidGcpSecretOption() { + p.getOptions().setGBEK("type:gcpsecret;version_name:bad_path/versions/latest"); + p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); + assertThrows(RuntimeException.class, () -> p.run()); + } } /** Tests validating GroupByKey behaviors with windowing. */ From 705213a7d2789d3699e89bd8b7bb72871583d0c7 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Wed, 1 Oct 2025 12:48:57 -0400 Subject: [PATCH 15/24] test fixes --- .../transforms/DataflowGroupByKeyTest.java | 28 +++++++++++++++---- .../beam/sdk/transforms/GroupByKeyTest.java | 24 +++++++++++++--- 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java index fafaca2dc675..5b916f4afee8 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java @@ -74,7 +74,13 @@ public void setUp() { @BeforeClass public static void setup() throws IOException { - SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretManagerServiceClient client; + try { + client = SecretManagerServiceClient.create(); + } catch (IOException e) { + gcpSecretVersionName = null; + return; + } ProjectName projectName = ProjectName.of(PROJECT_ID); SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); @@ -101,9 +107,11 @@ public static void setup() throws IOException { @AfterClass public static void tearDown() throws IOException { - SecretManagerServiceClient client = SecretManagerServiceClient.create(); - SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); - client.deleteSecret(secretName); + if (gcpSecretVersionName != null) { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + client.deleteSecret(secretName); + } } /** @@ -149,6 +157,10 @@ public PCollection> expand(PBegin input) { @Test @Category(NeedsRunner.class) public void testGroupByKeyWithValidGcpSecretOption() { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } Pipeline p = createTestServiceRunner(); List> ungroupedPairs = Arrays.asList( @@ -166,7 +178,7 @@ public void testGroupByKeyWithValidGcpSecretOption() { .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); p.getOptions().setGBEK(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - PCollection>> output = input.apply(new DataflowGroupByKey<>()); + PCollection>> output = input.apply(GroupByKey.create()); PAssert.that(output) .containsInAnyOrder( @@ -181,9 +193,13 @@ public void testGroupByKeyWithValidGcpSecretOption() { @Test @Category(NeedsRunner.class) public void testGroupByKeyWithInvalidGcpSecretOption() { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } Pipeline p = createTestServiceRunner(); p.getOptions().setGBEK("type:gcpsecret;version_name:bad_path/versions/latest"); - p.apply(Create.of(KV.of("k1", 1))).apply(new DataflowGroupByKey<>()); + p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); thrown.expect(RuntimeException.class); p.run(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 608d53886433..b8a1fa2976ce 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -124,7 +124,13 @@ public abstract static class SharedTestBase { @BeforeClass public static void setup() throws IOException { - SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretManagerServiceClient client; + try { + client = SecretManagerServiceClient.create(); + } catch (IOException e) { + gcpSecretVersionName = null; + return; + } ProjectName projectName = ProjectName.of(PROJECT_ID); SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); @@ -152,9 +158,11 @@ public static void setup() throws IOException { @AfterClass public static void tearDown() throws IOException { - SecretManagerServiceClient client = SecretManagerServiceClient.create(); - SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); - client.deleteSecret(secretName); + if (gcpSecretVersionName != null) { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + client.deleteSecret(secretName); + } } } @@ -665,6 +673,10 @@ public void testLargeKeys100MB() throws Exception { @Test @Category(NeedsRunner.class) public void testGroupByKeyWithValidGcpSecretOption() { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } List> ungroupedPairs = Arrays.asList( KV.of("k1", 3), @@ -698,6 +710,10 @@ public void testGroupByKeyWithValidGcpSecretOption() { @Test @Category(NeedsRunner.class) public void testGroupByKeyWithInvalidGcpSecretOption() { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } p.getOptions().setGBEK("type:gcpsecret;version_name:bad_path/versions/latest"); p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); assertThrows(RuntimeException.class, () -> p.run()); From 6dfdea9672358c15df2bf362fb4fe40af715855b Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 2 Oct 2025 13:25:23 -0400 Subject: [PATCH 16/24] Move tests to IT --- .../transforms/DataflowGroupByKeyTest.java | 111 ---------- .../beam/checkstyle/suppressions.xml | 2 +- .../beam/sdk/transforms/GroupByKeyIT.java | 190 ++++++++++++++++++ 3 files changed, 191 insertions(+), 112 deletions(-) create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java index 5b916f4afee8..915dc74a2f1b 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/transforms/DataflowGroupByKeyTest.java @@ -18,15 +18,6 @@ package org.apache.beam.runners.dataflow.transforms; import com.google.api.services.dataflow.Dataflow; -import com.google.cloud.secretmanager.v1.ProjectName; -import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; -import com.google.cloud.secretmanager.v1.SecretName; -import com.google.cloud.secretmanager.v1.SecretPayload; -import com.google.protobuf.ByteString; -import java.io.IOException; -import java.security.SecureRandom; -import java.util.Arrays; -import java.util.List; import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; @@ -35,21 +26,15 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator; import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.WindowingStrategy; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -63,57 +48,11 @@ public class DataflowGroupByKeyTest { @Mock private Dataflow dataflow; - private static final String PROJECT_ID = "apache-beam-testing"; - private static final String SECRET_ID = "gbek-test"; - private static String gcpSecretVersionName; - @Before public void setUp() { MockitoAnnotations.initMocks(this); } - @BeforeClass - public static void setup() throws IOException { - SecretManagerServiceClient client; - try { - client = SecretManagerServiceClient.create(); - } catch (IOException e) { - gcpSecretVersionName = null; - return; - } - ProjectName projectName = ProjectName.of(PROJECT_ID); - SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); - - try { - client.getSecret(secretName); - } catch (Exception e) { - com.google.cloud.secretmanager.v1.Secret secret = - com.google.cloud.secretmanager.v1.Secret.newBuilder() - .setReplication( - com.google.cloud.secretmanager.v1.Replication.newBuilder() - .setAutomatic( - com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() - .build()) - .build()) - .build(); - client.createSecret(projectName, SECRET_ID, secret); - byte[] secretBytes = new byte[32]; - new SecureRandom().nextBytes(secretBytes); - client.addSecretVersion( - secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); - } - gcpSecretVersionName = secretName.toString() + "/versions/latest"; - } - - @AfterClass - public static void tearDown() throws IOException { - if (gcpSecretVersionName != null) { - SecretManagerServiceClient client = SecretManagerServiceClient.create(); - SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); - client.deleteSecret(secretName); - } - } - /** * Create a test pipeline that uses the {@link DataflowRunner} so that {@link GroupByKey} is not * expanded. This is used for verifying that even without expansion the proper errors show up. @@ -153,54 +92,4 @@ public PCollection> expand(PBegin input) { input.apply("GroupByKey", GroupByKey.create()); } - - @Test - @Category(NeedsRunner.class) - public void testGroupByKeyWithValidGcpSecretOption() { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - Pipeline p = createTestServiceRunner(); - List> ungroupedPairs = - Arrays.asList( - KV.of("k1", 3), - KV.of("k5", Integer.MAX_VALUE), - KV.of("k5", Integer.MIN_VALUE), - KV.of("k2", 66), - KV.of("k1", 4), - KV.of("k2", -33), - KV.of("k3", 0)); - - PCollection> input = - p.apply( - Create.of(ungroupedPairs) - .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); - - p.getOptions().setGBEK(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); - PCollection>> output = input.apply(GroupByKey.create()); - - PAssert.that(output) - .containsInAnyOrder( - KV.of("k1", Arrays.asList(3, 4)), - KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), - KV.of("k2", Arrays.asList(66, -33)), - KV.of("k3", Arrays.asList(0))); - - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testGroupByKeyWithInvalidGcpSecretOption() { - if (gcpSecretVersionName == null) { - // Skip test if we couldn't set up secret manager - return; - } - Pipeline p = createTestServiceRunner(); - p.getOptions().setGBEK("type:gcpsecret;version_name:bad_path/versions/latest"); - p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); - thrown.expect(RuntimeException.class); - p.run(); - } } diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index 2b43b45f36c1..4ba84d31a76b 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -58,7 +58,7 @@ - + diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java new file mode 100644 index 000000000000..1ac9d61547c1 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.security.SecureRandom; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration test for GroupByKey transforms and some other transforms which use GBK. */ +@RunWith(JUnit4.class) +public class GroupByKeyIT { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + private static String gcpSecretVersionName; + + @BeforeClass + public static void setup() throws IOException { + SecretManagerServiceClient client; + try { + client = SecretManagerServiceClient.create(); + } catch (IOException e) { + gcpSecretVersionName = null; + return; + } + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + client.createSecret(projectName, SECRET_ID, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecretVersionName = secretName.toString() + "/versions/latest"; + } + + @AfterClass + public static void tearDown() throws IOException { + if (gcpSecretVersionName != null) { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + client.deleteSecret(secretName); + } + } + + @Test + public void testGroupByKeyWithValidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGBEK(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + Pipeline p = Pipeline.create(options); + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = input.apply(GroupByKey.create()); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + @Test + public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGBEK("type:gcpsecret;version_name:bad_path/versions/latest"); + Pipeline p = Pipeline.create(options); + p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); + thrown.expect(RuntimeException.class); + p.run(); + } + + // Redistribute depends on GBK under the hood and can have runner-specific implementations + @Test + public void redistributeWithValidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGBEK(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + Pipeline p = Pipeline.create(options); + + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + PCollection> output = input.apply(Redistribute.byKey()); + PAssert.that(output).containsInAnyOrder(ungroupedPairs); + + p.run(); + } + + @Test + public void testRedistributeWithInvalidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGBEK("type:gcpsecret;version_name:bad_path/versions/latest"); + Pipeline p = Pipeline.create(options); + p.apply(Create.of(KV.of("k1", 1))).apply(Redistribute.byKey()); + thrown.expect(RuntimeException.class); + p.run(); + } +} From 7c821e40dc5dd5ebc891b7a27971e720bc1df372 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Thu, 2 Oct 2025 14:41:28 -0400 Subject: [PATCH 17/24] Randomized secret postfix --- .../java/org/apache/beam/sdk/transforms/GroupByKeyIT.java | 8 +++++--- .../org/apache/beam/sdk/transforms/GroupByKeyTest.java | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 1ac9d61547c1..d1fa43be2853 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -51,9 +51,11 @@ public class GroupByKeyIT { private static final String PROJECT_ID = "apache-beam-testing"; private static final String SECRET_ID = "gbek-test"; private static String gcpSecretVersionName; + private static String secretId; @BeforeClass public static void setup() throws IOException { + secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); SecretManagerServiceClient client; try { client = SecretManagerServiceClient.create(); @@ -62,7 +64,7 @@ public static void setup() throws IOException { return; } ProjectName projectName = ProjectName.of(PROJECT_ID); - SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); try { client.getSecret(secretName); @@ -76,7 +78,7 @@ public static void setup() throws IOException { .build()) .build()) .build(); - client.createSecret(projectName, SECRET_ID, secret); + client.createSecret(projectName, secretId, secret); byte[] secretBytes = new byte[32]; new SecureRandom().nextBytes(secretBytes); client.addSecretVersion( @@ -89,7 +91,7 @@ public static void setup() throws IOException { public static void tearDown() throws IOException { if (gcpSecretVersionName != null) { SecretManagerServiceClient client = SecretManagerServiceClient.create(); - SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); client.deleteSecret(secretName); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index b8a1fa2976ce..39d1318eda4a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -121,9 +121,11 @@ public abstract static class SharedTestBase { private static final String PROJECT_ID = "apache-beam-testing"; private static final String SECRET_ID = "gbek-test"; public static String gcpSecretVersionName; + private static String secretId; @BeforeClass public static void setup() throws IOException { + secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); SecretManagerServiceClient client; try { client = SecretManagerServiceClient.create(); @@ -132,7 +134,7 @@ public static void setup() throws IOException { return; } ProjectName projectName = ProjectName.of(PROJECT_ID); - SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); try { client.getSecret(secretName); @@ -146,7 +148,7 @@ public static void setup() throws IOException { .build()) .build()) .build(); - client.createSecret(projectName, SECRET_ID, secret); + client.createSecret(projectName, secretId, secret); byte[] secretBytes = new byte[32]; new SecureRandom().nextBytes(secretBytes); client.addSecretVersion( @@ -160,7 +162,7 @@ public static void setup() throws IOException { public static void tearDown() throws IOException { if (gcpSecretVersionName != null) { SecretManagerServiceClient client = SecretManagerServiceClient.create(); - SecretName secretName = SecretName.of(PROJECT_ID, SECRET_ID); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); client.deleteSecret(secretName); } } From b19082e53ae46cde331ba3b862ce792706b332f3 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 15:16:43 -0400 Subject: [PATCH 18/24] Update encryption mode --- .../sdk/transforms/GroupByEncryptedKey.java | 50 +++++++++++++++---- 1 file changed, 40 insertions(+), 10 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 56806633c3ca..512bc29d186f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -20,6 +20,7 @@ import java.util.Arrays; import javax.crypto.Cipher; import javax.crypto.Mac; +import javax.crypto.spec.GCMParameterSpec; import javax.crypto.spec.SecretKeySpec; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; @@ -141,6 +142,7 @@ private static class EncryptMessage extends DoFn, KV valueCoder; private transient Mac mac; private transient Cipher cipher; + private transient SecretKeySpec secretKeySpec; EncryptMessage(Secret hmacKey, Coder keyCoder, Coder valueCoder) { this.hmacKey = hmacKey; @@ -151,11 +153,11 @@ private static class EncryptMessage extends DoFn, KV byte[] encode(Coder coder, T value) throws Exception { @@ -200,6 +217,7 @@ private static class DecryptMessage private final Coder keyCoder; private final Coder valueCoder; private transient Cipher cipher; + private transient SecretKeySpec secretKeySpec; DecryptMessage(Secret hmacKey, Coder keyCoder, Coder valueCoder) { this.hmacKey = hmacKey; @@ -210,9 +228,8 @@ private static class DecryptMessage @Setup public void setup() { try { - this.cipher = Cipher.getInstance("AES"); - this.cipher.init( - Cipher.DECRYPT_MODE, new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES")); + this.cipher = Cipher.getInstance("AES/GCM/NoPadding"); + this.secretKeySpec = new SecretKeySpec(this.hmacKey.getSecretBytes(), "AES"); } catch (Exception ex) { throw new RuntimeException( "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); @@ -223,14 +240,27 @@ public void setup() { public void processElement(ProcessContext c) throws Exception { java.util.Map> decryptedKvs = new java.util.HashMap<>(); for (KV encryptedKv : c.element().getValue()) { - byte[] decryptedKeyBytes = this.cipher.doFinal(encryptedKv.getKey()); + byte[] iv = Arrays.copyOfRange(encryptedKv.getKey(), 0, 12); + GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(128, iv); + this.cipher.init(Cipher.DECRYPT_MODE, this.secretKeySpec, gcmParameterSpec); + + byte[] encryptedKey = + Arrays.copyOfRange(encryptedKv.getKey(), 12, encryptedKv.getKey().length); + byte[] decryptedKeyBytes = this.cipher.doFinal(encryptedKey); K key = decode(this.keyCoder, decryptedKeyBytes); if (key != null) { if (!decryptedKvs.containsKey(key)) { decryptedKvs.put(key, new java.util.ArrayList<>()); } - byte[] decryptedValueBytes = this.cipher.doFinal(encryptedKv.getValue()); + + iv = Arrays.copyOfRange(encryptedKv.getValue(), 0, 12); + gcmParameterSpec = new GCMParameterSpec(128, iv); + this.cipher.init(Cipher.DECRYPT_MODE, this.secretKeySpec, gcmParameterSpec); + + byte[] encryptedValue = + Arrays.copyOfRange(encryptedKv.getValue(), 12, encryptedKv.getValue().length); + byte[] decryptedValueBytes = this.cipher.doFinal(encryptedValue); V value = decode(this.valueCoder, decryptedValueBytes); decryptedKvs.get(key).add(value); } else { From a18d241a0035244c54d93f06dac4beebf045f876 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 16:04:41 -0400 Subject: [PATCH 19/24] checkstyle --- .../src/main/resources/beam/checkstyle/suppressions.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index 4ba84d31a76b..52e8467b1624 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -52,6 +52,7 @@ + From 5016b6f4514ae7851c65d209eec1edc0fdbbb552 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 21:02:47 -0400 Subject: [PATCH 20/24] explicitly add dep --- sdks/java/core/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 953caee27793..4a6d2f11973e 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -106,6 +106,7 @@ dependencies { permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema + implementation library.java.guava implementation library.java.snake_yaml shadowTest library.java.everit_json_schema provided library.java.junit From 14b8a640fc0545d0eea35f42d9677b6dca96182a Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Fri, 3 Oct 2025 21:39:30 -0400 Subject: [PATCH 21/24] spotbugs: only create generator once --- .../apache/beam/sdk/transforms/GroupByEncryptedKey.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 512bc29d186f..50b9cb9e16f7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -143,6 +143,7 @@ private static class EncryptMessage extends DoFn, KV keyCoder, Coder valueCoder) { this.hmacKey = hmacKey; @@ -162,6 +163,7 @@ public void setup() { throw new RuntimeException( "Failed to initialize cryptography libraries needed for GroupByEncryptedKey", ex); } + this.generator = new java.security.SecureRandom(); } @ProcessElement @@ -173,9 +175,8 @@ public void processElement(ProcessContext c) throws Exception { byte[] keyIv = new byte[12]; byte[] valueIv = new byte[12]; - java.security.SecureRandom generator = new java.security.SecureRandom(); - generator.nextBytes(keyIv); - generator.nextBytes(valueIv); + this.generator.nextBytes(keyIv); + this.generator.nextBytes(valueIv); GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(128, keyIv); this.cipher.init(Cipher.ENCRYPT_MODE, this.secretKeySpec, gcmParameterSpec); byte[] encryptedKey = this.cipher.doFinal(encodedKey); From 241f43808b29ae60f7cf833d9931c2b2bc1a90b1 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 6 Oct 2025 09:58:48 -0400 Subject: [PATCH 22/24] Gemini nits --- .../org/apache/beam/sdk/options/PipelineOptions.java | 12 ++++++------ .../beam/sdk/transforms/GroupByEncryptedKey.java | 1 - 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 8d34345306ce..7366670c0191 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -423,25 +423,25 @@ public Long create(PipelineOptions options) { * option should be structured like: * *

-   * --encrypt=type:;:
+   * --gbek=type:;:
    * 
* * for example: * *

-   * --encrypt=type:GcpSecret;version_name:my_secret/versions/latest"
+   * --gbek=type:GcpSecret;version_name:my_secret/versions/latest
    * 
* * All variables should use snake case to allow consistency across languages. */ @Description( - "When set, will replace all GroupByKey transforms in the pipeline the option. Beam will" - + " infer the secret type and value based on the secret itself. This guarantees that" + "When set, will replace all GroupByKey transforms in the pipeline. Beam will infer" + + " the secret type and value based on the secret itself. This guarantees that" + " any data at rest during the performing a GBK, so this can be used to guarantee" + " that data is not unencrypted. Runners with this behavior include the Dataflow," + " Flink, and Spark runners. The option should be structured like:" - + " --encrypt=type:;:, for example " - + " --encrypt=type:GcpSecret;version_name:my_secret/versions/latest. All variables " + + " --gbek=type:;:, for example " + + " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables " + " should use snake case to allow consistency across languages.") @Nullable String getGBEK(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index 50b9cb9e16f7..6ed0a31b3b95 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -77,7 +77,6 @@ private GroupByEncryptedKey( * @return A {@link GroupByEncryptedKey} transform. */ public static GroupByEncryptedKey create(Secret hmacKey) { - GroupByKey> gbk = GroupByKey.create(); return new GroupByEncryptedKey<>(hmacKey, GroupByKey.create()); } From 0fb7cf35acb33484ad8742d6027c40a681fa6221 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 6 Oct 2025 21:22:36 -0400 Subject: [PATCH 23/24] Feedback --- .../runners/dataflow/internal/DataflowGroupByKey.java | 6 ++++-- .../org/apache/beam/sdk/options/PipelineOptions.java | 10 +++++----- .../org/apache/beam/sdk/transforms/GroupByKey.java | 2 +- .../sdk/util/construction/GroupByKeyTranslation.java | 4 +++- .../sdk/util/construction/PTransformTranslation.java | 1 + .../org/apache/beam/sdk/transforms/GroupByKeyIT.java | 8 ++++---- .../org/apache/beam/sdk/transforms/GroupByKeyTest.java | 4 ++-- 7 files changed, 20 insertions(+), 15 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java index 82affe15f0c9..150975fadd08 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.WindowingStrategy; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Specialized implementation of {@code GroupByKey} for translating Redistribute transform into @@ -141,7 +142,7 @@ public PCollection>> expand(PCollection> input) { } PipelineOptions options = input.getPipeline().getOptions(); - String gbekOveride = options.getGBEK(); + String gbekOveride = options.getGbek(); if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) { this.surroundsGBEK = true; Secret hmacSecret = Secret.parseSecretOption(gbekOveride); @@ -211,13 +212,14 @@ public String getUrn() { @Override public String getUrn(DataflowGroupByKey transform) { if (transform.surroundsGBEK()) { - return "beam:transform:group_by_key_wrapper:v1"; + return PTransformTranslation.GROUP_BY_KEY_WRAPPER_TRANSFORM_URN; } return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; } @Override @SuppressWarnings("nullness") + @Nullable public RunnerApi.FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { if (transform.getTransform().surroundsGBEK()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 7366670c0191..62022b219c2a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -429,14 +429,14 @@ public Long create(PipelineOptions options) { * for example: * *

-   * --gbek=type:GcpSecret;version_name:my_secret/versions/latest
+   * --gbek=type:GcpSecret;version_name:my_secret/versions/latest"
    * 
* * All variables should use snake case to allow consistency across languages. */ @Description( - "When set, will replace all GroupByKey transforms in the pipeline. Beam will infer" - + " the secret type and value based on the secret itself. This guarantees that" + "When set, will replace all GroupByKey transforms in the pipeline the option. Beam will" + + " infer the secret type and value based on the secret itself. This guarantees that" + " any data at rest during the performing a GBK, so this can be used to guarantee" + " that data is not unencrypted. Runners with this behavior include the Dataflow," + " Flink, and Spark runners. The option should be structured like:" @@ -444,9 +444,9 @@ public Long create(PipelineOptions options) { + " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables " + " should use snake case to allow consistency across languages.") @Nullable - String getGBEK(); + String getGbek(); - void setGBEK(String gbek); + void setGbek(String gbek); /** * Returns a user agent string constructed from {@link ReleaseInfo#getName()} and {@link diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index da3fcd1de6af..95ff73f55e74 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -265,7 +265,7 @@ public PCollection>> expand(PCollection> input) { } PipelineOptions options = input.getPipeline().getOptions(); - String gbekOveride = options.getGBEK(); + String gbekOveride = options.getGbek(); if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) { this.surroundsGBEK = true; Secret hmacSecret = Secret.parseSecretOption(gbekOveride); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java index 1471c790a686..569c3cbe2989 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Utility methods for translating a {@link GroupByKey} to and from {@link RunnerApi} @@ -46,12 +47,13 @@ public String getUrn() { @Override public String getUrn(GroupByKey transform) { if (transform.surroundsGBEK()) { - return "beam:transform:group_by_key_wrapper:v1"; + return PTransformTranslation.GROUP_BY_KEY_WRAPPER_TRANSFORM_URN; } return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; } @Override + @Nullable public FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { if (transform.getTransform().surroundsGBEK()) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java index e4f00c706254..fac9a98ebb18 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java @@ -90,6 +90,7 @@ public class PTransformTranslation { public static final String PAR_DO_TRANSFORM_URN = "beam:transform:pardo:v1"; public static final String FLATTEN_TRANSFORM_URN = "beam:transform:flatten:v1"; public static final String GROUP_BY_KEY_TRANSFORM_URN = "beam:transform:group_by_key:v1"; + public static final String GROUP_BY_KEY_WRAPPER_TRANSFORM_URN = "beam:transform:group_by_key_wrapper:v1"; public static final String IMPULSE_TRANSFORM_URN = "beam:transform:impulse:v1"; public static final String ASSIGN_WINDOWS_TRANSFORM_URN = "beam:transform:window_into:v1"; public static final String TEST_STREAM_TRANSFORM_URN = "beam:transform:teststream:v1"; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index d1fa43be2853..60477a4c242f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -103,7 +103,7 @@ public void testGroupByKeyWithValidGcpSecretOption() throws Exception { return; } PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGBEK(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); Pipeline p = Pipeline.create(options); List> ungroupedPairs = Arrays.asList( @@ -139,7 +139,7 @@ public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception { return; } PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGBEK("type:gcpsecret;version_name:bad_path/versions/latest"); + options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); Pipeline p = Pipeline.create(options); p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); thrown.expect(RuntimeException.class); @@ -154,7 +154,7 @@ public void redistributeWithValidGcpSecretOption() throws Exception { return; } PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGBEK(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); Pipeline p = Pipeline.create(options); List> ungroupedPairs = @@ -183,7 +183,7 @@ public void testRedistributeWithInvalidGcpSecretOption() throws Exception { return; } PipelineOptions options = TestPipeline.testingPipelineOptions(); - options.setGBEK("type:gcpsecret;version_name:bad_path/versions/latest"); + options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); Pipeline p = Pipeline.create(options); p.apply(Create.of(KV.of("k1", 1))).apply(Redistribute.byKey()); thrown.expect(RuntimeException.class); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 39d1318eda4a..326da99f1a81 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -694,7 +694,7 @@ public void testGroupByKeyWithValidGcpSecretOption() { Create.of(ungroupedPairs) .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); - p.getOptions().setGBEK(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + p.getOptions().setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); PCollection>> output = input.apply(GroupByKey.create()); SerializableFunction>>, Void> checker = @@ -716,7 +716,7 @@ public void testGroupByKeyWithInvalidGcpSecretOption() { // Skip test if we couldn't set up secret manager return; } - p.getOptions().setGBEK("type:gcpsecret;version_name:bad_path/versions/latest"); + p.getOptions().setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); assertThrows(RuntimeException.class, () -> p.run()); } From d848a38d4934e240efaeee86e7aa84ff7b9a9f8e Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Tue, 7 Oct 2025 06:33:22 -0400 Subject: [PATCH 24/24] Syntax + format --- .../beam/runners/dataflow/internal/DataflowGroupByKey.java | 3 +-- .../beam/sdk/util/construction/PTransformTranslation.java | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java index 150975fadd08..10030aa892a2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java @@ -219,8 +219,7 @@ public String getUrn(DataflowGroupByKey transform) { @Override @SuppressWarnings("nullness") - @Nullable - public RunnerApi.FunctionSpec translate( + public RunnerApi.@Nullable FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { if (transform.getTransform().surroundsGBEK()) { // Can use null for spec for empty composite. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java index fac9a98ebb18..3e38aad1ad4b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java @@ -90,7 +90,8 @@ public class PTransformTranslation { public static final String PAR_DO_TRANSFORM_URN = "beam:transform:pardo:v1"; public static final String FLATTEN_TRANSFORM_URN = "beam:transform:flatten:v1"; public static final String GROUP_BY_KEY_TRANSFORM_URN = "beam:transform:group_by_key:v1"; - public static final String GROUP_BY_KEY_WRAPPER_TRANSFORM_URN = "beam:transform:group_by_key_wrapper:v1"; + public static final String GROUP_BY_KEY_WRAPPER_TRANSFORM_URN = + "beam:transform:group_by_key_wrapper:v1"; public static final String IMPULSE_TRANSFORM_URN = "beam:transform:impulse:v1"; public static final String ASSIGN_WINDOWS_TRANSFORM_URN = "beam:transform:window_into:v1"; public static final String TEST_STREAM_TRANSFORM_URN = "beam:transform:teststream:v1";