From 801e09a09e94a085f7c2ccd82d9c389453c99dc1 Mon Sep 17 00:00:00 2001 From: Darshan Siddesh Jagaluru Date: Wed, 15 Dec 2021 13:47:21 +0000 Subject: [PATCH 1/4] Making rpcPriority a ValueProvider in SpannerConfig --- .../sdk/io/gcp/spanner/BatchSpannerRead.java | 35 +++++++++++++++++-- .../sdk/io/gcp/spanner/NaiveSpannerRead.java | 34 ++++++++++++++++-- .../sdk/io/gcp/spanner/SpannerConfig.java | 10 ++++-- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 12 +++++-- 4 files changed, 79 insertions(+), 12 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java index 1f4a01d5f135..34288cba5cbd 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java @@ -20,6 +20,7 @@ import com.google.auto.value.AutoValue; import com.google.cloud.spanner.BatchReadOnlyTransaction; import com.google.cloud.spanner.Options; +import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.spanner.Partition; import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.SpannerException; @@ -120,10 +121,38 @@ public void processElement(ProcessContext c) throws Exception { } private List execute(ReadOperation op, BatchReadOnlyTransaction tx) { + if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) { + return executeWithPriority(op, tx, config.getRpcPriority().get()); + } else { + return executeWithoutPriority(op, tx); + } + } + + private List executeWithoutPriority(ReadOperation op, BatchReadOnlyTransaction tx) { + // Query was selected. + if (op.getQuery() != null) { + return tx.partitionQuery(op.getPartitionOptions(), op.getQuery()); + } + // Read with index was selected. + if (op.getIndex() != null) { + return tx.partitionReadUsingIndex( + op.getPartitionOptions(), + op.getTable(), + op.getIndex(), + op.getKeySet(), + op.getColumns()); + } + // Read from table was selected. + return tx.partitionRead( + op.getPartitionOptions(), op.getTable(), op.getKeySet(), op.getColumns()); + } + + private List executeWithPriority( + ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority rpcPriority) { // Query was selected. if (op.getQuery() != null) { return tx.partitionQuery( - op.getPartitionOptions(), op.getQuery(), Options.priority(config.getRpcPriority())); + op.getPartitionOptions(), op.getQuery(), Options.priority(rpcPriority)); } // Read with index was selected. if (op.getIndex() != null) { @@ -133,7 +162,7 @@ private List execute(ReadOperation op, BatchReadOnlyTransaction tx) { op.getIndex(), op.getKeySet(), op.getColumns(), - Options.priority(config.getRpcPriority())); + Options.priority(rpcPriority)); } // Read from table was selected. return tx.partitionRead( @@ -141,7 +170,7 @@ private List execute(ReadOperation op, BatchReadOnlyTransaction tx) { op.getTable(), op.getKeySet(), op.getColumns(), - Options.priority(config.getRpcPriority())); + Options.priority(rpcPriority)); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java index fb8306c5a44a..86e1d64328f4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java @@ -20,6 +20,7 @@ import com.google.auto.value.AutoValue; import com.google.cloud.spanner.BatchReadOnlyTransaction; import com.google.cloud.spanner.Options; +import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TimestampBound; @@ -106,9 +107,36 @@ public void processElement(ProcessContext c) throws Exception { } private ResultSet execute(ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) { + if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) { + return executeWithPriority(op, readOnlyTransaction, config.getRpcPriority().get()); + } else { + return executeWithoutPriority(op, readOnlyTransaction); + } + } + + private ResultSet executeWithoutPriority( + ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) { + if (op.getQuery() != null) { + return readOnlyTransaction.executeQuery(op.getQuery()); + } + if (op.getIndex() != null) { + return readOnlyTransaction.readUsingIndex( + op.getTable(), + op.getIndex(), + op.getKeySet(), + op.getColumns()); + } + return readOnlyTransaction.read( + op.getTable(), + op.getKeySet(), + op.getColumns()); + } + + private ResultSet executeWithPriority( + ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction, RpcPriority rpcPriority) { if (op.getQuery() != null) { return readOnlyTransaction.executeQuery( - op.getQuery(), Options.priority(config.getRpcPriority())); + op.getQuery(), Options.priority(rpcPriority)); } if (op.getIndex() != null) { return readOnlyTransaction.readUsingIndex( @@ -116,13 +144,13 @@ private ResultSet execute(ReadOperation op, BatchReadOnlyTransaction readOnlyTra op.getIndex(), op.getKeySet(), op.getColumns(), - Options.priority(config.getRpcPriority())); + Options.priority(rpcPriority)); } return readOnlyTransaction.read( op.getTable(), op.getKeySet(), op.getColumns(), - Options.priority(config.getRpcPriority())); + Options.priority(rpcPriority)); } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java index 1c4153188d5b..98ec8da8e52c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -60,7 +60,7 @@ public abstract class SpannerConfig implements Serializable { public abstract @Nullable ValueProvider getMaxCumulativeBackoff(); - public abstract RpcPriority getRpcPriority(); + public abstract @Nullable ValueProvider getRpcPriority(); @VisibleForTesting abstract @Nullable ServiceFactory getServiceFactory(); @@ -73,7 +73,7 @@ public static SpannerConfig create() { .setCommitDeadline(ValueProvider.StaticValueProvider.of(DEFAULT_COMMIT_DEADLINE)) .setMaxCumulativeBackoff( ValueProvider.StaticValueProvider.of(DEFAULT_MAX_CUMULATIVE_BACKOFF)) - .setRpcPriority(DEFAULT_RPC_PRIORITY) + .setRpcPriority(ValueProvider.StaticValueProvider.of(DEFAULT_RPC_PRIORITY)) .build(); } @@ -123,7 +123,7 @@ public abstract static class Builder { abstract Builder setServiceFactory(ServiceFactory serviceFactory); - abstract Builder setRpcPriority(RpcPriority rpcPriority); + abstract Builder setRpcPriority(ValueProvider rpcPriority); public abstract SpannerConfig build(); } @@ -182,6 +182,10 @@ SpannerConfig withServiceFactory(ServiceFactory service } public SpannerConfig withRpcPriority(RpcPriority rpcPriority) { + return withRpcPriority(ValueProvider.StaticValueProvider.of(rpcPriority)).build(); + } + + public SpannerConfig withRpcPriority(ValueProvider rpcPriority) { return toBuilder().setRpcPriority(rpcPriority).build(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 466fdb56aa26..96c867f580ef 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -1685,9 +1685,15 @@ private void spannerWriteWithRetryIfSchemaChange(Iterable batch) this.spannerConfig.getInstanceId().toString(), "Write"); try { - spannerAccessor - .getDatabaseClient() - .writeAtLeastOnceWithOptions(batch, Options.priority(spannerConfig.getRpcPriority())); + if (spannerConfig.getRpcPriority() != null && + spannerConfig.getRpcPriority().get() != null) { + spannerAccessor + .getDatabaseClient() + .writeAtLeastOnceWithOptions( + batch, Options.priority(spannerConfig.getRpcPriority().get())); + } else { + spannerAccessor.getDatabaseClient().writeAtLeastOnce(batch); + } serviceCallMetric.call("ok"); return; } catch (AbortedException e) { From 911ffe1af7513e6272260fca35649425627b537a Mon Sep 17 00:00:00 2001 From: Darshan Siddesh Jagaluru Date: Sun, 19 Dec 2021 08:24:55 +0000 Subject: [PATCH 2/4] correcting build failure --- .../java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java index 98ec8da8e52c..7fcc0b5676b9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerConfig.java @@ -182,7 +182,7 @@ SpannerConfig withServiceFactory(ServiceFactory service } public SpannerConfig withRpcPriority(RpcPriority rpcPriority) { - return withRpcPriority(ValueProvider.StaticValueProvider.of(rpcPriority)).build(); + return withRpcPriority(ValueProvider.StaticValueProvider.of(rpcPriority)); } public SpannerConfig withRpcPriority(ValueProvider rpcPriority) { From f69241b1f4d06097017a3e4f47dc47ef14fb5ec1 Mon Sep 17 00:00:00 2001 From: Darshan Siddesh Jagaluru Date: Sun, 19 Dec 2021 14:59:34 +0000 Subject: [PATCH 3/4] correcting tests --- .../org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java | 4 ++-- .../apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java index 8e1c8330a7b6..68b62e21da80 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java @@ -179,7 +179,7 @@ public void runQueryWithPriority() throws Exception { ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 6))); PAssert.that(one).containsInAnyOrder(FAKE_ROWS); - assertEquals(RpcPriority.HIGH, read.getSpannerConfig().getRpcPriority()); + assertEquals(RpcPriority.HIGH, read.getSpannerConfig().getRpcPriority().get()); pipeline.run(); } @@ -277,7 +277,7 @@ public void runReadWithPriority() throws Exception { ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6))); PAssert.that(one).containsInAnyOrder(FAKE_ROWS); - assertEquals(RpcPriority.LOW, read.getSpannerConfig().getRpcPriority()); + assertEquals(RpcPriority.LOW, read.getSpannerConfig().getRpcPriority().get()); pipeline.run(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java index ea7ca685166b..db81ee396cc2 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java @@ -358,7 +358,7 @@ public void streamingWritesWithPriority() throws Exception { .withHighPriority(); pipeline.apply(testStream).apply(write); pipeline.run(); - assertEquals(RpcPriority.HIGH, write.getSpannerConfig().getRpcPriority()); + assertEquals(RpcPriority.HIGH, write.getSpannerConfig().getRpcPriority().get()); verifyBatches(batch(m(1L), m(2L)), batch(m(3L), m(4L)), batch(m(5L), m(6L))); } @@ -406,7 +406,7 @@ public void streamingWritesWithGroupingWithPriority() throws Exception { .withLowPriority(); pipeline.apply(testStream).apply(write); pipeline.run(); - assertEquals(RpcPriority.LOW, write.getSpannerConfig().getRpcPriority()); + assertEquals(RpcPriority.LOW, write.getSpannerConfig().getRpcPriority().get()); // Output should be batches of sorted mutations. verifyBatches(batch(m(1L), m(2L)), batch(m(3L), m(4L)), batch(m(5L), m(6L))); From 51c3d039e3f68de3a63f57cd0440a69479f68016 Mon Sep 17 00:00:00 2001 From: Darshan Siddesh Jagaluru Date: Sun, 19 Dec 2021 15:04:15 +0000 Subject: [PATCH 4/4] formatting changes --- .../sdk/io/gcp/spanner/NaiveSpannerRead.java | 18 ++++-------------- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 4 ++-- 2 files changed, 6 insertions(+), 16 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java index 86e1d64328f4..e460e20df191 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java @@ -121,22 +121,15 @@ private ResultSet executeWithoutPriority( } if (op.getIndex() != null) { return readOnlyTransaction.readUsingIndex( - op.getTable(), - op.getIndex(), - op.getKeySet(), - op.getColumns()); + op.getTable(), op.getIndex(), op.getKeySet(), op.getColumns()); } - return readOnlyTransaction.read( - op.getTable(), - op.getKeySet(), - op.getColumns()); + return readOnlyTransaction.read(op.getTable(), op.getKeySet(), op.getColumns()); } private ResultSet executeWithPriority( ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction, RpcPriority rpcPriority) { if (op.getQuery() != null) { - return readOnlyTransaction.executeQuery( - op.getQuery(), Options.priority(rpcPriority)); + return readOnlyTransaction.executeQuery(op.getQuery(), Options.priority(rpcPriority)); } if (op.getIndex() != null) { return readOnlyTransaction.readUsingIndex( @@ -147,10 +140,7 @@ private ResultSet executeWithPriority( Options.priority(rpcPriority)); } return readOnlyTransaction.read( - op.getTable(), - op.getKeySet(), - op.getColumns(), - Options.priority(rpcPriority)); + op.getTable(), op.getKeySet(), op.getColumns(), Options.priority(rpcPriority)); } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 96c867f580ef..b3440d51c8eb 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -1685,8 +1685,8 @@ private void spannerWriteWithRetryIfSchemaChange(Iterable batch) this.spannerConfig.getInstanceId().toString(), "Write"); try { - if (spannerConfig.getRpcPriority() != null && - spannerConfig.getRpcPriority().get() != null) { + if (spannerConfig.getRpcPriority() != null + && spannerConfig.getRpcPriority().get() != null) { spannerAccessor .getDatabaseClient() .writeAtLeastOnceWithOptions(