diff --git a/.github/trigger_files/beam_PostCommit_Java.json b/.github/trigger_files/beam_PostCommit_Java.json index 8784d0786c02..1bd74515152c 100644 --- a/.github/trigger_files/beam_PostCommit_Java.json +++ b/.github/trigger_files/beam_PostCommit_Java.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2 + "modification": 4 } \ No newline at end of file diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json index 42fb8f985ba1..5e7fbb916f4b 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV1.json @@ -3,6 +3,6 @@ "https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder", "https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners", "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 2, + "modification": 4, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json index 3717f48ee492..73012c45df18 100644 --- a/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json +++ b/.github/trigger_files/beam_PostCommit_Java_DataflowV2.json @@ -1,4 +1,4 @@ { - "modification": 4, + "modification": 6, "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index f1a964fa5a61..e138b32c58fe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -41,6 +41,10 @@ import 
org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.Globally; +import org.apache.beam.sdk.transforms.Combine.PerKey; import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; @@ -1499,6 +1503,7 @@ public static class PerKey private final DisplayData.ItemSpec> fnDisplayData; private final boolean fewKeys; private final List> sideInputs; + private boolean shouldSkipReplacement; private PerKey( GlobalCombineFn fn, @@ -1508,6 +1513,7 @@ private PerKey( this.fnDisplayData = fnDisplayData; this.fewKeys = fewKeys; this.sideInputs = ImmutableList.of(); + this.shouldSkipReplacement = false; } private PerKey( @@ -1519,6 +1525,7 @@ private PerKey( this.fnDisplayData = fnDisplayData; this.fewKeys = fewKeys; this.sideInputs = sideInputs; + this.shouldSkipReplacement = false; } @Override @@ -1592,6 +1599,11 @@ public List> getSideInputs() { return sideInputs; } + /** Returns whether a runner should skip replacing this transform. For runner use only. */ + public boolean shouldSkipReplacement() { + return this.shouldSkipReplacement; + } + /** * Returns the side inputs of this {@link Combine}, tagged with the tag of the {@link * PCollectionView}. 
The values of the returned map will be equal to the result of {@link @@ -1604,6 +1616,13 @@ public Map, PValue> getAdditionalInputs() { @Override public PCollection> expand(PCollection> input) { + PipelineOptions options = input.getPipeline().getOptions(); + String gbekOveride = options.getGbek(); + if (gbekOveride != null && !gbekOveride.trim().isEmpty()) { + // Don't replace this transform if we're using GBEK since the runner may insert + // its own GBK which doesn't perform encryption. + this.shouldSkipReplacement = true; + } return input .apply(fewKeys ? GroupByKey.createWithFewKeys() : GroupByKey.create()) .apply( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CombineTranslation.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CombineTranslation.java index 1a1913d87f39..73a3ed84d820 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CombineTranslation.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CombineTranslation.java @@ -61,12 +61,25 @@ public String getUrn() { return PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN; } + @Override + public String getUrn(Combine.PerKey transform) { + if (transform.shouldSkipReplacement()) { + return "beam:transform:combine_per_key_wrapper:v1"; + } + return PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN; + } + @Override public FunctionSpec translate( AppliedPTransform> transform, SdkComponents components) throws IOException { - if (transform.getTransform().getSideInputs().isEmpty()) { - GlobalCombineFn combineFn = transform.getTransform().getFn(); + Combine.PerKey underlyingCombine = transform.getTransform(); + if (underlyingCombine.shouldSkipReplacement()) { + // Can use null for spec for generic composite. 
+ return null; + } + if (underlyingCombine.getSideInputs().isEmpty()) { + GlobalCombineFn combineFn = underlyingCombine.getFn(); Coder accumulatorCoder = extractAccumulatorCoder(combineFn, (AppliedPTransform) transform); return FunctionSpec.newBuilder() diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java index 60477a4c242f..949b6150bf33 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java @@ -148,7 +148,7 @@ public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception { // Redistribute depends on GBK under the hood and can have runner-specific implementations @Test - public void redistributeWithValidGcpSecretOption() throws Exception { + public void testRedistributeWithValidGcpSecretOption() throws Exception { if (gcpSecretVersionName == null) { // Skip test if we couldn't set up secret manager return; @@ -189,4 +189,44 @@ public void testRedistributeWithInvalidGcpSecretOption() throws Exception { thrown.expect(RuntimeException.class); p.run(); } + + // Combine.PerKey depends on GBK under the hood, but can be overridden by a runner. 
This can + // fail unless it is handled specially, so we should test it specifically + @Test + public void testCombinePerKeyWithValidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName)); + Pipeline p = Pipeline.create(options); + + List> ungroupedPairs = + Arrays.asList( + KV.of("k1", 3), KV.of("k2", 66), KV.of("k1", 4), KV.of("k2", -33), KV.of("k3", 0)); + List> sums = Arrays.asList(KV.of("k1", 7), KV.of("k2", 33), KV.of("k3", 0)); + PCollection> input = + p.apply( + Create.of(ungroupedPairs) + .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))); + PCollection> output = input.apply(Combine.perKey(Sum.ofIntegers())); + PAssert.that(output).containsInAnyOrder(sums); + + p.run(); + } + + @Test + public void testCombinePerKeyWithInvalidGcpSecretOption() throws Exception { + if (gcpSecretVersionName == null) { + // Skip test if we couldn't set up secret manager + return; + } + PipelineOptions options = TestPipeline.testingPipelineOptions(); + options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest"); + Pipeline p = Pipeline.create(options); + p.apply(Create.of(KV.of("k1", 1))).apply(Combine.perKey(Sum.ofIntegers())); + thrown.expect(RuntimeException.class); + p.run(); + } }