watermarkEstimator) {
+ return () -> {
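+ // Persists the current watermark for this partition in the metadata table; a NOT_FOUND error
+ // only means the partition row no longer exists, so it is logged and ignored.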
+ final Instant watermark = watermarkEstimator.currentWatermark();
+ LOG.debug("[" + token + "] Updating current watermark to " + watermark);
+ try {
+ partitionMetadataDao.updateWatermark(
+ token, TimestampConverter.timestampFromMillis(watermark.getMillis()));
+ } catch (SpannerException e) {
+ if (e.getErrorCode() == ErrorCode.NOT_FOUND) {
+ LOG.debug("[" + token + "] Unable to update the current watermark, partition NOT FOUND");
+ } else {
+ LOG.error("[" + token + "] Error updating the current watermark: " + e.getMessage(), e);
+ }
+ }
+ };
+ }
+
+ private boolean isTimestampOutOfRange(SpannerException e) {
+ return (e.getErrorCode() == ErrorCode.INVALID_ARGUMENT
+ || e.getErrorCode() == ErrorCode.OUT_OF_RANGE)
+ && e.getMessage() != null
+ && e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/package-info.java
new file mode 100644
index 000000000000..200abaca90dc
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Action processors for each of the types of Change Stream records received. */
+@Experimental
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import org.apache.beam.sdk.annotations.Experimental;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
new file mode 100644
index 000000000000..1c73ad6dae2c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics.PARTITION_ID_ATTRIBUTE_LABEL;
+
+import io.opencensus.common.Scope;
+import io.opencensus.trace.AttributeValue;
+import io.opencensus.trace.Tracer;
+import io.opencensus.trace.Tracing;
+import java.io.Serializable;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ChildPartitionsRecordAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DataChangeRecordAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.HeartbeatRecordAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStreamAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction.ReadChangeStreamPartitionRangeTracker;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An SDF (Splittable DoFn) class which is responsible for performing a change stream query for a
+ * given partition. A different action will be taken depending on the type of record received from
+ * the query. This component will also reflect the partition state in the partition metadata tables.
+ *
+ * <p>The processing of a partition is delegated to the {@link QueryChangeStreamAction}.
+ */
+// Allows for transient QueryChangeStreamAction
+@SuppressWarnings("initialization.fields.uninitialized")
+@UnboundedPerElement
+public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataChangeRecord>
+ implements Serializable {
+
+ private static final long serialVersionUID = -7574596218085711975L;
+ private static final Logger LOG = LoggerFactory.getLogger(ReadChangeStreamPartitionDoFn.class);
+ private static final Tracer TRACER = Tracing.getTracer();
+
+ private final DaoFactory daoFactory;
+ private final MapperFactory mapperFactory;
+ private final ActionFactory actionFactory;
+ private final ChangeStreamMetrics metrics;
+
+ private transient QueryChangeStreamAction queryChangeStreamAction;
+
+ /**
+ * This class needs a {@link DaoFactory} to build DAOs to access the partition metadata tables and
+ * to perform the change streams query. It uses mappers to transform database rows into the {@link
+ * org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChangeStreamRecord} model. It uses the
+ * {@link ActionFactory} to construct the action dispatchers, which will perform the change stream
+ * query and process each type of record received. It emits metrics for the partition using the
+ * {@link ChangeStreamMetrics}.
+ *
+ * @param daoFactory the {@link DaoFactory} to construct {@link PartitionMetadataDao}s and {@link
+ * ChangeStreamDao}s
+ * @param mapperFactory the {@link MapperFactory} to construct {@link ChangeStreamRecordMapper}s
+ * @param actionFactory the {@link ActionFactory} to construct actions
+ * @param metrics the {@link ChangeStreamMetrics} to emit partition related metrics
+ */
+ public ReadChangeStreamPartitionDoFn(
+ DaoFactory daoFactory,
+ MapperFactory mapperFactory,
+ ActionFactory actionFactory,
+ ChangeStreamMetrics metrics) {
+ this.daoFactory = daoFactory;
+ this.mapperFactory = mapperFactory;
+ this.actionFactory = actionFactory;
+ this.metrics = metrics;
+ }
+
+ @GetInitialWatermarkEstimatorState
+ public Instant getInitialWatermarkEstimatorState(@Timestamp Instant currentElementTimestamp) {
+ return currentElementTimestamp;
+ }
+
+ @NewWatermarkEstimator
+ public ManualWatermarkEstimator<Instant> newWatermarkEstimator(
+ @WatermarkEstimatorState Instant watermarkEstimatorState) {
+ return new Manual(watermarkEstimatorState);
+ }
+
+ /**
+ * The restriction for a partition will be defined from the start and end timestamp to query the
+ * partition for. These timestamps are converted to microseconds. The {@link OffsetRange}
+ * restriction represents a closed-open interval, while the start / end timestamps represent a
+ * closed-closed interval, so we add 1 microsecond to the end timestamp to convert it to
+ * closed-open.
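+ *
+ * <p>For example, a partition with start timestamp {@code T} and end timestamp {@code T'} yields
+ * the restriction {@code [micros(T), micros(T') + 1)}. If the partition has no end timestamp, the
+ * range is open ended up to the maximum representable timestamp.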
+ *
+ * <p>In this function we also update the partition state to {@link
+ * PartitionMetadata.State#RUNNING}.
+ *
+ * @param partition the partition to be queried
+ * @return the offset range from the partition start timestamp to the partition end timestamp + 1
+ * microsecond
+ */
+ @GetInitialRestriction
+ public OffsetRange initialRestriction(@Element PartitionMetadata partition) {
+ final String token = partition.getPartitionToken();
+ final com.google.cloud.Timestamp startTimestamp = partition.getStartTimestamp();
+ final long startMicros = TimestampConverter.timestampToMicros(startTimestamp);
+ // Offset range represents closed-open interval
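+ // If the partition has no end timestamp, fall back to the maximum representable timestamp plus
+ // one, so that the restriction stays open ended under the same closed-open convention.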
+ final long endMicros =
+ Optional.ofNullable(partition.getEndTimestamp())
+ .map(TimestampConverter::timestampToMicros)
+ .map(micros -> micros + 1)
+ .orElse(TimestampConverter.MAX_MICROS + 1);
+ final com.google.cloud.Timestamp partitionScheduledAt = partition.getScheduledAt();
+ final com.google.cloud.Timestamp partitionRunningAt =
+ daoFactory.getPartitionMetadataDao().updateToRunning(token);
+
+ if (partitionScheduledAt != null && partitionRunningAt != null) {
+ metrics.updatePartitionScheduledToRunning(
+ new Duration(
+ partitionScheduledAt.toSqlTimestamp().getTime(),
+ partitionRunningAt.toSqlTimestamp().getTime()));
+ }
+
+ return new OffsetRange(startMicros, endMicros);
+ }
+
+ @NewTracker
+ public ReadChangeStreamPartitionRangeTracker newTracker(
+ @Element PartitionMetadata partition, @Restriction OffsetRange offsetRange) {
+ return new ReadChangeStreamPartitionRangeTracker(partition, offsetRange);
+ }
+
+ /**
+ * Constructs instances for the {@link PartitionMetadataDao}, {@link ChangeStreamDao}, {@link
+ * ChangeStreamRecordMapper}, {@link PartitionMetadataMapper}, {@link DataChangeRecordAction},
+ * {@link HeartbeatRecordAction}, {@link ChildPartitionsRecordAction} and {@link
+ * QueryChangeStreamAction}.
+ */
+ @Setup
+ public void setup() {
+ final PartitionMetadataDao partitionMetadataDao = daoFactory.getPartitionMetadataDao();
+ final ChangeStreamDao changeStreamDao = daoFactory.getChangeStreamDao();
+ final ChangeStreamRecordMapper changeStreamRecordMapper =
+ mapperFactory.changeStreamRecordMapper();
+ final PartitionMetadataMapper partitionMetadataMapper = mapperFactory.partitionMetadataMapper();
+ final DataChangeRecordAction dataChangeRecordAction = actionFactory.dataChangeRecordAction();
+ final HeartbeatRecordAction heartbeatRecordAction =
+ actionFactory.heartbeatRecordAction(metrics);
+ final ChildPartitionsRecordAction childPartitionsRecordAction =
+ actionFactory.childPartitionsRecordAction(partitionMetadataDao, metrics);
+
+ this.queryChangeStreamAction =
+ actionFactory.queryChangeStreamAction(
+ changeStreamDao,
+ partitionMetadataDao,
+ changeStreamRecordMapper,
+ partitionMetadataMapper,
+ dataChangeRecordAction,
+ heartbeatRecordAction,
+ childPartitionsRecordAction);
+ }
+
+ /**
+ * Performs a change stream query for a given partition. A different action will be taken
+ * depending on the type of record received from the query. This component will also reflect the
+ * partition state in the partition metadata tables.
+ *
+ * <p>The processing of a partition is delegated to the {@link QueryChangeStreamAction}.
+ *
+ * @param partition the partition to be queried
+ * @param tracker an instance of {@link ReadChangeStreamPartitionRangeTracker}
+ * @param receiver a {@link DataChangeRecord} {@link OutputReceiver}
+ * @param watermarkEstimator a {@link ManualWatermarkEstimator} of {@link Instant}
+ * @param bundleFinalizer the bundle finalizer
+ * @return a {@link ProcessContinuation#stop()} if a record timestamp could not be claimed or if
+ * the partition processing has finished
+ */
+ @ProcessElement
+ public ProcessContinuation processElement(
+ @Element PartitionMetadata partition,
+ RestrictionTracker<OffsetRange, Long> tracker,
+ OutputReceiver<DataChangeRecord> receiver,
+ ManualWatermarkEstimator<Instant> watermarkEstimator,
+ BundleFinalizer bundleFinalizer) {
+
+ final String token = partition.getPartitionToken();
+ try (Scope scope =
+ TRACER
+ .spanBuilder("ReadChangeStreamPartitionDoFn.processElement")
+ .setRecordEvents(true)
+ .startScopedSpan()) {
+ TRACER
+ .getCurrentSpan()
+ .putAttribute(PARTITION_ID_ATTRIBUTE_LABEL, AttributeValue.stringAttributeValue(token));
+
+ LOG.debug(
+ "[" + token + "] Processing element with restriction " + tracker.currentRestriction());
+
+ return queryChangeStreamAction.run(
+ partition, tracker, receiver, watermarkEstimator, bundleFinalizer);
+ }
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java
new file mode 100644
index 000000000000..e7832211a4c5
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTracker.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+
+/**
+ * This restriction tracker is a decorator on top of the {@link OffsetRangeTracker}. It modifies the
+ * behaviour of {@link OffsetRangeTracker#tryClaim(Long)} to allow claiming the same offset multiple
+ * times, because several change stream records might share the same timestamp, which leads to
+ * multiple claims of the same {@link Long}. It also modifies the {@link
+ * OffsetRangeTracker#trySplit(double)} method to always deny splits for the {@link
+ * InitialPartition#PARTITION_TOKEN}, since we only need to perform that query once.
+ */
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class ReadChangeStreamPartitionRangeTracker extends OffsetRangeTracker {
+
+ private final PartitionMetadata partition;
+
+ /**
+ * Receives the partition that will be queried using this tracker, alongside the range itself.
+ *
+ * @param partition the partition that will use the tracker
+ * @param range the closed-open interval representing the start / end times for the partition
+ */
+ public ReadChangeStreamPartitionRangeTracker(PartitionMetadata partition, OffsetRange range) {
+ super(range);
+ this.partition = partition;
+ }
+
+ /**
+ * Attempts to claim the given offset.
+ *
+ * <p>Must be equal to or larger than the last successfully claimed offset.
+ *
+ * @return {@code true} if the offset was successfully claimed, {@code false} if it is outside the
+ * current {@link OffsetRange} of this tracker (in that case this operation is a no-op).
+ */
+ @Override
+ public boolean tryClaim(Long i) {
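+ // Several change stream records may share the same timestamp, so a repeated claim of the last
+ // attempted offset is accepted as a no-op instead of failing.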
+ if (i.equals(lastAttemptedOffset)) {
+ return true;
+ }
+ return super.tryClaim(i);
+ }
+
+ /**
+ * If the partition token is the {@link InitialPartition#PARTITION_TOKEN}, it does not allow for
+ * splits (returns null).
+ */
+ @Override
+ public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
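+ // The initial partition query only needs to be performed once, so splits are never allowed for it.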
+ if (InitialPartition.isInitialPartition(partition.getPartitionToken())) {
+ return null;
+ }
+ return super.trySplit(fractionOfRemainder);
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/package-info.java
new file mode 100644
index 000000000000..7868b297bb90
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Custom restriction tracker related classes. */
+@Experimental
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import org.apache.beam.sdk.annotations.Experimental;
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/TimestampConverterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/TimestampConverterTest.java
new file mode 100644
index 000000000000..1cbc75ac94f9
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/TimestampConverterTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.cloud.Timestamp;
+import org.junit.Test;
+
+public class TimestampConverterTest {
+
+ @Test
+ public void testConvertTimestampToMicros() {
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(2_000_360L);
+
+ assertEquals(2_000_360L, TimestampConverter.timestampToMicros(timestamp));
+ }
+
+ @Test
+ public void testConvertTimestampZeroToMicros() {
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(0);
+
+ assertEquals(0L, TimestampConverter.timestampToMicros(timestamp));
+ }
+
+ @Test
+ public void testConvertTimestampMinToMicros() {
+ final Timestamp timestamp = Timestamp.MIN_VALUE;
+
+ assertEquals(-62135596800000000L, TimestampConverter.timestampToMicros(timestamp));
+ }
+
+ @Test
+ public void testConvertTimestampMaxToMicros() {
+ final Timestamp timestamp = Timestamp.MAX_VALUE;
+
+ assertEquals(253402300799999999L, TimestampConverter.timestampToMicros(timestamp));
+ }
+
+ @Test
+ public void testConvertMillisToTimestamp() {
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(1234_000L);
+
+ assertEquals(timestamp, TimestampConverter.timestampFromMillis(1234L));
+ }
+
+ @Test
+ public void testTruncateNanos() {
+ final Timestamp timestamp = Timestamp.ofTimeSecondsAndNanos(10L, 123456789);
+ final Timestamp expectedTimestamp = Timestamp.ofTimeSecondsAndNanos(10L, 123456000);
+
+ assertEquals(expectedTimestamp, TimestampConverter.truncateNanos(timestamp));
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java
new file mode 100644
index 000000000000..429a3db5242a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ChildPartitionsRecordActionTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.CREATED;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao.InTransactionContext;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.util.TestTransactionAnswer;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ChildPartitionsRecordActionTest {
+
+ private PartitionMetadataDao dao;
+ private InTransactionContext transaction;
+ private ChangeStreamMetrics metrics;
+ private ChildPartitionsRecordAction action;
+ private RestrictionTracker<OffsetRange, Long> tracker;
+ private ManualWatermarkEstimator<Instant> watermarkEstimator;
+
+ @Before
+ public void setUp() {
+ dao = mock(PartitionMetadataDao.class);
+ transaction = mock(InTransactionContext.class);
+ metrics = mock(ChangeStreamMetrics.class);
+ action = new ChildPartitionsRecordAction(dao, metrics);
+ tracker = mock(RestrictionTracker.class);
+ watermarkEstimator = mock(ManualWatermarkEstimator.class);
+
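+ // TestTransactionAnswer is assumed to pass the mocked InTransactionContext to the work given to
+ // runInTransaction, so the tests below can verify inserts directly on the transaction mock.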
+ when(dao.runInTransaction(any())).thenAnswer(new TestTransactionAnswer(transaction));
+ }
+
+ @Test
+ public void testRestrictionClaimedAndIsSplitCase() {
+ final String partitionToken = "partitionToken";
+ final long heartbeat = 30L;
+ final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+ final PartitionMetadata partition = mock(PartitionMetadata.class);
+ final ChildPartitionsRecord record =
+ new ChildPartitionsRecord(
+ startTimestamp,
+ "recordSequence",
+ Arrays.asList(
+ new ChildPartition("childPartition1", partitionToken),
+ new ChildPartition("childPartition2", partitionToken)),
+ null);
+ when(partition.getEndTimestamp()).thenReturn(endTimestamp);
+ when(partition.getHeartbeatMillis()).thenReturn(heartbeat);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+ when(tracker.tryClaim(10L)).thenReturn(true);
+ when(transaction.getPartition("childPartition1")).thenReturn(null);
+ when(transaction.getPartition("childPartition2")).thenReturn(null);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ action.run(partition, record, tracker, watermarkEstimator);
+
+ assertEquals(Optional.empty(), maybeContinuation);
+ verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
+ verify(transaction)
+ .insert(
+ PartitionMetadata.newBuilder()
+ .setPartitionToken("childPartition1")
+ .setParentTokens(Sets.newHashSet(partitionToken))
+ .setStartTimestamp(startTimestamp)
+ .setEndTimestamp(endTimestamp)
+ .setHeartbeatMillis(heartbeat)
+ .setState(CREATED)
+ .setWatermark(startTimestamp)
+ .build());
+ verify(transaction)
+ .insert(
+ PartitionMetadata.newBuilder()
+ .setPartitionToken("childPartition2")
+ .setParentTokens(Sets.newHashSet(partitionToken))
+ .setStartTimestamp(startTimestamp)
+ .setEndTimestamp(endTimestamp)
+ .setHeartbeatMillis(heartbeat)
+ .setState(CREATED)
+ .setWatermark(startTimestamp)
+ .build());
+ }
+
+ @Test
+ public void testRestrictionClaimedAndIsSplitCaseAndChildExists() {
+ final String partitionToken = "partitionToken";
+ final long heartbeat = 30L;
+ final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+ final PartitionMetadata partition = mock(PartitionMetadata.class);
+ final ChildPartitionsRecord record =
+ new ChildPartitionsRecord(
+ startTimestamp,
+ "recordSequence",
+ Arrays.asList(
+ new ChildPartition("childPartition1", partitionToken),
+ new ChildPartition("childPartition2", partitionToken)),
+ null);
+ when(partition.getEndTimestamp()).thenReturn(endTimestamp);
+ when(partition.getHeartbeatMillis()).thenReturn(heartbeat);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+ when(tracker.tryClaim(10L)).thenReturn(true);
+ when(transaction.getPartition("childPartition1")).thenReturn(mock(Struct.class));
+ when(transaction.getPartition("childPartition2")).thenReturn(mock(Struct.class));
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ action.run(partition, record, tracker, watermarkEstimator);
+
+ assertEquals(Optional.empty(), maybeContinuation);
+ verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
+ }
+
+ @Test
+ public void testRestrictionClaimedAndIsMergeCaseAndChildNotExists() {
+ final String partitionToken = "partitionToken";
+ final String anotherPartitionToken = "anotherPartitionToken";
+ final String childPartitionToken = "childPartition1";
+ final HashSet<String> parentTokens = Sets.newHashSet(partitionToken, anotherPartitionToken);
+ final long heartbeat = 30L;
+ final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+ final PartitionMetadata partition = mock(PartitionMetadata.class);
+ final ChildPartitionsRecord record =
+ new ChildPartitionsRecord(
+ startTimestamp,
+ "recordSequence",
+ Collections.singletonList(new ChildPartition(childPartitionToken, parentTokens)),
+ null);
+ when(partition.getEndTimestamp()).thenReturn(endTimestamp);
+ when(partition.getHeartbeatMillis()).thenReturn(heartbeat);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+ when(tracker.tryClaim(10L)).thenReturn(true);
+ when(transaction.getPartition(childPartitionToken)).thenReturn(null);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ action.run(partition, record, tracker, watermarkEstimator);
+
+ assertEquals(Optional.empty(), maybeContinuation);
+ verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
+ verify(transaction)
+ .insert(
+ PartitionMetadata.newBuilder()
+ .setPartitionToken(childPartitionToken)
+ .setParentTokens(parentTokens)
+ .setStartTimestamp(startTimestamp)
+ .setEndTimestamp(endTimestamp)
+ .setHeartbeatMillis(heartbeat)
+ .setState(CREATED)
+ .setWatermark(startTimestamp)
+ .build());
+ }
+
+ @Test
+ public void testRestrictionClaimedAndIsMergeCaseAndChildExists() {
+ final String partitionToken = "partitionToken";
+ final String anotherPartitionToken = "anotherPartitionToken";
+ final String childPartitionToken = "childPartition1";
+ final HashSet<String> parentTokens = Sets.newHashSet(partitionToken, anotherPartitionToken);
+ final long heartbeat = 30L;
+ final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Timestamp endTimestamp = Timestamp.ofTimeMicroseconds(20L);
+ final PartitionMetadata partition = mock(PartitionMetadata.class);
+ final ChildPartitionsRecord record =
+ new ChildPartitionsRecord(
+ startTimestamp,
+ "recordSequence",
+ Collections.singletonList(new ChildPartition(childPartitionToken, parentTokens)),
+ null);
+ when(partition.getEndTimestamp()).thenReturn(endTimestamp);
+ when(partition.getHeartbeatMillis()).thenReturn(heartbeat);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+ when(tracker.tryClaim(10L)).thenReturn(true);
+ when(transaction.getPartition(childPartitionToken)).thenReturn(mock(Struct.class));
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ action.run(partition, record, tracker, watermarkEstimator);
+
+ assertEquals(Optional.empty(), maybeContinuation);
+ verify(watermarkEstimator).setWatermark(new Instant(startTimestamp.toSqlTimestamp().getTime()));
+ verify(transaction, never()).insert(any());
+ }
+
+ @Test
+ public void testRestrictionNotClaimed() {
+ final String partitionToken = "partitionToken";
+ final Timestamp startTimestamp = Timestamp.ofTimeMicroseconds(10L);
+ final PartitionMetadata partition = mock(PartitionMetadata.class);
+ final ChildPartitionsRecord record =
+ new ChildPartitionsRecord(
+ startTimestamp,
+ "recordSequence",
+ Arrays.asList(
+ new ChildPartition("childPartition1", partitionToken),
+ new ChildPartition("childPartition2", partitionToken)),
+ null);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+ when(tracker.tryClaim(10L)).thenReturn(false);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ action.run(partition, record, tracker, watermarkEstimator);
+
+ assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
+ verify(watermarkEstimator, never()).setWatermark(any());
+ verify(dao, never()).insert(any());
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
new file mode 100644
index 000000000000..15789bb91156
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DataChangeRecordActionTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DataChangeRecordActionTest {
+
+ private DataChangeRecordAction action;
+ private PartitionMetadata partition;
+ private RestrictionTracker<OffsetRange, Long> tracker;
+ private OutputReceiver<DataChangeRecord> outputReceiver;
+ private ManualWatermarkEstimator<Instant> watermarkEstimator;
+
+ @Before
+ public void setUp() {
+ action = new DataChangeRecordAction();
+ partition = mock(PartitionMetadata.class);
+ tracker = mock(RestrictionTracker.class);
+ outputReceiver = mock(OutputReceiver.class);
+ watermarkEstimator = mock(ManualWatermarkEstimator.class);
+ }
+
+ @Test
+ public void testRestrictionClaimed() {
+ final String partitionToken = "partitionToken";
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+ final Instant instant = new Instant(timestamp.toSqlTimestamp().getTime());
+ final DataChangeRecord record = mock(DataChangeRecord.class);
+ when(record.getCommitTimestamp()).thenReturn(timestamp);
+ when(tracker.tryClaim(10L)).thenReturn(true);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ action.run(partition, record, tracker, outputReceiver, watermarkEstimator);
+
+ assertEquals(Optional.empty(), maybeContinuation);
+ verify(outputReceiver).outputWithTimestamp(record, instant);
+ verify(watermarkEstimator).setWatermark(instant);
+ }
+
+ @Test
+ public void testRestrictionNotClaimed() {
+ final String partitionToken = "partitionToken";
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+ final DataChangeRecord record = mock(DataChangeRecord.class);
+ when(record.getCommitTimestamp()).thenReturn(timestamp);
+ when(tracker.tryClaim(10L)).thenReturn(false);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ action.run(partition, record, tracker, outputReceiver, watermarkEstimator);
+
+ assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
+ verify(outputReceiver, never()).outputWithTimestamp(any(), any());
+ verify(watermarkEstimator, never()).setWatermark(any());
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java
new file mode 100644
index 000000000000..c7ed34f03453
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/HeartbeatRecordActionTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HeartbeatRecordActionTest {
+
+ private HeartbeatRecordAction action;
+ private PartitionMetadata partition;
+ private RestrictionTracker<OffsetRange, Long> tracker;
+ private ManualWatermarkEstimator<Instant> watermarkEstimator;
+
+ @Before
+ public void setUp() {
+ final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class);
+ action = new HeartbeatRecordAction(metrics);
+ partition = mock(PartitionMetadata.class);
+ tracker = mock(RestrictionTracker.class);
+ watermarkEstimator = mock(ManualWatermarkEstimator.class);
+ }
+
+ @Test
+ public void testRestrictionClaimed() {
+ final String partitionToken = "partitionToken";
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+
+ when(tracker.tryClaim(10L)).thenReturn(true);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ action.run(partition, new HeartbeatRecord(timestamp, null), tracker, watermarkEstimator);
+
+ assertEquals(Optional.empty(), maybeContinuation);
+ verify(watermarkEstimator).setWatermark(new Instant(timestamp.toSqlTimestamp().getTime()));
+ }
+
+ @Test
+ public void testRestrictionNotClaimed() {
+ final String partitionToken = "partitionToken";
+ final Timestamp timestamp = Timestamp.ofTimeMicroseconds(10L);
+
+ when(tracker.tryClaim(10L)).thenReturn(false);
+ when(partition.getPartitionToken()).thenReturn(partitionToken);
+
+ final Optional<ProcessContinuation> maybeContinuation =
+ action.run(partition, new HeartbeatRecord(timestamp, null), tracker, watermarkEstimator);
+
+ assertEquals(Optional.of(ProcessContinuation.stop()), maybeContinuation);
+ verify(watermarkEstimator, never()).setWatermark(any());
+ }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
new file mode 100644
index 000000000000..41fb1f7bf640
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.Struct;
+import java.util.Arrays;
+import java.util.Optional;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.TimestampConverter;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSet;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamResultSetMetadata;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+
+public class QueryChangeStreamActionTest {
+ private static final String PARTITION_TOKEN = "partitionToken";
+ private static final Timestamp PARTITION_START_TIMESTAMP = Timestamp.ofTimeMicroseconds(10L);
+ private static final Timestamp PARTITION_END_TIMESTAMP = Timestamp.ofTimeMicroseconds(30L);
+ private static final long PARTITION_END_MICROS = 30L;
+ private static final long PARTITION_HEARTBEAT_MILLIS = 30_000L;
+ private static final Instant WATERMARK = Instant.now();
+ private static final Timestamp WATERMARK_TIMESTAMP =
+ TimestampConverter.timestampFromMillis(WATERMARK.getMillis());
+
+ private ChangeStreamDao changeStreamDao;
+ private PartitionMetadataDao partitionMetadataDao;
+ private PartitionMetadata partition;
+ private OffsetRange restriction;
+ private RestrictionTracker<OffsetRange, Long> restrictionTracker;
+ private OutputReceiver<DataChangeRecord> outputReceiver;
+ private ChangeStreamRecordMapper changeStreamRecordMapper;
+ private PartitionMetadataMapper partitionMetadataMapper;
+ private ManualWatermarkEstimator<Instant> watermarkEstimator;
+ private BundleFinalizer bundleFinalizer;
+ private DataChangeRecordAction dataChangeRecordAction;
+ private HeartbeatRecordAction heartbeatRecordAction;
+ private ChildPartitionsRecordAction childPartitionsRecordAction;
+ private QueryChangeStreamAction action;
+
+ @Before
+ public void setUp() throws Exception {
+ changeStreamDao = mock(ChangeStreamDao.class);
+ partitionMetadataDao = mock(PartitionMetadataDao.class);
+ changeStreamRecordMapper = mock(ChangeStreamRecordMapper.class);
+ partitionMetadataMapper = mock(PartitionMetadataMapper.class);
+ dataChangeRecordAction = mock(DataChangeRecordAction.class);
+ heartbeatRecordAction = mock(HeartbeatRecordAction.class);
+ childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class);
+
+ action =
+ new QueryChangeStreamAction(
+ changeStreamDao,
+ partitionMetadataDao,
+ changeStreamRecordMapper,
+ partitionMetadataMapper,
+ dataChangeRecordAction,
+ heartbeatRecordAction,
+ childPartitionsRecordAction);
+ final Struct row = mock(Struct.class);
+ partition =
+ PartitionMetadata.newBuilder()
+ .setPartitionToken(PARTITION_TOKEN)
+ .setParentTokens(Sets.newHashSet("parentToken"))
+ .setStartTimestamp(PARTITION_START_TIMESTAMP)
+ .setEndTimestamp(PARTITION_END_TIMESTAMP)
+ .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS)
+ .setState(SCHEDULED)
+ .setWatermark(WATERMARK_TIMESTAMP)
+ .setScheduledAt(Timestamp.now())
+ .build();
+ restriction = mock(OffsetRange.class);
+ restrictionTracker = mock(RestrictionTracker.class);
+ outputReceiver = mock(OutputReceiver.class);
+ watermarkEstimator = mock(ManualWatermarkEstimator.class);
+ bundleFinalizer = new BundleFinalizerStub();
+
+ when(restrictionTracker.currentRestriction()).thenReturn(restriction);
+ when(restriction.getFrom()).thenReturn(10L);
+ when(partitionMetadataDao.getPartition(PARTITION_TOKEN)).thenReturn(row);
+ when(partitionMetadataMapper.from(row)).thenReturn(partition);
+ }
+
+ @Test
+ public void testQueryChangeStreamWithDataChangeRecord() {
+ final Struct rowAsStruct = mock(Struct.class);
+ final ChangeStreamResultSetMetadata resultSetMetadata =
+ mock(ChangeStreamResultSetMetadata.class);
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ final DataChangeRecord record1 = mock(DataChangeRecord.class);
+ final DataChangeRecord record2 = mock(DataChangeRecord.class);
+ when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+ when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+ when(changeStreamDao.changeStreamQuery(
+ PARTITION_TOKEN,
+ PARTITION_START_TIMESTAMP,
+ PARTITION_END_TIMESTAMP,
+ PARTITION_HEARTBEAT_MILLIS))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getCurrentRowAsStruct()).thenReturn(rowAsStruct);
+ when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+ when(changeStreamRecordMapper.toChangeStreamRecords(partition, rowAsStruct, resultSetMetadata))
+ .thenReturn(Arrays.asList(record1, record2));
+ when(dataChangeRecordAction.run(
+ partition, record1, restrictionTracker, outputReceiver, watermarkEstimator))
+ .thenReturn(Optional.empty());
+ when(dataChangeRecordAction.run(
+ partition, record2, restrictionTracker, outputReceiver, watermarkEstimator))
+ .thenReturn(Optional.of(ProcessContinuation.stop()));
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+ assertEquals(ProcessContinuation.stop(), result);
+ verify(dataChangeRecordAction)
+ .run(partition, record1, restrictionTracker, outputReceiver, watermarkEstimator);
+ verify(dataChangeRecordAction)
+ .run(partition, record2, restrictionTracker, outputReceiver, watermarkEstimator);
+ verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
+ verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
+ verify(restrictionTracker, never()).tryClaim(any());
+ }
+
+ @Test
+ public void testQueryChangeStreamWithHeartbeatRecord() {
+ final Struct rowAsStruct = mock(Struct.class);
+ final ChangeStreamResultSetMetadata resultSetMetadata =
+ mock(ChangeStreamResultSetMetadata.class);
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ final HeartbeatRecord record1 = mock(HeartbeatRecord.class);
+ final HeartbeatRecord record2 = mock(HeartbeatRecord.class);
+ when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+ when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+ when(changeStreamDao.changeStreamQuery(
+ PARTITION_TOKEN,
+ PARTITION_START_TIMESTAMP,
+ PARTITION_END_TIMESTAMP,
+ PARTITION_HEARTBEAT_MILLIS))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getCurrentRowAsStruct()).thenReturn(rowAsStruct);
+ when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+ when(changeStreamRecordMapper.toChangeStreamRecords(partition, rowAsStruct, resultSetMetadata))
+ .thenReturn(Arrays.asList(record1, record2));
+ when(heartbeatRecordAction.run(partition, record1, restrictionTracker, watermarkEstimator))
+ .thenReturn(Optional.empty());
+ when(heartbeatRecordAction.run(partition, record2, restrictionTracker, watermarkEstimator))
+ .thenReturn(Optional.of(ProcessContinuation.stop()));
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+ assertEquals(ProcessContinuation.stop(), result);
+ verify(heartbeatRecordAction).run(partition, record1, restrictionTracker, watermarkEstimator);
+ verify(heartbeatRecordAction).run(partition, record2, restrictionTracker, watermarkEstimator);
+ verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
+ verify(restrictionTracker, never()).tryClaim(any());
+ }
+
+ @Test
+ public void testQueryChangeStreamWithChildPartitionsRecord() {
+ final Struct rowAsStruct = mock(Struct.class);
+ final ChangeStreamResultSetMetadata resultSetMetadata =
+ mock(ChangeStreamResultSetMetadata.class);
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ final ChildPartitionsRecord record1 = mock(ChildPartitionsRecord.class);
+ final ChildPartitionsRecord record2 = mock(ChildPartitionsRecord.class);
+ when(record1.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+ when(record2.getRecordTimestamp()).thenReturn(PARTITION_START_TIMESTAMP);
+ when(changeStreamDao.changeStreamQuery(
+ PARTITION_TOKEN,
+ PARTITION_START_TIMESTAMP,
+ PARTITION_END_TIMESTAMP,
+ PARTITION_HEARTBEAT_MILLIS))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getCurrentRowAsStruct()).thenReturn(rowAsStruct);
+ when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+ when(changeStreamRecordMapper.toChangeStreamRecords(partition, rowAsStruct, resultSetMetadata))
+ .thenReturn(Arrays.asList(record1, record2));
+ when(childPartitionsRecordAction.run(
+ partition, record1, restrictionTracker, watermarkEstimator))
+ .thenReturn(Optional.empty());
+ when(childPartitionsRecordAction.run(
+ partition, record2, restrictionTracker, watermarkEstimator))
+ .thenReturn(Optional.of(ProcessContinuation.stop()));
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+ assertEquals(ProcessContinuation.stop(), result);
+ verify(childPartitionsRecordAction)
+ .run(partition, record1, restrictionTracker, watermarkEstimator);
+ verify(childPartitionsRecordAction)
+ .run(partition, record2, restrictionTracker, watermarkEstimator);
+ verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
+ verify(restrictionTracker, never()).tryClaim(any());
+ }
+
+ @Test
+ public void testQueryChangeStreamWithRestrictionStartAfterPartitionStart() {
+ final Struct rowAsStruct = mock(Struct.class);
+ final ChangeStreamResultSetMetadata resultSetMetadata =
+ mock(ChangeStreamResultSetMetadata.class);
+ final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
+ final ChildPartitionsRecord record1 = mock(ChildPartitionsRecord.class);
+ final ChildPartitionsRecord record2 = mock(ChildPartitionsRecord.class);
+
+ // One microsecond after partition start timestamp
+ when(restriction.getFrom()).thenReturn(11L);
+ // This record should be ignored because it is before restriction.getFrom
+ when(record1.getRecordTimestamp()).thenReturn(Timestamp.ofTimeMicroseconds(10L));
+ // This record should be included because it is at the restriction.getFrom
+ when(record2.getRecordTimestamp()).thenReturn(Timestamp.ofTimeMicroseconds(11L));
+ // We should start the query 1 microsecond before the restriction.getFrom
+ when(changeStreamDao.changeStreamQuery(
+ PARTITION_TOKEN,
+ Timestamp.ofTimeMicroseconds(10L),
+ PARTITION_END_TIMESTAMP,
+ PARTITION_HEARTBEAT_MILLIS))
+ .thenReturn(resultSet);
+ when(resultSet.next()).thenReturn(true);
+ when(resultSet.getCurrentRowAsStruct()).thenReturn(rowAsStruct);
+ when(resultSet.getMetadata()).thenReturn(resultSetMetadata);
+ when(changeStreamRecordMapper.toChangeStreamRecords(partition, rowAsStruct, resultSetMetadata))
+ .thenReturn(Arrays.asList(record1, record2));
+ when(childPartitionsRecordAction.run(
+ partition, record2, restrictionTracker, watermarkEstimator))
+ .thenReturn(Optional.of(ProcessContinuation.stop()));
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+ assertEquals(ProcessContinuation.stop(), result);
+ verify(childPartitionsRecordAction)
+ .run(partition, record2, restrictionTracker, watermarkEstimator);
+ verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+
+ verify(childPartitionsRecordAction, never())
+ .run(partition, record1, restrictionTracker, watermarkEstimator);
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
+ verify(restrictionTracker, never()).tryClaim(any());
+ }
+
+ @Test
+ public void testQueryChangeStreamWithStreamFinished() {
+ final ChangeStreamResultSet changeStreamResultSet = mock(ChangeStreamResultSet.class);
+ when(changeStreamDao.changeStreamQuery(
+ PARTITION_TOKEN,
+ PARTITION_START_TIMESTAMP,
+ PARTITION_END_TIMESTAMP,
+ PARTITION_HEARTBEAT_MILLIS))
+ .thenReturn(changeStreamResultSet);
+ when(changeStreamResultSet.next()).thenReturn(false);
+ when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
+ when(restrictionTracker.tryClaim(PARTITION_END_MICROS)).thenReturn(true);
+
+ final ProcessContinuation result =
+ action.run(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+ assertEquals(ProcessContinuation.stop(), result);
+ verify(partitionMetadataDao).updateWatermark(PARTITION_TOKEN, WATERMARK_TIMESTAMP);
+ verify(partitionMetadataDao).updateToFinished(PARTITION_TOKEN);
+
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
+ verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
+ }
+
+ private static class BundleFinalizerStub implements BundleFinalizer {
+ @Override
+ public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
+ try {
+ callback.onBundleSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
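
The BundleFinalizerStub above runs the registered callback immediately instead of waiting for an actual bundle commit, which lets the tests observe finalization side effects synchronously. A minimal sketch of that behaviour in isolation (the expiry instant and the AtomicBoolean flag are illustrative only; it additionally needs java.util.concurrent.atomic.AtomicBoolean and org.joda.time.Duration):

final AtomicBoolean committed = new AtomicBoolean(false);
final BundleFinalizer finalizer = new BundleFinalizerStub();

// The stub ignores the expiry instant and invokes onBundleSuccess() right away.
finalizer.afterBundleCommit(
    Instant.now().plus(Duration.standardMinutes(1)), () -> committed.set(true));

// No runner or bundle commit is involved; the callback has already run.
assertTrue(committed.get());
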
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
new file mode 100644
index 000000000000..f70f119dd3d8
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn;
+
+import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.Timestamp;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ChildPartitionsRecordAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.DataChangeRecordAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.HeartbeatRecordAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.QueryChangeStreamAction;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.ChangeStreamDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.ChangeStreamRecordMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.MapperFactory;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.mapper.PartitionMetadataMapper;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class ReadChangeStreamPartitionDoFnTest {
+
+ private static final String PARTITION_TOKEN = "partitionToken";
+ private static final Timestamp PARTITION_START_TIMESTAMP =
+ Timestamp.ofTimeSecondsAndNanos(10, 20);
+ private static final Timestamp PARTITION_END_TIMESTAMP = Timestamp.ofTimeSecondsAndNanos(30, 40);
+ private static final long PARTITION_HEARTBEAT_MILLIS = 30_000L;
+
+ private ReadChangeStreamPartitionDoFn doFn;
+ private PartitionMetadata partition;
+ private OffsetRange restriction;
+ private RestrictionTracker<OffsetRange, Long> restrictionTracker;
+ private OutputReceiver<DataChangeRecord> outputReceiver;
+ private ManualWatermarkEstimator<Instant> watermarkEstimator;
+ private BundleFinalizer bundleFinalizer;
+ private DataChangeRecordAction dataChangeRecordAction;
+ private HeartbeatRecordAction heartbeatRecordAction;
+ private ChildPartitionsRecordAction childPartitionsRecordAction;
+ private QueryChangeStreamAction queryChangeStreamAction;
+
+ @Before
+ public void setUp() {
+ final DaoFactory daoFactory = mock(DaoFactory.class);
+ final MapperFactory mapperFactory = mock(MapperFactory.class);
+ final ChangeStreamMetrics metrics = mock(ChangeStreamMetrics.class);
+ final ActionFactory actionFactory = mock(ActionFactory.class);
+ final PartitionMetadataDao partitionMetadataDao = mock(PartitionMetadataDao.class);
+ final ChangeStreamDao changeStreamDao = mock(ChangeStreamDao.class);
+ final ChangeStreamRecordMapper changeStreamRecordMapper = mock(ChangeStreamRecordMapper.class);
+ final PartitionMetadataMapper partitionMetadataMapper = mock(PartitionMetadataMapper.class);
+ dataChangeRecordAction = mock(DataChangeRecordAction.class);
+ heartbeatRecordAction = mock(HeartbeatRecordAction.class);
+ childPartitionsRecordAction = mock(ChildPartitionsRecordAction.class);
+ queryChangeStreamAction = mock(QueryChangeStreamAction.class);
+
+ doFn = new ReadChangeStreamPartitionDoFn(daoFactory, mapperFactory, actionFactory, metrics);
+
+ partition =
+ PartitionMetadata.newBuilder()
+ .setPartitionToken(PARTITION_TOKEN)
+ .setParentTokens(Sets.newHashSet("parentToken"))
+ .setStartTimestamp(PARTITION_START_TIMESTAMP)
+ .setEndTimestamp(PARTITION_END_TIMESTAMP)
+ .setHeartbeatMillis(PARTITION_HEARTBEAT_MILLIS)
+ .setState(SCHEDULED)
+ .setWatermark(PARTITION_START_TIMESTAMP)
+ .setScheduledAt(Timestamp.now())
+ .build();
+ restriction = mock(OffsetRange.class);
+ restrictionTracker = mock(RestrictionTracker.class);
+ outputReceiver = mock(OutputReceiver.class);
+ watermarkEstimator = mock(ManualWatermarkEstimator.class);
+ bundleFinalizer = mock(BundleFinalizer.class);
+
+ when(restrictionTracker.currentRestriction()).thenReturn(restriction);
+ when(daoFactory.getPartitionMetadataDao()).thenReturn(partitionMetadataDao);
+ when(daoFactory.getChangeStreamDao()).thenReturn(changeStreamDao);
+ when(mapperFactory.changeStreamRecordMapper()).thenReturn(changeStreamRecordMapper);
+ when(mapperFactory.partitionMetadataMapper()).thenReturn(partitionMetadataMapper);
+
+ when(actionFactory.dataChangeRecordAction()).thenReturn(dataChangeRecordAction);
+ when(actionFactory.heartbeatRecordAction(metrics)).thenReturn(heartbeatRecordAction);
+ when(actionFactory.childPartitionsRecordAction(partitionMetadataDao, metrics))
+ .thenReturn(childPartitionsRecordAction);
+ when(actionFactory.queryChangeStreamAction(
+ changeStreamDao,
+ partitionMetadataDao,
+ changeStreamRecordMapper,
+ partitionMetadataMapper,
+ dataChangeRecordAction,
+ heartbeatRecordAction,
+ childPartitionsRecordAction))
+ .thenReturn(queryChangeStreamAction);
+
+ doFn.setup();
+ }
+
+ @Test
+ public void testQueryChangeStreamMode() {
+ when(queryChangeStreamAction.run(any(), any(), any(), any(), any()))
+ .thenReturn(ProcessContinuation.stop());
+
+ final ProcessContinuation result =
+ doFn.processElement(
+ partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+ assertEquals(ProcessContinuation.stop(), result);
+ verify(queryChangeStreamAction)
+ .run(partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
+
+ verify(dataChangeRecordAction, never()).run(any(), any(), any(), any(), any());
+ verify(heartbeatRecordAction, never()).run(any(), any(), any(), any());
+ verify(childPartitionsRecordAction, never()).run(any(), any(), any(), any());
+ verify(restrictionTracker, never()).tryClaim(any());
+ }
+
+ // --------------------------
+ // Sad Paths
+
+ // Client library errors:
+ // 1. RESOURCE_EXHAUSTED error on client library
+ // 2. DEADLINE_EXCEEDED error on client library
+ // 3. INTERNAL error on client library
+ // 4. UNAVAILABLE error on client library
+ // 5. UNKNOWN error on client library (transaction outcome unknown)
+ // 6. ABORTED error on client library
+ // 7. PERMISSION_DENIED / UNAUTHENTICATED error on client library
+
+ // Metadata table:
+ // - Table is deleted
+ // - Database is deleted
+ // - No permissions for the metadata table
+ // --------------------------
+
+}
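
The sad paths listed in the comment block above are not yet covered by this test class. A hedged sketch of how one of them could be exercised with the mocks already configured in setUp (the assumption here, not asserted by this change, is that the DoFn lets non-retryable SpannerExceptions from the delegated action propagate):

@Test
public void testQueryChangeStreamPropagatesPermissionDenied() {
  // Hypothetical test, not part of this change. Requires additional imports:
  // com.google.cloud.spanner.ErrorCode, SpannerException, SpannerExceptionFactory
  // and a static import of org.junit.Assert.assertThrows.
  when(queryChangeStreamAction.run(any(), any(), any(), any(), any()))
      .thenThrow(
          SpannerExceptionFactory.newSpannerException(
              ErrorCode.PERMISSION_DENIED, "insufficient permissions"));

  assertThrows(
      SpannerException.class,
      () ->
          doFn.processElement(
              partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer));
}
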
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java
new file mode 100644
index 000000000000..4da1d22943c7
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/restriction/ReadChangeStreamPartitionRangeTrackerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.restriction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.junit.Test;
+
+public class ReadChangeStreamPartitionRangeTrackerTest {
+
+ @Test
+ public void testTryClaim() {
+ final PartitionMetadata partition = mock(PartitionMetadata.class);
+ final OffsetRange range = new OffsetRange(100, 200);
+ final ReadChangeStreamPartitionRangeTracker tracker =
+ new ReadChangeStreamPartitionRangeTracker(partition, range);
+ assertEquals(range, tracker.currentRestriction());
+ assertTrue(tracker.tryClaim(100L));
+ assertTrue(tracker.tryClaim(100L));
+ assertTrue(tracker.tryClaim(150L));
+ assertTrue(tracker.tryClaim(199L));
+ assertFalse(tracker.tryClaim(200L));
+ }
+
+ @Test
+ public void testTrySplitReturnsNullForInitialPartition() {
+ final PartitionMetadata partition = mock(PartitionMetadata.class);
+ final OffsetRange range = new OffsetRange(100, 200);
+ final ReadChangeStreamPartitionRangeTracker tracker =
+ new ReadChangeStreamPartitionRangeTracker(partition, range);
+
+ when(partition.getPartitionToken()).thenReturn(InitialPartition.PARTITION_TOKEN);
+
+ assertNull(tracker.trySplit(0.0D));
+ }
+}
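
The test above pins down the special case for the initial partition. For any other partition token, the expectation (assumed here to follow the parent OffsetRangeTracker semantics, and not asserted by this change) would be an ordinary checkpoint split, roughly:

@Test
public void testTrySplitCheckpointsRegularPartition() {
  // Hypothetical sketch; SplitResult comes from
  // org.apache.beam.sdk.transforms.splittabledofn.SplitResult, and assertNotNull
  // would need a static import from org.junit.Assert.
  final PartitionMetadata partition = mock(PartitionMetadata.class);
  final ReadChangeStreamPartitionRangeTracker tracker =
      new ReadChangeStreamPartitionRangeTracker(partition, new OffsetRange(100, 200));

  when(partition.getPartitionToken()).thenReturn("regularToken");
  assertTrue(tracker.tryClaim(150L));

  final SplitResult<OffsetRange> split = tracker.trySplit(0.0D);
  assertNotNull(split);
  // Primary and residual should partition the original range at the split point.
  assertEquals(split.getPrimary().getTo(), split.getResidual().getFrom());
}
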
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestTransactionAnswer.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestTransactionAnswer.java
new file mode 100644
index 000000000000..52851832040d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/util/TestTransactionAnswer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner.changestreams.util;
+
+import com.google.cloud.Timestamp;
+import java.util.function.Function;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao.InTransactionContext;
+import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao.TransactionResult;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+@SuppressWarnings("rawtypes")
+public class TestTransactionAnswer implements Answer {
+
+ private final InTransactionContext transaction;
+
+ public TestTransactionAnswer(InTransactionContext transaction) {
+ this.transaction = transaction;
+ }
+
+ @Override
+ public TransactionResult answer(InvocationOnMock invocation) {
+ Function callable = invocation.getArgument(0);
+ final Object result = callable.apply(transaction);
+ return new TransactionResult(result, Timestamp.now());
+ }
+}
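
TestTransactionAnswer lets a test stub a transaction-running DAO method so that the function the production code hands to it is applied immediately to a mocked InTransactionContext, with the return value wrapped in a TransactionResult stamped with Timestamp.now(). A usage sketch (the runInTransaction method name and its Function-based parameter are assumptions for illustration):

final InTransactionContext inTransactionContext = mock(InTransactionContext.class);
final PartitionMetadataDao dao = mock(PartitionMetadataDao.class);

// runInTransaction is a hypothetical method name here. Whatever Function the code
// under test passes in is applied to the mocked in-transaction context, and the
// stubbed call returns the resulting TransactionResult synchronously.
when(dao.runInTransaction(any(Function.class)))
    .thenAnswer(new TestTransactionAnswer(inTransactionContext));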