Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.Partition;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -120,10 +121,38 @@ public void processElement(ProcessContext c) throws Exception {
}

private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction tx) {
if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
return executeWithPriority(op, tx, config.getRpcPriority().get());
} else {
return executeWithoutPriority(op, tx);
}
}

private List<Partition> executeWithoutPriority(ReadOperation op, BatchReadOnlyTransaction tx) {
// Query was selected.
if (op.getQuery() != null) {
return tx.partitionQuery(op.getPartitionOptions(), op.getQuery());
}
// Read with index was selected.
if (op.getIndex() != null) {
return tx.partitionReadUsingIndex(
op.getPartitionOptions(),
op.getTable(),
op.getIndex(),
op.getKeySet(),
op.getColumns());
}
// Read from table was selected.
return tx.partitionRead(
op.getPartitionOptions(), op.getTable(), op.getKeySet(), op.getColumns());
}

private List<Partition> executeWithPriority(
ReadOperation op, BatchReadOnlyTransaction tx, RpcPriority rpcPriority) {
// Query was selected.
if (op.getQuery() != null) {
return tx.partitionQuery(
op.getPartitionOptions(), op.getQuery(), Options.priority(config.getRpcPriority()));
op.getPartitionOptions(), op.getQuery(), Options.priority(rpcPriority));
}
// Read with index was selected.
if (op.getIndex() != null) {
Expand All @@ -133,15 +162,15 @@ private List<Partition> execute(ReadOperation op, BatchReadOnlyTransaction tx) {
op.getIndex(),
op.getKeySet(),
op.getColumns(),
Options.priority(config.getRpcPriority()));
Options.priority(rpcPriority));
}
// Read from table was selected.
return tx.partitionRead(
op.getPartitionOptions(),
op.getTable(),
op.getKeySet(),
op.getColumns(),
Options.priority(config.getRpcPriority()));
Options.priority(rpcPriority));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
Expand Down Expand Up @@ -106,23 +107,40 @@ public void processElement(ProcessContext c) throws Exception {
}

private ResultSet execute(ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) {
if (config.getRpcPriority() != null && config.getRpcPriority().get() != null) {
return executeWithPriority(op, readOnlyTransaction, config.getRpcPriority().get());
} else {
return executeWithoutPriority(op, readOnlyTransaction);
}
}

private ResultSet executeWithoutPriority(
ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction) {
if (op.getQuery() != null) {
return readOnlyTransaction.executeQuery(op.getQuery());
}
if (op.getIndex() != null) {
return readOnlyTransaction.readUsingIndex(
op.getTable(), op.getIndex(), op.getKeySet(), op.getColumns());
}
return readOnlyTransaction.read(op.getTable(), op.getKeySet(), op.getColumns());
}

private ResultSet executeWithPriority(
ReadOperation op, BatchReadOnlyTransaction readOnlyTransaction, RpcPriority rpcPriority) {
if (op.getQuery() != null) {
return readOnlyTransaction.executeQuery(
op.getQuery(), Options.priority(config.getRpcPriority()));
return readOnlyTransaction.executeQuery(op.getQuery(), Options.priority(rpcPriority));
}
if (op.getIndex() != null) {
return readOnlyTransaction.readUsingIndex(
op.getTable(),
op.getIndex(),
op.getKeySet(),
op.getColumns(),
Options.priority(config.getRpcPriority()));
Options.priority(rpcPriority));
}
return readOnlyTransaction.read(
op.getTable(),
op.getKeySet(),
op.getColumns(),
Options.priority(config.getRpcPriority()));
op.getTable(), op.getKeySet(), op.getColumns(), Options.priority(rpcPriority));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public abstract class SpannerConfig implements Serializable {

public abstract @Nullable ValueProvider<Duration> getMaxCumulativeBackoff();

public abstract RpcPriority getRpcPriority();
public abstract @Nullable ValueProvider<RpcPriority> getRpcPriority();

@VisibleForTesting
abstract @Nullable ServiceFactory<Spanner, SpannerOptions> getServiceFactory();
Expand All @@ -73,7 +73,7 @@ public static SpannerConfig create() {
.setCommitDeadline(ValueProvider.StaticValueProvider.of(DEFAULT_COMMIT_DEADLINE))
.setMaxCumulativeBackoff(
ValueProvider.StaticValueProvider.of(DEFAULT_MAX_CUMULATIVE_BACKOFF))
.setRpcPriority(DEFAULT_RPC_PRIORITY)
.setRpcPriority(ValueProvider.StaticValueProvider.of(DEFAULT_RPC_PRIORITY))
.build();
}

Expand Down Expand Up @@ -123,7 +123,7 @@ public abstract static class Builder {

abstract Builder setServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory);

abstract Builder setRpcPriority(RpcPriority rpcPriority);
abstract Builder setRpcPriority(ValueProvider<RpcPriority> rpcPriority);

public abstract SpannerConfig build();
}
Expand Down Expand Up @@ -182,6 +182,10 @@ SpannerConfig withServiceFactory(ServiceFactory<Spanner, SpannerOptions> service
}

