diff --git a/.github/trigger_files/beam_PostCommit_Java.json b/.github/trigger_files/beam_PostCommit_Java.json index 920c8d132e4a..8784d0786c02 100644 --- a/.github/trigger_files/beam_PostCommit_Java.json +++ b/.github/trigger_files/beam_PostCommit_Java.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1 + "modification": 2 } \ No newline at end of file diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json index bba1872a33e8..42fb8f985ba1 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json @@ -3,6 +3,6 @@ "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 1, + "modification": 2, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json index 78b2bdb93e2b..3717f48ee492 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -1,8 +1,4 @@ { - "https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling", - "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", - "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", - "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 3, + "modification": 4, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java index 89135641689e..10030aa892a2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java @@ -25,10 +25,13 @@ import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.GroupByEncryptedKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; @@ -36,6 +39,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.WindowingStrategy; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Specialized implementation of {@code GroupByKey} for translating Redistribute transform into @@ -46,9 +50,13 @@ public class DataflowGroupByKey // Plumbed from Redistribute transform. 
private final boolean allowDuplicates; + private boolean insideGBEK; + private boolean surroundsGBEK; private DataflowGroupByKey(boolean allowDuplicates) { this.allowDuplicates = allowDuplicates; + this.insideGBEK = false; + this.surroundsGBEK = false; } /** @@ -79,6 +87,22 @@ public boolean allowDuplicates() { return allowDuplicates; } + /** + * For Beam internal use only. Tells runner that this is an inner GBK inside of a + * GroupByEncryptedKey + */ + public void setInsideGBEK() { + this.insideGBEK = true; + } + + /** + * For Beam internal use only. Tells runner that this is a GBK wrapped around of a + * GroupByEncryptedKey + */ + public boolean surroundsGBEK() { + return this.surroundsGBEK; + } + ///////////////////////////////////////////////////////////////////////////// public static void applicableTo(PCollection input) { @@ -117,6 +141,20 @@ public PCollection>> expand(PCollection> input) { "the keyCoder of a DataflowGroupByKey must be deterministic", e); } + PipelineOptions options = input.getPipeline().getOptions(); + String gbekOveride = options.getGbek(); + if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) { + this.surroundsGBEK = true; + Secret hmacSecret = Secret.parseSecretOption(gbekOveride); + DataflowGroupByKey> gbk = DataflowGroupByKey.create(); + if (this.allowDuplicates) { + gbk = DataflowGroupByKey.createWithAllowDuplicates(); + } + gbk.setInsideGBEK(); + GroupByEncryptedKey gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk); + return input.apply(gbek); + } + // This primitive operation groups by the combination of key and window, // merging windows as needed, using the windows assigned to the // key/value input elements and the window merge operation of the @@ -171,10 +209,22 @@ public String getUrn() { return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; } + @Override + public String getUrn(DataflowGroupByKey transform) { + if (transform.surroundsGBEK()) { + return 
PTransformTranslation.GROUP_BY_KEY_WRAPPER_TRANSFORM_URN; + } + return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; + } + @Override @SuppressWarnings("nullness") - public RunnerApi.FunctionSpec translate( + public RunnerApi.@Nullable FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { + if (transform.getTransform().surroundsGBEK()) { + // Can use null for spec for empty composite. + return null; + } return RunnerApi.FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build(); } } diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index c103ab7f5b1d..52e8467b1624 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -58,6 +58,8 @@ + + diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java index 2eba8c6ef68d..62022b219c2a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.util.ReleaseInfo; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; @@ -413,6 +414,40 @@ public Long create(PipelineOptions options) { void setUserAgent(String userAgent); + /** + * A string defining whether GroupByKey transforms should be replaced by GroupByEncryptedKey + * + *

Beam will infer the secret type and value based on the secret itself. This guarantees that + * any data at rest while performing a GBK is encrypted, so this can be used to guarantee that + * data is never stored unencrypted. Runners with this behavior include the Dataflow, Flink, and + * Spark runners. The option should be structured like: + * + *


+   * --gbek=type:<secret_type>;<secret_param>:<value>
+   * 
+ * + * for example: + * + *

+   * --gbek=type:GcpSecret;version_name:my_secret/versions/latest
+   * 
+ * + * All variables should use snake case to allow consistency across languages. + */ + @Description( + "When set, will replace all GroupByKey transforms in the pipeline the option. Beam will" + + " infer the secret type and value based on the secret itself. This guarantees that" + + " any data at rest during the performing a GBK, so this can be used to guarantee" + + " that data is not unencrypted. Runners with this behavior include the Dataflow," + + " Flink, and Spark runners. The option should be structured like:" + + " --gbek=type:;:, for example " + + " --gbek=type:GcpSecret;version_name:my_secret/versions/latest. All variables " + + " should use snake case to allow consistency across languages.") + @Nullable + String getGbek(); + + void setGbek(String gbek); + /** * Returns a user agent string constructed from {@link ReleaseInfo#getName()} and {@link * ReleaseInfo#getVersion()}, in the format {@code [name]/[version]}. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java index e927efad44af..6ed0a31b3b95 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByEncryptedKey.java @@ -53,9 +53,19 @@ public class GroupByEncryptedKey extends PTransform>, PCollection>>> { private final Secret hmacKey; + private final PTransform< + PCollection>>, + PCollection>>>> + gbk; - private GroupByEncryptedKey(Secret hmacKey) { + private GroupByEncryptedKey( + Secret hmacKey, + PTransform< + PCollection>>, + PCollection>>>> + gbk) { this.hmacKey = hmacKey; + this.gbk = gbk; } /** @@ -67,7 +77,25 @@ private GroupByEncryptedKey(Secret hmacKey) { * @return A {@link GroupByEncryptedKey} transform. 
*/ public static GroupByEncryptedKey create(Secret hmacKey) { - return new GroupByEncryptedKey<>(hmacKey); + return new GroupByEncryptedKey<>(hmacKey, GroupByKey.create()); + } + + /** + * Creates a {@link GroupByEncryptedKey} transform with a custom GBK in the middle. + * + * @param hmacKey The {@link Secret} key to use for encryption. + * @param gbk The custom GBK transform to use in the middle of the GBEK. + * @param The type of the keys in the input PCollection. + * @param The type of the values in the input PCollection. + * @return A {@link GroupByEncryptedKey} transform. + */ + public static GroupByEncryptedKey createWithCustomGbk( + Secret hmacKey, + PTransform< + PCollection>>, + PCollection>>>> + gbk) { + return new GroupByEncryptedKey<>(hmacKey, gbk); } @Override @@ -93,7 +121,7 @@ public PCollection>> expand(PCollection> input) { .apply( "EncryptMessage", ParDo.of(new EncryptMessage<>(this.hmacKey, keyCoder, valueCoder))) - .apply(GroupByKey.create()); + .apply(this.gbk); return grouped .apply("DecryptMessage", ParDo.of(new DecryptMessage<>(this.hmacKey, keyCoder, valueCoder))) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index d0b320a87654..95ff73f55e74 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.Secret; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -115,9 +116,13 @@ public class GroupByKey extends PTransform>, PCollection>>> { private final boolean fewKeys; + private boolean 
insideGBEK; + private boolean surroundsGBEK; private GroupByKey(boolean fewKeys) { this.fewKeys = fewKeys; + this.insideGBEK = false; + surroundsGBEK = false; } /** @@ -148,6 +153,21 @@ public boolean fewKeys() { return fewKeys; } + /** + * For Beam internal use only. Tells runner that this is an inner GBK inside a GroupByEncryptedKey + */ + public void setInsideGBEK() { + this.insideGBEK = true; + } + + /** + * For Beam internal use only. Tells runner that this is a GBK wrapped around of a + * GroupByEncryptedKey + */ + public boolean surroundsGBEK() { + return this.surroundsGBEK; + } + ///////////////////////////////////////////////////////////////////////////// public static void applicableTo(PCollection input) { @@ -244,6 +264,20 @@ public PCollection>> expand(PCollection> input) { throw new IllegalStateException("the keyCoder of a GroupByKey must be deterministic", e); } + PipelineOptions options = input.getPipeline().getOptions(); + String gbekOveride = options.getGbek(); + if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) { + this.surroundsGBEK = true; + Secret hmacSecret = Secret.parseSecretOption(gbekOveride); + GroupByKey> gbk = GroupByKey.create(); + if (this.fewKeys) { + gbk = GroupByKey.createWithFewKeys(); + } + gbk.setInsideGBEK(); + GroupByEncryptedKey gbek = GroupByEncryptedKey.createWithCustomGbk(hmacSecret, gbk); + return input.apply(gbek); + } + // This primitive operation groups by the combination of key and window, // merging windows as needed, using the windows assigned to the // key/value input elements and the window merge operation of the diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java index 80bc3a54535e..8effae7f61cf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcpSecret.java @@ -55,4 +55,13 @@ public byte[] 
getSecretBytes() { throw new RuntimeException("Failed to retrieve secret bytes", e); } } + + /** + * Returns the version name of the secret. + * + * @return The version name as a String. + */ + public String getVersionName() { + return versionName; + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java index fe476ef6cb1d..a75e01c9543f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java @@ -18,6 +18,11 @@ package org.apache.beam.sdk.util; import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; /** * A secret management interface used for handling sensitive data. @@ -33,4 +38,48 @@ public interface Secret extends Serializable { * @return The secret as a byte array. */ byte[] getSecretBytes(); + + static Secret parseSecretOption(String secretOption) { + Map paramMap = new HashMap<>(); + for (String param : secretOption.split(";", -1)) { + String[] parts = param.split(":", 2); + if (parts.length == 2) { + paramMap.put(parts[0], parts[1]); + } + } + + if (!paramMap.containsKey("type")) { + throw new RuntimeException("Secret string must contain a valid type parameter"); + } + + String secretType = paramMap.get("type"); + paramMap.remove("type"); + + if (secretType == null) { + throw new RuntimeException("Secret string must contain a valid value for type parameter"); + } + + switch (secretType.toLowerCase()) { + case "gcpsecret": + Set gcpSecretParams = new HashSet<>(Arrays.asList("version_name")); + for (String paramName : paramMap.keySet()) { + if (!gcpSecretParams.contains(paramName)) { + throw new RuntimeException( + String.format( + "Invalid secret parameter %s, GcpSecret only supports the following parameters: %s", + paramName, gcpSecretParams)); + } + } + String versionName 
= paramMap.get("version_name"); + if (versionName == null) { + throw new RuntimeException( + "version_name must contain a valid value for versionName parameter"); + } + return new GcpSecret(versionName); + default: + throw new RuntimeException( + String.format( + "Invalid secret type %s, currently only GcpSecret is supported", secretType)); + } + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java index d08a48d0e5e6..569c3cbe2989 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Utility methods for translating a {@link GroupByKey} to and from {@link RunnerApi} @@ -44,8 +45,21 @@ public String getUrn() { } @Override + public String getUrn(GroupByKey transform) { + if (transform.surroundsGBEK()) { + return PTransformTranslation.GROUP_BY_KEY_WRAPPER_TRANSFORM_URN; + } + return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; + } + + @Override + @Nullable public FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) { + if (transform.getTransform().surroundsGBEK()) { + // Can use null for spec for empty composite. 
+ return null; + } return FunctionSpec.newBuilder().setUrn(getUrn(transform.getTransform())).build(); } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java index e4f00c706254..3e38aad1ad4b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java @@ -90,6 +90,8 @@ public class PTransformTranslation { public static final String PAR_DO_TRANSFORM_URN = "beam:transform:pardo:v1"; public static final String FLATTEN_TRANSFORM_URN = "beam:transform:flatten:v1"; public static final String GROUP_BY_KEY_TRANSFORM_URN = "beam:transform:group_by_key:v1"; + public static final String GROUP_BY_KEY_WRAPPER_TRANSFORM_URN = + "beam:transform:group_by_key_wrapper:v1"; public static final String IMPULSE_TRANSFORM_URN = "beam:transform:impulse:v1"; public static final String ASSIGN_WINDOWS_TRANSFORM_URN = "beam:transform:window_into:v1"; public static final String TEST_STREAM_TRANSFORM_URN = "beam:transform:teststream:v1"; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java new file mode 100644 index 000000000000..60477a4c242f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.security.SecureRandom; +import java.util.Arrays; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Integration test for GroupByKey transforms and some other transforms which use GBK. 
*/ +@RunWith(JUnit4.class) +public class GroupByKeyIT { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + private static String gcpSecretVersionName; + private static String secretId; + + @BeforeClass + public static void setup() throws IOException { + secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); + SecretManagerServiceClient client; + try { + client = SecretManagerServiceClient.create(); + } catch (IOException e) { + gcpSecretVersionName = null; + return; + } + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + client.createSecret(projectName, secretId, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecretVersionName = secretName.toString() + "/versions/latest"; + } + + @AfterClass + public static void tearDown() throws IOException { + if (gcpSecretVersionName != null) { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); + client.deleteSecret(secretName); + } + } + + @Test + public void testGroupByKeyWithValidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = 
TestPipeline.testingPipelineOptions(); + options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + Pipeline p = Pipeline.create(options); + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + + PCollection>> output = input.apply(GroupByKey.create()); + + PAssert.that(output) + .containsInAnyOrder( + KV.of("k1", Arrays.asList(3, 4)), + KV.of("k5", Arrays.asList(Integer.MAX_VALUE, Integer.MIN_VALUE)), + KV.of("k2", Arrays.asList(66, -33)), + KV.of("k3", Arrays.asList(0))); + + p.run(); + } + + @Test + public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); + Pipeline p = Pipeline.create(options); + p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); + thrown.expect(RuntimeException.class); + p.run(); + } + + // Redistribute depends on GBK under the hood and can have runner-specific implementations + @Test + public void redistributeWithValidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + Pipeline p = Pipeline.create(options); + + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + PCollection> 
input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + PCollection> output = input.apply(Redistribute.byKey()); + PAssert.that(output).containsInAnyOrder(ungroupedPairs); + + p.run(); + } + + @Test + public void testRedistributeWithInvalidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); + Pipeline p = Pipeline.create(options); + p.apply(Create.of(KV.of("k1", 1))).apply(Redistribute.byKey()); + thrown.expect(RuntimeException.class); + p.run(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 5464838ad4db..326da99f1a81 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -26,12 +26,18 @@ import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder; import static org.junit.Assert.assertThrows; +import com.google.cloud.secretmanager.v1.ProjectName; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretName; +import com.google.cloud.secretmanager.v1.SecretPayload; +import com.google.protobuf.ByteString; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; +import java.security.SecureRandom; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -90,7 +96,9 @@ import org.hamcrest.Matcher; import org.joda.time.Duration; import org.joda.time.Instant; +import 
org.junit.AfterClass; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -109,6 +117,55 @@ public class GroupByKeyTest implements Serializable { /** Shared test base class with setup/teardown helpers. */ public abstract static class SharedTestBase { @Rule public transient TestPipeline p = TestPipeline.create(); + + private static final String PROJECT_ID = "apache-beam-testing"; + private static final String SECRET_ID = "gbek-test"; + public static String gcpSecretVersionName; + private static String secretId; + + @BeforeClass + public static void setup() throws IOException { + secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000)); + SecretManagerServiceClient client; + try { + client = SecretManagerServiceClient.create(); + } catch (IOException e) { + gcpSecretVersionName = null; + return; + } + ProjectName projectName = ProjectName.of(PROJECT_ID); + SecretName secretName = SecretName.of(PROJECT_ID, secretId); + + try { + client.getSecret(secretName); + } catch (Exception e) { + com.google.cloud.secretmanager.v1.Secret secret = + com.google.cloud.secretmanager.v1.Secret.newBuilder() + .setReplication( + com.google.cloud.secretmanager.v1.Replication.newBuilder() + .setAutomatic( + com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder() + .build()) + .build()) + .build(); + client.createSecret(projectName, secretId, secret); + byte[] secretBytes = new byte[32]; + new SecureRandom().nextBytes(secretBytes); + client.addSecretVersion( + secretName, + SecretPayload.newBuilder().setData(ByteString.copyFrom(secretBytes)).build()); + } + gcpSecretVersionName = secretName.toString() + "/versions/latest"; + } + + @AfterClass + public static void tearDown() throws IOException { + if (gcpSecretVersionName != null) { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + SecretName secretName = 
SecretName.of(PROJECT_ID, secretId); + client.deleteSecret(secretName); + } + } } /** Tests validating basic {@link GroupByKey} scenarios. */ @@ -614,6 +671,55 @@ public void testLargeKeys10MB() throws Exception { public void testLargeKeys100MB() throws Exception { runLargeKeysTest(p, 100 << 20); } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyWithValidGcpSecretOption() { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), + KV.of("k5", Integer.MAX_VALUE), + KV.of("k5", Integer.MIN_VALUE), + KV.of("k2", 66), + KV.of("k1", 4), + KV.of("k2", -33), + KV.of("k3", 0)); + + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()))); + + p.getOptions().setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + PCollection>> output = input.apply(GroupByKey.create()); + + SerializableFunction>>, Void> checker = + containsKvs( + kv("k1", 3, 4), + kv("k5", Integer.MIN_VALUE, Integer.MAX_VALUE), + kv("k2", 66, -33), + kv("k3", 0)); + PAssert.that(output).satisfies(checker); + PAssert.that(output).inWindow(GlobalWindow.INSTANCE).satisfies(checker); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testGroupByKeyWithInvalidGcpSecretOption() { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + p.getOptions().setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); + p.apply(Create.of(KV.of("k1", 1))).apply(GroupByKey.create()); + assertThrows(RuntimeException.class, () -> p.run()); + } } /** Tests validating GroupByKey behaviors with windowing. 
*/ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java new file mode 100644 index 000000000000..dd4b125d73fe --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SecretTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link org.apache.beam.sdk.util.Secret}. 
*/ +@RunWith(JUnit4.class) +public class SecretTest { + + @Test + public void testParseSecretOptionWithValidGcpSecret() { + String secretOption = "type:gcpsecret;version_name:my_secret/versions/latest"; + Secret secret = Secret.parseSecretOption(secretOption); + assertTrue(secret instanceof GcpSecret); + assertEquals("my_secret/versions/latest", ((GcpSecret) secret).getVersionName()); + } + + @Test + public void testParseSecretOptionWithMissingType() { + String secretOption = "version_name:my_secret/versions/latest"; + Exception exception = + assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); + assertEquals("Secret string must contain a valid type parameter", exception.getMessage()); + } + + @Test + public void testParseSecretOptionWithUnsupportedType() { + String secretOption = "type:unsupported;version_name:my_secret/versions/latest"; + Exception exception = + assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); + assertEquals( + "Invalid secret type unsupported, currently only GcpSecret is supported", + exception.getMessage()); + } + + @Test + public void testParseSecretOptionWithInvalidGcpSecretParameter() { + String secretOption = "type:gcpsecret;invalid_param:some_value"; + Exception exception = + assertThrows(RuntimeException.class, () -> Secret.parseSecretOption(secretOption)); + assertEquals( + "Invalid secret parameter invalid_param, GcpSecret only supports the following parameters: [version_name]", + exception.getMessage()); + } +}