From 5ae388425cd9d5067868d55877195e708b26ba53 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Mon, 24 Nov 2025 23:04:51 +0000 Subject: [PATCH 1/6] Fix inconsistent handling of Firestore Project and Database ID in routing header (resolves #36894) --- .../FirestoreStatefulComponentFactory.java | 23 +++++++++++++++---- .../io/gcp/firestore/FirestoreV1WriteFn.java | 4 +++- .../io/gcp/firestore/it/BaseFirestoreIT.java | 3 ++- .../firestore/it/FirestoreTestingHelper.java | 6 ++--- 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java index 4e8c11f7072c..390e102b6010 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreStatefulComponentFactory.java @@ -29,6 +29,7 @@ import java.io.Serializable; import java.security.SecureRandom; import java.util.Map; +import javax.annotation.Nullable; import javax.annotation.concurrent.Immutable; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; @@ -65,6 +66,13 @@ private FirestoreStatefulComponentFactory() {} * @return a new {@link FirestoreStub} pre-configured with values from the provided options */ FirestoreStub getFirestoreStub(PipelineOptions options) { + return getFirestoreStub(options, null, null); + } + + FirestoreStub getFirestoreStub( + PipelineOptions options, + @Nullable String configuredProjectId, + @Nullable String configuredDatabaseId) { try { FirestoreSettings.Builder builder = FirestoreSettings.newBuilder(); @@ -94,12 +102,17 @@ FirestoreStub getFirestoreStub(PipelineOptions options) { builder .setCredentialsProvider(FixedCredentialsProvider.create(gcpOptions.getGcpCredential())) .setEndpoint(firestoreOptions.getFirestoreHost()); + String projectId = + configuredProjectId != null + ? configuredProjectId + : firestoreOptions.getFirestoreProject(); + if (projectId == null) { + projectId = gcpOptions.getProject(); + } + String databaseId = + configuredDatabaseId != null ? configuredDatabaseId : firestoreOptions.getFirestoreDb(); headers.put( - "x-goog-request-params", - "project_id=" - + gcpOptions.getProject() - + "&database_id=" - + firestoreOptions.getFirestoreDb()); + "x-goog-request-params", "project_id=" + projectId + "&database_id=" + databaseId); } builder.setHeaderProvider( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java index 6bbb00e76f2d..ab33d8e5c166 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1WriteFn.java @@ -244,7 +244,9 @@ public final void startBundle(StartBundleContext c) { requireNonNull( databaseId, "firestoreDb must be defined on FirestoreOptions of PipelineOptions")); - firestoreStub = firestoreStatefulComponentFactory.getFirestoreStub(c.getPipelineOptions()); + firestoreStub = + firestoreStatefulComponentFactory.getFirestoreStub( + c.getPipelineOptions(), project, databaseId); } /** diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java index 8695080cb885..1c41906aa2b7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java @@ -76,7 +76,8 @@ abstract class BaseFirestoreIT { public final TestName testName = new TestName(); @Rule(order = 2) - public final FirestoreTestingHelper helper = new FirestoreTestingHelper(CleanupMode.ALWAYS); + public final FirestoreTestingHelper helper = + new FirestoreTestingHelper(CleanupMode.ALWAYS, "firestoredb"); @Rule(order = 3) public final TestPipeline testPipeline = TestPipeline.create(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java index cb05e97d6a24..a9c3a0e0cd2d 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java @@ -64,6 +64,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import javax.annotation.Nullable; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -109,7 +110,6 @@ enum DataLayout { private final GcpOptions gcpOptions; private final org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions firestoreBeamOptions; private final FirestoreOptions firestoreOptions; - private final Firestore fs; private final FirestoreRpc rpc; private final CleanupMode cleanupMode; @@ -125,7 +125,7 @@ enum DataLayout { @SuppressWarnings( "initialization.fields.uninitialized") // testClass and testName are managed via #apply - public FirestoreTestingHelper(CleanupMode cleanupMode) { + public FirestoreTestingHelper(CleanupMode cleanupMode, @Nullable String databaseId) { this.cleanupMode = cleanupMode; gcpOptions = TestPipeline.testingPipelineOptions().as(GcpOptions.class); firestoreBeamOptions = @@ -136,7 +136,7 @@ public FirestoreTestingHelper(CleanupMode cleanupMode) { .setCredentials(gcpOptions.getGcpCredential()) .setProjectId( firstNonNull(firestoreBeamOptions.getFirestoreProject(), gcpOptions.getProject())) - .setDatabaseId(firestoreBeamOptions.getFirestoreDb()) + .setDatabaseId(firstNonNull(databaseId, firestoreBeamOptions.getFirestoreDb())) .setHost(firestoreBeamOptions.getFirestoreHost()) .build(); fs = firestoreOptions.getService(); From a847667148b7f35abb9163a8aa754eef572a5857 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Tue, 25 Nov 2025 00:00:58 +0000 Subject: [PATCH 2/6] Update CHANGED.md --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 222d4b82cb25..7d261f20a81c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -87,7 +87,7 @@ ## Bugfixes -* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Fixed #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)). ## Known Issues From b43918b2e6dea1e4a8caf3871a55310a8fbdce1c Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Tue, 25 Nov 2025 01:03:10 +0000 Subject: [PATCH 3/6] Address reviewer comments from yixiaoshen --- CHANGES.md | 2 +- .../apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java | 5 +---- .../beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java | 6 +++--- .../sdk/io/gcp/firestore/it/FirestoreTestingHelper.java | 5 ++--- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 7d261f20a81c..d7c1e05dfa3e 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -87,7 +87,7 @@ ## Bugfixes -* Fixed #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)). +* Fixed FirestoreV1 Beam connectors allow configuring inconsistent project/database IDs between RPC requests and routing headers #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)). ## Known Issues diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java index 5adc9ef38f36..8b90594bb655 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreOptions.java @@ -48,10 +48,7 @@ public interface FirestoreOptions extends PipelineOptions { */ void setEmulatorHost(String host); - /** - * The Firestore database ID to connect to. Note: named database is currently an internal feature - * in Firestore. Do not set this to anything other than "(default)". - */ + /** The Firestore database ID to connect to. */ @Description("Firestore database ID") @Default.String("(default)") String getFirestoreDb(); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java index 1c41906aa2b7..14344b105b35 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/BaseFirestoreIT.java @@ -42,6 +42,7 @@ import java.util.stream.Stream; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.io.gcp.firestore.FirestoreIO; +import org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions; import org.apache.beam.sdk.io.gcp.firestore.RpcQosOptions; import org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.CleanupMode; import org.apache.beam.sdk.io.gcp.firestore.it.FirestoreTestingHelper.DataLayout; @@ -76,8 +77,7 @@ abstract class BaseFirestoreIT { public final TestName testName = new TestName(); @Rule(order = 2) - public final FirestoreTestingHelper helper = - new FirestoreTestingHelper(CleanupMode.ALWAYS, "firestoredb"); + public final FirestoreTestingHelper helper = new FirestoreTestingHelper(CleanupMode.ALWAYS); @Rule(order = 3) public final TestPipeline testPipeline = TestPipeline.create(); @@ -98,7 +98,7 @@ abstract class BaseFirestoreIT { @Before public void setup() { project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject(); - databaseId = "firestoredb"; + databaseId = TestPipeline.testingPipelineOptions().as(FirestoreOptions.class).getFirestoreDb(); } private static Instant toWriteTime(WriteResult result) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java index a9c3a0e0cd2d..0c49130b9d3b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java @@ -64,7 +64,6 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import javax.annotation.Nullable; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -125,7 +124,7 @@ enum DataLayout { @SuppressWarnings( "initialization.fields.uninitialized") // testClass and testName are managed via #apply - public FirestoreTestingHelper(CleanupMode cleanupMode, @Nullable String databaseId) { + public FirestoreTestingHelper(CleanupMode cleanupMode) { this.cleanupMode = cleanupMode; gcpOptions = TestPipeline.testingPipelineOptions().as(GcpOptions.class); firestoreBeamOptions = @@ -136,7 +135,7 @@ public FirestoreTestingHelper(CleanupMode cleanupMode, @Nullable String database .setCredentials(gcpOptions.getGcpCredential()) .setProjectId( firstNonNull(firestoreBeamOptions.getFirestoreProject(), gcpOptions.getProject())) - .setDatabaseId(firstNonNull(databaseId, firestoreBeamOptions.getFirestoreDb())) + .setDatabaseId(firestoreBeamOptions.getFirestoreDb()) .setHost(firestoreBeamOptions.getFirestoreHost()) .build(); fs = firestoreOptions.getService(); From 75678e13131bcf4e2040cf412233f2ccdf8eb5cb Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Tue, 25 Nov 2025 23:51:07 +0000 Subject: [PATCH 4/6] Fix unit tests --- .../sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java | 4 ++-- .../sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java | 8 ++++---- .../FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java | 2 +- .../firestore/FirestoreV1FnBatchWriteWithSummaryTest.java | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java index 0aab59d3aacd..5c28d3fc99ea 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java @@ -45,7 +45,7 @@ abstract class BaseFirestoreV1ReadFnTest public final void attemptsExhaustedForRetryableError() throws Exception { BaseFirestoreV1ReadFn fn = getFn(clock, ff, rpcQosOptions); V1RpcFnTestCtx ctx = newCtx(); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newReadAttempt(fn.getRpcAttemptContext())).thenReturn(attempt); ctx.mockRpcToCallable(stub); @@ -79,7 +79,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception { public final void noRequestIsSentIfNotSafeToProceed() throws Exception { BaseFirestoreV1ReadFn fn = getFn(clock, ff, rpcQosOptions); V1RpcFnTestCtx ctx = newCtx(); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newReadAttempt(fn.getRpcAttemptContext())).thenReturn(attempt); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java index 623f947c45a7..f20181fbc320 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1WriteFnTest.java @@ -111,7 +111,7 @@ public final void setUp() { when(rpcQos.newWriteAttempt(any())).thenReturn(attempt, attempt2); when(ff.getRpcQos(any())).thenReturn(rpcQos); - when(ff.getFirestoreStub(pipelineOptions)).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(stub.batchWriteCallable()).thenReturn(callable); metricsFixture = new MetricsFixture(); } @@ -129,7 +129,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception { Write write = newWrite(); Element element1 = new WriteElement(0, write, window); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite)) .thenReturn(attempt); @@ -175,7 +175,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception { @Override @Test public final void noRequestIsSentIfNotSafeToProceed() throws Exception { - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newWriteAttempt(FirestoreV1RpcAttemptContexts.V1FnRpcAttemptContext.BatchWrite)) .thenReturn(attempt); @@ -369,7 +369,7 @@ public final void endToEnd_deadlineExceededOnAnIndividualWriteResultsInThrottlin LOG.debug("options = {}", options); FirestoreStatefulComponentFactory ff = mock(FirestoreStatefulComponentFactory.class); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); Random random = new Random(12345); TestClock clock = new TestClock(Instant.EPOCH, Duration.standardSeconds(1)); Sleeper sleeper = millis -> clock.setNext(advanceClockBy(Duration.millis(millis))); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java index 35d0ea9482d3..e7f98ff73c6b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithDeadLetterQueueTest.java @@ -67,7 +67,7 @@ public void enqueueingWritesValidateBytesSize() throws Exception { int maxBytes = 50; RpcQosOptions options = rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build(); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java index 3e37e3975bf5..e7174537943e 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnBatchWriteWithSummaryTest.java @@ -76,7 +76,7 @@ public void enqueueingWritesValidateBytesSize() throws Exception { int maxBytes = 50; RpcQosOptions options = rpcQosOptions.toBuilder().withBatchMaxBytes(maxBytes).build(); - when(ff.getFirestoreStub(any())).thenReturn(stub); + when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); when(ff.getRpcQos(any())) .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options)); From 02f104f0400177984dc50e6f05629be4a8395225 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Wed, 26 Nov 2025 00:06:20 +0000 Subject: [PATCH 5/6] revert unnecessary change to read test --- .../beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java index 5c28d3fc99ea..0aab59d3aacd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/BaseFirestoreV1ReadFnTest.java @@ -45,7 +45,7 @@ abstract class BaseFirestoreV1ReadFnTest public final void attemptsExhaustedForRetryableError() throws Exception { BaseFirestoreV1ReadFn fn = getFn(clock, ff, rpcQosOptions); V1RpcFnTestCtx ctx = newCtx(); - when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); + when(ff.getFirestoreStub(any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newReadAttempt(fn.getRpcAttemptContext())).thenReturn(attempt); ctx.mockRpcToCallable(stub); @@ -79,7 +79,7 @@ public final void attemptsExhaustedForRetryableError() throws Exception { public final void noRequestIsSentIfNotSafeToProceed() throws Exception { BaseFirestoreV1ReadFn fn = getFn(clock, ff, rpcQosOptions); V1RpcFnTestCtx ctx = newCtx(); - when(ff.getFirestoreStub(any(), any(), any())).thenReturn(stub); + when(ff.getFirestoreStub(any())).thenReturn(stub); when(ff.getRpcQos(any())).thenReturn(rpcQos); when(rpcQos.newReadAttempt(fn.getRpcAttemptContext())).thenReturn(attempt); From 3c1d0a049f68ecaf27f98dbc7d9d31e244cd1321 Mon Sep 17 00:00:00 2001 From: Paco Avila Date: Wed, 26 Nov 2025 00:07:29 +0000 Subject: [PATCH 6/6] Revert unnecessary change to test helper --- .../beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java index 0c49130b9d3b..cb05e97d6a24 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/it/FirestoreTestingHelper.java @@ -109,6 +109,7 @@ enum DataLayout { private final GcpOptions gcpOptions; private final org.apache.beam.sdk.io.gcp.firestore.FirestoreOptions firestoreBeamOptions; private final FirestoreOptions firestoreOptions; + private final Firestore fs; private final FirestoreRpc rpc; private final CleanupMode cleanupMode;