public SpannerConfig withRpcPriority(RpcPriority rpcPriority) {
return withRpcPriority(ValueProvider.StaticValueProvider.of(rpcPriority));
}

public SpannerConfig withRpcPriority(ValueProvider<RpcPriority> rpcPriority) {
return toBuilder().setRpcPriority(rpcPriority).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1685,9 +1685,15 @@ private void spannerWriteWithRetryIfSchemaChange(Iterable<Mutation> batch)
this.spannerConfig.getInstanceId().toString(),
"Write");
try {
spannerAccessor
.getDatabaseClient()
.writeAtLeastOnceWithOptions(batch, Options.priority(spannerConfig.getRpcPriority()));
if (spannerConfig.getRpcPriority() != null
&& spannerConfig.getRpcPriority().get() != null) {
spannerAccessor
.getDatabaseClient()
.writeAtLeastOnceWithOptions(
batch, Options.priority(spannerConfig.getRpcPriority().get()));
} else {
spannerAccessor.getDatabaseClient().writeAtLeastOnce(batch);
}
serviceCallMetric.call("ok");
return;
} catch (AbortedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void runQueryWithPriority() throws Exception {
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 6)));

PAssert.that(one).containsInAnyOrder(FAKE_ROWS);
assertEquals(RpcPriority.HIGH, read.getSpannerConfig().getRpcPriority());
assertEquals(RpcPriority.HIGH, read.getSpannerConfig().getRpcPriority().get());
pipeline.run();
}

Expand Down Expand Up @@ -277,7 +277,7 @@ public void runReadWithPriority() throws Exception {
ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(4, 6)));

PAssert.that(one).containsInAnyOrder(FAKE_ROWS);
assertEquals(RpcPriority.LOW, read.getSpannerConfig().getRpcPriority());
assertEquals(RpcPriority.LOW, read.getSpannerConfig().getRpcPriority().get());
pipeline.run();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ public void streamingWritesWithPriority() throws Exception {
.withHighPriority();
pipeline.apply(testStream).apply(write);
pipeline.run();
assertEquals(RpcPriority.HIGH, write.getSpannerConfig().getRpcPriority());
assertEquals(RpcPriority.HIGH, write.getSpannerConfig().getRpcPriority().get());
verifyBatches(batch(m(1L), m(2L)), batch(m(3L), m(4L)), batch(m(5L), m(6L)));
}

Expand Down Expand Up @@ -406,7 +406,7 @@ public void streamingWritesWithGroupingWithPriority() throws Exception {
.withLowPriority();
pipeline.apply(testStream).apply(write);
pipeline.run();
assertEquals(RpcPriority.LOW, write.getSpannerConfig().getRpcPriority());
assertEquals(RpcPriority.LOW, write.getSpannerConfig().getRpcPriority().get());

// Output should be batches of sorted mutations.
verifyBatches(batch(m(1L), m(2L)), batch(m(3L), m(4L)), batch(m(5L), m(6L)));
Expand Down