Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b98663c
First pass at Java GBEK (AI generated)
damccorm Sep 19, 2025
c5aacfb
Compile
damccorm Sep 19, 2025
45ed98f
Compiletest
damccorm Sep 19, 2025
4bf3c9a
checkstyle
damccorm Sep 19, 2025
c62d8e0
tests passing
damccorm Sep 26, 2025
9230d2e
Move secret code into utils
damccorm Sep 26, 2025
56b7130
Use secret manager from bom
damccorm Sep 26, 2025
9f2204f
Docs
damccorm Sep 26, 2025
25d96bd
Better docs
damccorm Sep 26, 2025
cbd441a
Updates
damccorm Sep 29, 2025
90fa3fd
[WIP] Add pipeline option to force GBEK (Java)
damccorm Oct 1, 2025
f34d8c5
Trigger some postcommits
damccorm Oct 1, 2025
8c25c56
Update triggers
damccorm Oct 1, 2025
eaee76e
Merge branch 'master' into users/damccorm/enforce-java-gbek
damccorm Oct 1, 2025
6c8f959
Tests
damccorm Oct 1, 2025
705213a
test fixes
damccorm Oct 1, 2025
6dfdea9
Move tests to IT
damccorm Oct 2, 2025
7c821e4
Randomized secret postfix
damccorm Oct 2, 2025
b19082e
Update encryption mode
damccorm Oct 3, 2025
a18d241
checkstyle
damccorm Oct 3, 2025
5016b6f
explicitly add dep
damccorm Oct 4, 2025
14b8a64
spotbugs: only create generator once
damccorm Oct 4, 2025
9ee77f5
Merge in master
damccorm Oct 4, 2025
241f438
Gemini nits
damccorm Oct 6, 2025
55b9432
[WIP] CombinePerKey with gbek (Java)
damccorm Oct 6, 2025
fce330b
Dont use urn when gbek set
damccorm Oct 6, 2025
4be8b83
Merge in changes
damccorm Oct 7, 2025
94ec237
Fix casing
damccorm Oct 7, 2025
a542d8f
Syntax + format
damccorm Oct 7, 2025
48306ac
Merge in master
damccorm Oct 7, 2025
2ada075
Fix test naming
damccorm Oct 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 4
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV1.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2,
"modification": 4,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV2.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"modification": 4,
"modification": 6,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.Combine.Globally;
import org.apache.beam.sdk.transforms.Combine.PerKey;
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
Expand Down Expand Up @@ -1499,6 +1503,7 @@ public static class PerKey<K, InputT, OutputT>
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
private final boolean fewKeys;
private final List<PCollectionView<?>> sideInputs;
private boolean shouldSkipReplacement;

private PerKey(
GlobalCombineFn<? super InputT, ?, OutputT> fn,
Expand All @@ -1508,6 +1513,7 @@ private PerKey(
this.fnDisplayData = fnDisplayData;
this.fewKeys = fewKeys;
this.sideInputs = ImmutableList.of();
this.shouldSkipReplacement = false;
}

private PerKey(
Expand All @@ -1519,6 +1525,7 @@ private PerKey(
this.fnDisplayData = fnDisplayData;
this.fewKeys = fewKeys;
this.sideInputs = sideInputs;
this.shouldSkipReplacement = false;
}

@Override
Expand Down Expand Up @@ -1592,6 +1599,11 @@ public List<PCollectionView<?>> getSideInputs() {
return sideInputs;
}

/** Returns whether a runner should skip replacing this transform. For runner use only */
public boolean shouldSkipReplacement() {
return this.shouldSkipReplacement;
}

/**
* Returns the side inputs of this {@link Combine}, tagged with the tag of the {@link
* PCollectionView}. The values of the returned map will be equal to the result of {@link
Expand All @@ -1604,6 +1616,13 @@ public Map<TupleTag<?>, PValue> getAdditionalInputs() {

@Override
public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
PipelineOptions options = input.getPipeline().getOptions();
String gbekOveride = options.getGbek();
if (gbekOveride != null && !gbekOveride.trim().isEmpty()) {
// Don't replace this transform if we're using GBEK since the runner may insert
// its own GBK which doesn't perform encryption.
this.shouldSkipReplacement = true;
}
return input
.apply(fewKeys ? GroupByKey.createWithFewKeys() : GroupByKey.create())
.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,25 @@ public String getUrn() {
return PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN;
}

@Override
public String getUrn(Combine.PerKey<?, ?, ?> transform) {
if (transform.shouldSkipReplacement()) {
return "beam:transform:combine_per_key_wrapper:v1";
}
return PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN;
}

@Override
public FunctionSpec translate(
AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> transform, SdkComponents components)
throws IOException {
if (transform.getTransform().getSideInputs().isEmpty()) {
GlobalCombineFn<?, ?, ?> combineFn = transform.getTransform().getFn();
Combine.PerKey underlyingCombine = transform.getTransform();
if (underlyingCombine.shouldSkipReplacement()) {
// Can use null for spec for generic composite.
return null;
}
if (underlyingCombine.getSideInputs().isEmpty()) {
GlobalCombineFn<?, ?, ?> combineFn = underlyingCombine.getFn();
Coder<?> accumulatorCoder =
extractAccumulatorCoder(combineFn, (AppliedPTransform) transform);
return FunctionSpec.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception {

// Redistribute depends on GBK under the hood and can have runner-specific implementations
@Test
public void redistributeWithValidGcpSecretOption() throws Exception {
public void testRedistributeWithValidGcpSecretOption() throws Exception {
if (gcpSecretVersionName == null) {
// Skip test if we couldn't set up secret manager
return;
Expand Down Expand Up @@ -189,4 +189,44 @@ public void testRedistributeWithInvalidGcpSecretOption() throws Exception {
thrown.expect(RuntimeException.class);
p.run();
}

// Combine.PerKey depends on GBK under the hood, but can be overriden by a runner. This can
// fail unless it is handled specially, so we should test it specifically
@Test
public void testCombinePerKeyWithValidGcpSecretOption() throws Exception {
if (gcpSecretVersionName == null) {
// Skip test if we couldn't set up secret manager
return;
}
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName));
Pipeline p = Pipeline.create(options);

List<KV<String, Integer>> ungroupedPairs =
Arrays.asList(
KV.of("k1", 3), KV.of("k2", 66), KV.of("k1", 4), KV.of("k2", -33), KV.of("k3", 0));
List<KV<String, Integer>> sums = Arrays.asList(KV.of("k1", 7), KV.of("k2", 33), KV.of("k3", 0));
PCollection<KV<String, Integer>> input =
p.apply(
Create.of(ungroupedPairs)
.withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
PCollection<KV<String, Integer>> output = input.apply(Combine.perKey(Sum.ofIntegers()));
PAssert.that(output).containsInAnyOrder(sums);

p.run();
}

@Test
public void testCombinePerKeyWithInvalidGcpSecretOption() throws Exception {
if (gcpSecretVersionName == null) {
// Skip test if we couldn't set up secret manager
return;
}
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest");
Pipeline p = Pipeline.create(options);
p.apply(Create.of(KV.of("k1", 1))).apply(Combine.perKey(Sum.ofIntegers()));
thrown.expect(RuntimeException.class);
p.run();
}
}
Loading