diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java index dd5202ea2d1b..74d6636b28a9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1.java @@ -1218,6 +1218,10 @@ public void processElement(ProcessContext c) { List cursors = new ArrayList<>(partitionQueryResponse.getPartitionsList()); cursors.sort(CURSOR_REFERENCE_VALUE_COMPARATOR); final int size = cursors.size(); + if (size == 0) { + emit(c, dbRoot, structuredQuery.toBuilder()); + return; + } final int lastIdx = size - 1; for (int i = 0; i < size; i++) { Cursor curr = cursors.get(i); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java index 0c9bbf1b6f60..1f298837ea6a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/FirestoreV1FnPartitionQueryTest.java @@ -99,6 +99,41 @@ public void endToEnd() throws Exception { assertEquals(expected, allValues); } + @Test + public void endToEnd_emptyCursors() throws Exception { + // First page of the response + PartitionQueryRequest request1 = + PartitionQueryRequest.newBuilder() + .setParent(String.format("projects/%s/databases/(default)/document", projectId)) + .build(); + PartitionQueryResponse response1 = PartitionQueryResponse.newBuilder().build(); + when(callable.call(request1)).thenReturn(pagedResponse1); + when(page1.getResponse()).thenReturn(response1); + when(pagedResponse1.iteratePages()).thenReturn(ImmutableList.of(page1)); + + when(stub.partitionQueryPagedCallable()).thenReturn(callable); + + when(ff.getFirestoreStub(any())).thenReturn(stub); + RpcQosOptions options = RpcQosOptions.defaultOptions(); + when(ff.getRpcQos(any())) + .thenReturn(FirestoreStatefulComponentFactory.INSTANCE.getRpcQos(options)); + + ArgumentCaptor responses = + ArgumentCaptor.forClass(PartitionQueryPair.class); + + doNothing().when(processContext).output(responses.capture()); + + when(processContext.element()).thenReturn(request1); + + PartitionQueryFn fn = new PartitionQueryFn(clock, ff, options); + + runFunction(fn); + + List expected = newArrayList(new PartitionQueryPair(request1, response1)); + List allValues = responses.getAllValues(); + assertEquals(expected, allValues); + } + @Override public void resumeFromLastReadValue() throws Exception { when(ff.getFirestoreStub(any())).thenReturn(stub); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java index 25ed63ccddd8..ed789dae59f4 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/firestore/PartitionQueryResponseToRunQueryRequestTest.java @@ -121,6 +121,39 @@ public void ensureCursorPairingWorks() { assertEquals(expectedQueries, actualQueries); } + @Test + public void ensureCursorPairingWorks_emptyCursorsInResponse() { + StructuredQuery query = + StructuredQuery.newBuilder() + .addFrom( + CollectionSelector.newBuilder() + .setAllDescendants(true) + .setCollectionId("c1") + .build()) + .build(); + + List expectedQueries = newArrayList(query); + + PartitionQueryPair partitionQueryPair = + new PartitionQueryPair( + PartitionQueryRequest.newBuilder().setStructuredQuery(query).build(), + PartitionQueryResponse.newBuilder().build()); + + ArgumentCaptor captor = ArgumentCaptor.forClass(RunQueryRequest.class); + when(processContext.element()).thenReturn(partitionQueryPair); + doNothing().when(processContext).output(captor.capture()); + + PartitionQueryResponseToRunQueryRequest fn = new PartitionQueryResponseToRunQueryRequest(); + fn.processElement(processContext); + + List actualQueries = + captor.getAllValues().stream() + .map(RunQueryRequest::getStructuredQuery) + .collect(Collectors.toList()); + + assertEquals(expectedQueries, actualQueries); + } + private static Cursor referenceValueCursor(String referenceValue) { return Cursor.newBuilder() .addValues(Value.newBuilder().setReferenceValue(referenceValue).build())