From fabbb15347703194bd79d3cfcc1bb7a3bc943d3f Mon Sep 17 00:00:00 2001 From: Vikas Kedia Date: Fri, 17 Nov 2017 15:58:06 -0800 Subject: [PATCH 01/20] More fixes --- .../cloud/spanner/DatabaseClientImpl.java | 55 +++++++++++++++---- .../com/google/cloud/spanner/SessionPool.java | 34 ++++++++++++ .../com/google/cloud/spanner/SpannerImpl.java | 6 +- 3 files changed, 84 insertions(+), 11 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 3452f4c4d50f..99ff46328e2b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -19,8 +19,24 @@ import com.google.cloud.Timestamp; import com.google.common.util.concurrent.ListenableFuture; -class DatabaseClientImpl implements DatabaseClient { +import io.opencensus.common.Scope; +import io.opencensus.trace.Span; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; +import io.opencensus.trace.samplers.Samplers; + +import java.util.Arrays; +class DatabaseClientImpl implements DatabaseClient { + private static final String READ_WRITE_TRANSACTION = "CloudSpanner.ReadWriteTransaction"; + private static final String READ_ONLY_TRANSACTION = "CloudSpanner.ReadOnlyTransaction"; + private static final Tracer tracer = Tracing.getTracer(); + + static { + Tracing.getExportComponent().getSampledSpanStore().registerSpanNamesForCollection( + Arrays.asList(READ_WRITE_TRANSACTION, READ_ONLY_TRANSACTION)); + } + private final SessionPool pool; DatabaseClientImpl(SessionPool pool) { @@ -29,47 +45,66 @@ class DatabaseClientImpl implements DatabaseClient { @Override public Timestamp write(Iterable mutations) throws SpannerException { - return pool.getReadWriteSession().write(mutations); + try (Scope s = tracer.spanBuilder(READ_WRITE_TRANSACTION).startScopedSpan()) { + return pool.getReadWriteSession().write(mutations); + } } @Override public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { - return pool.getReadSession().writeAtLeastOnce(mutations); + try (Scope s = tracer.spanBuilder(READ_WRITE_TRANSACTION).startScopedSpan()) { + return pool.getReadSession().writeAtLeastOnce(mutations); + } } @Override public ReadContext singleUse() { - return pool.getReadSession().singleUse(); + Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)){ + return pool.getReadSession().singleUse(); + } } @Override public ReadContext singleUse(TimestampBound bound) { - return pool.getReadSession().singleUse(bound); + try (Scope s = tracer.spanBuilder(READ_ONLY_TRANSACTION).startScopedSpan()) { + return pool.getReadSession().singleUse(bound); + } } @Override public ReadOnlyTransaction singleUseReadOnlyTransaction() { - return pool.getReadSession().singleUseReadOnlyTransaction(); + try (Scope s = tracer.spanBuilder(READ_ONLY_TRANSACTION).startScopedSpan()) { + return pool.getReadSession().singleUseReadOnlyTransaction(); + } } @Override public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { - return pool.getReadSession().singleUseReadOnlyTransaction(bound); + try (Scope s = tracer.spanBuilder(READ_ONLY_TRANSACTION).startScopedSpan()) { + return pool.getReadSession().singleUseReadOnlyTransaction(bound); + } } @Override public ReadOnlyTransaction readOnlyTransaction() { - return pool.getReadSession().readOnlyTransaction(); + try (Scope s = tracer.spanBuilder(READ_ONLY_TRANSACTION).startScopedSpan()) { + return pool.getReadSession().readOnlyTransaction(); + } } @Override public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { - return pool.getReadSession().readOnlyTransaction(bound); + try (Scope s = tracer.spanBuilder(READ_ONLY_TRANSACTION).startScopedSpan()) { + return pool.getReadSession().readOnlyTransaction(bound); + } } @Override public TransactionRunner readWriteTransaction() { - return pool.getReadWriteSession().readWriteTransaction(); + try (Scope s = tracer.spanBuilder(READ_WRITE_TRANSACTION).startScopedSpan()) { + return pool.getReadWriteSession().readWriteTransaction(); + } } ListenableFuture closeAsync() { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index ebc9a1a895b6..419795b2fa16 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -26,10 +26,17 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.Uninterruptibles; + +import io.opencensus.trace.Annotation; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.Span; +import io.opencensus.trace.Tracing; + import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -731,30 +738,40 @@ private PooledSession findSessionToKeepAlive( * */ Session getReadSession() throws SpannerException { + Span span = Tracing.getTracer().getCurrentSpan(); + span.addAnnotation("Acquiring session"); Waiter waiter = null; PooledSession sess = null; synchronized (lock) { if (closureFuture != null) { + span.addAnnotation("Pool has been closed"); throw new IllegalStateException("Pool has been closed"); } sess = readSessions.poll(); if (sess == null) { sess = writePreparedSessions.poll(); if (sess == null) { + span.addAnnotation("No session available"); maybeCreateSession(); waiter = new Waiter(); readWaiters.add(waiter); + } else { + span.addAnnotation("Acquired read write session"); } + } else { + span.addAnnotation("Acquired read only session"); } } if (waiter != null) { logger.log( Level.FINE, "No session available in the pool. Blocking for one to become available/created"); + span.addAnnotation("Waiting for read only session to be available"); sess = waiter.take(); } sess.markBusy(); incrementNumSessionsInUse(); + span.addAnnotation(sessionAnnotation(sess)); return sess; } @@ -777,6 +794,8 @@ Session getReadSession() throws SpannerException { * */ Session getReadWriteSession() { + Span span = Tracing.getTracer().getCurrentSpan(); + span.addAnnotation("Acquiring read write session"); Waiter waiter = null; PooledSession sess = null; synchronized (lock) { @@ -788,26 +807,38 @@ Session getReadWriteSession() { if (numSessionsBeingPrepared <= readWriteWaiters.size()) { PooledSession readSession = readSessions.poll(); if (readSession != null) { + span.addAnnotation("Acquired read only session. Preparing for read write transaction"); prepareSession(readSession); } else { + span.addAnnotation("No session available"); maybeCreateSession(); } } waiter = new Waiter(); readWriteWaiters.add(waiter); + } else { + span.addAnnotation("Acquired read write session"); } } if (waiter != null) { logger.log( Level.FINE, "No session available in the pool. Blocking for one to become available/created"); + span.addAnnotation("Waiting for read write session to be available"); sess = waiter.take(); } sess.markBusy(); incrementNumSessionsInUse(); + span.addAnnotation(sessionAnnotation(sess)); return sess; } + private Annotation sessionAnnotation(Session session) { + AttributeValue sessionId = AttributeValue.stringAttributeValue(session.getName()); + return Annotation.fromDescriptionAndAttributes("Using Session", + ImmutableMap.of("sessionId", sessionId)); + } + private void incrementNumSessionsInUse() { synchronized (lock) { if (maxSessionsInUse < ++numSessionsInUse) { @@ -817,11 +848,14 @@ private void incrementNumSessionsInUse() { } private void maybeCreateSession() { + Span span = Tracing.getTracer().getCurrentSpan(); synchronized (lock) { if (numWaiters() >= numSessionsBeingCreated) { if (canCreateSession()) { + span.addAnnotation("Creating session"); createSession(); } else if (options.isFailIfPoolExhausted()) { + span.addAnnotation("Pool exhausted. Failing"); // throw specific exception throw newSpannerException( ErrorCode.RESOURCE_EXHAUSTED, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 750205bfc421..bbc905d61787 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -76,6 +76,9 @@ import com.google.spanner.v1.TypeCode; import io.grpc.Context; import io.grpc.ManagedChannel; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; + import java.io.IOException; import java.io.Serializable; import java.util.AbstractList; @@ -114,7 +117,8 @@ class SpannerImpl extends BaseService implements Spanner { private static final Logger logger = Logger.getLogger(SpannerImpl.class.getName()); private static final Logger txnLogger = Logger.getLogger(TransactionRunner.class.getName()); - + private static final Tracer tracer = Tracing.getTracer(); + private final Random random = new Random(); private final SpannerRpc rpc; private final int defaultPrefetchChunks; From 3d40e30d3d32e8995599416dd76b9ebcec25e64f Mon Sep 17 00:00:00 2001 From: Vikas Kedia Date: Wed, 29 Nov 2017 15:54:12 -0800 Subject: [PATCH 02/20] Fixed findbugs version --- google-cloud-core/pom.xml | 1 - google-cloud-spanner/pom.xml | 5 +++++ pom.xml | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/google-cloud-core/pom.xml b/google-cloud-core/pom.xml index 59121724be1e..0348db885f6e 100644 --- a/google-cloud-core/pom.xml +++ b/google-cloud-core/pom.xml @@ -53,7 +53,6 @@ com.google.code.findbugs jsr305 - 3.0.0 org.easymock diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index e49d4266c315..ee08501b6e23 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -182,5 +182,10 @@ testlib test + + io.opencensus + opencensus-api + 0.6.0 + diff --git a/pom.xml b/pom.xml index fc625dcbdf86..ed4e478101b6 100644 --- a/pom.xml +++ b/pom.xml @@ -707,7 +707,7 @@ com.google.code.findbugs jsr305 - 3.0.0 + 3.0.1 com.google.protobuf From b62dee79ba3686cc652aa59c6e8eaf4713fb33cd Mon Sep 17 00:00:00 2001 From: Vikas Kedia Date: Fri, 1 Dec 2017 15:56:09 -0800 Subject: [PATCH 03/20] Add tracing to cloud spanner client --- .../cloud/spanner/DatabaseClientImpl.java | 60 ++- .../com/google/cloud/spanner/SpannerImpl.java | 369 ++++++++++++------ .../com/google/cloud/spanner/TraceUtil.java | 109 ++++++ .../cloud/spanner/spi/v1/GrpcSpannerRpc.java | 34 ++ .../spanner/ResumableStreamIteratorTest.java | 2 +- 5 files changed, 452 insertions(+), 122 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 99ff46328e2b..306e15020e5e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -17,13 +17,13 @@ package com.google.cloud.spanner; import com.google.cloud.Timestamp; + import com.google.common.util.concurrent.ListenableFuture; import io.opencensus.common.Scope; import io.opencensus.trace.Span; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; -import io.opencensus.trace.samplers.Samplers; import java.util.Arrays; @@ -45,65 +45,105 @@ class DatabaseClientImpl implements DatabaseClient { @Override public Timestamp write(Iterable mutations) throws SpannerException { - try (Scope s = tracer.spanBuilder(READ_WRITE_TRANSACTION).startScopedSpan()) { + Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)) { return pool.getReadWriteSession().write(mutations); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; + } finally { + span.end(); } } @Override public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { - try (Scope s = tracer.spanBuilder(READ_WRITE_TRANSACTION).startScopedSpan()) { + Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)) { return pool.getReadSession().writeAtLeastOnce(mutations); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; + } finally { + span.end(); } } @Override public ReadContext singleUse() { Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); - try (Scope s = tracer.withSpan(span)){ + try (Scope s = tracer.withSpan(span)) { return pool.getReadSession().singleUse(); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; } + } @Override public ReadContext singleUse(TimestampBound bound) { - try (Scope s = tracer.spanBuilder(READ_ONLY_TRANSACTION).startScopedSpan()) { + Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)) { return pool.getReadSession().singleUse(bound); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; } } @Override public ReadOnlyTransaction singleUseReadOnlyTransaction() { - try (Scope s = tracer.spanBuilder(READ_ONLY_TRANSACTION).startScopedSpan()) { + Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)) { return pool.getReadSession().singleUseReadOnlyTransaction(); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; } } @Override public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { - try (Scope s = tracer.spanBuilder(READ_ONLY_TRANSACTION).startScopedSpan()) { + Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)) { return pool.getReadSession().singleUseReadOnlyTransaction(bound); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; } } @Override public ReadOnlyTransaction readOnlyTransaction() { - try (Scope s = tracer.spanBuilder(READ_ONLY_TRANSACTION).startScopedSpan()) { + Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)) { return pool.getReadSession().readOnlyTransaction(); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; } } @Override public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { - try (Scope s = tracer.spanBuilder(READ_ONLY_TRANSACTION).startScopedSpan()) { + Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)) { return pool.getReadSession().readOnlyTransaction(bound); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; } } @Override public TransactionRunner readWriteTransaction() { - try (Scope s = tracer.spanBuilder(READ_WRITE_TRANSACTION).startScopedSpan()) { + Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)) { return pool.getReadWriteSession().readWriteTransaction(); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 9b1d45ccc85f..eead76d9b268 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -76,6 +76,9 @@ import com.google.spanner.v1.TypeCode; import io.grpc.Context; import io.grpc.ManagedChannel; +import io.opencensus.common.Scope; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.Span; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; @@ -83,6 +86,7 @@ import java.io.Serializable; import java.util.AbstractList; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.Collection; import java.util.Collections; @@ -119,6 +123,18 @@ class SpannerImpl extends BaseService implements Spanner { private static final Logger txnLogger = Logger.getLogger(TransactionRunner.class.getName()); private static final Tracer tracer = Tracing.getTracer(); + private static final String CREATE_SESSION = "CloudSpannerOperation.CreateSession"; + private static final String DELETE_SESSION = "CloudSpannerOperation.DeleteSession"; + private static final String BEGIN_TRANSACTION = "CloudSpannerOperation.BeginTransaction"; + private static final String COMMIT = "CloudSpannerOperation.Commit"; + private static final String QUERY = "CloudSpannerOperation.ExecuteStreamingQuery"; + private static final String READ = "CloudSpannerOperation.ExecuteStreamingRead"; + + static { + Tracing.getExportComponent().getSampledSpanStore().registerSpanNamesForCollection( + Arrays.asList(CREATE_SESSION, DELETE_SESSION, BEGIN_TRANSACTION, COMMIT, QUERY, READ)); + } + private final Random random = new Random(); private final SpannerRpc rpc; private final int defaultPrefetchChunks; @@ -165,6 +181,8 @@ private static long nextBackOffMillis(BackOff backoff) throws SpannerException { } private static void backoffSleep(Context context, long backoffMillis) throws SpannerException { + tracer.getCurrentSpan().addAnnotation("Backing off", + ImmutableMap.of("Delay", AttributeValue.longAttributeValue(backoffMillis))); final CountDownLatch latch = new CountDownLatch(1); final Context.CancellationListener listener = new Context.CancellationListener() { @@ -199,11 +217,17 @@ public void cancelled(Context context) { */ static T runWithRetries(Callable callable) { // Use same backoff setting as abort, somewhat arbitrarily. + Span span = tracer.getCurrentSpan(); ExponentialBackOff backOff = newBackOff(); Context context = Context.current(); + int attempt = 0; while (true) { + attempt++; try { - return callable.call(); + span.addAnnotation("Starting operation", + ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt))); + T result = callable.call(); + return result; } catch (SpannerException e) { if (!e.isRetryable()) { throw e; @@ -220,15 +244,22 @@ static T runWithRetries(Callable callable) { Session createSession(final DatabaseId db) throws SpannerException { final Map options = optionMap(SessionOption.channelHint(random.nextLong())); - com.google.spanner.v1.Session session = - runWithRetries( - new Callable() { - @Override - public com.google.spanner.v1.Session call() throws Exception { - return rpc.createSession(db.getName(), getOptions().getSessionLabels(), options); - } - }); - return new SessionImpl(session.getName(), options); + Span span = tracer.spanBuilder(CREATE_SESSION).startSpan(); + try (Scope s = tracer.withSpan(span)) { + com.google.spanner.v1.Session session = + runWithRetries( + new Callable() { + @Override + public com.google.spanner.v1.Session call() throws Exception { + return rpc.createSession(db.getName(), getOptions().getSessionLabels(), options); + } + }); + span.end(); + return new SessionImpl(session.getName(), options); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; + } } @Override @@ -731,24 +762,31 @@ public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerEx Mutation.toProto(mutations, mutationsProto); final CommitRequest request = CommitRequest.newBuilder() - .setSession(name) - .addAllMutations(mutationsProto) - .setSingleUseTransaction( - TransactionOptions.newBuilder() - .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())) - .build(); - CommitResponse response = - runWithRetries( - new Callable() { - @Override - public CommitResponse call() throws Exception { - return rpc.commit(request, options); - } - }); - try { - return Timestamp.fromProto(response.getCommitTimestamp()); + .setSession(name) + .addAllMutations(mutationsProto) + .setSingleUseTransaction( + TransactionOptions.newBuilder() + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())) + .build(); + Span span = tracer.spanBuilder(COMMIT).startSpan(); + try (Scope s = tracer.withSpan(span)) { + CommitResponse response = + runWithRetries( + new Callable() { + @Override + public CommitResponse call() throws Exception { + return rpc.commit(request, options); + } + }); + Timestamp t = Timestamp.fromProto(response.getCommitTimestamp()); + span.end(); + return t; } catch (IllegalArgumentException e) { + TraceUtil.endSpanWithFailure(span, e); throw newSpannerException(ErrorCode.INTERNAL, "Could not parse commit timestamp", e); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; } } @@ -795,36 +833,50 @@ public void prepareReadWriteTransaction() { @Override public void close() { - runWithRetries( - new Callable() { - @Override - public Void call() throws Exception { - rpc.deleteSession(name, options); - return null; - } - }); + Span span = tracer.spanBuilder(DELETE_SESSION).startSpan(); + try (Scope s = tracer.withSpan(span)) { + runWithRetries( + new Callable() { + @Override + public Void call() throws Exception { + rpc.deleteSession(name, options); + return null; + } + }); + span.end(); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; + } } ByteString beginTransaction() { - final BeginTransactionRequest request = - BeginTransactionRequest.newBuilder() - .setSession(name) - .setOptions( - TransactionOptions.newBuilder() - .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())) - .build(); - Transaction txn = - runWithRetries( - new Callable() { - @Override - public Transaction call() throws Exception { - return rpc.beginTransaction(request, options); - } - }); - if (txn.getId().isEmpty()) { - throw newSpannerException(ErrorCode.INTERNAL, "Missing id in transaction\n" + getName()); + Span span = tracer.spanBuilder(BEGIN_TRANSACTION).startSpan(); + try (Scope s = tracer.withSpan(span)) { + final BeginTransactionRequest request = + BeginTransactionRequest.newBuilder() + .setSession(name) + .setOptions( + TransactionOptions.newBuilder() + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())) + .build(); + Transaction txn = + runWithRetries( + new Callable() { + @Override + public Transaction call() throws Exception { + return rpc.beginTransaction(request, options); + } + }); + if (txn.getId().isEmpty()) { + throw newSpannerException(ErrorCode.INTERNAL, "Missing id in transaction\n" + getName()); + } + span.end(); + return txn.getId(); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; } - return txn.getId(); } private T setActive(@Nullable T ctx) { @@ -854,6 +906,7 @@ private abstract static class AbstractReadContext final SessionImpl session; final SpannerRpc rpc; final int defaultPrefetchChunks; + final Span span; @GuardedBy("lock") private boolean isValid = true; @@ -865,9 +918,15 @@ private abstract static class AbstractReadContext private static final int MAX_BUFFERED_CHUNKS = 2048; private AbstractReadContext(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks) { + this(session, rpc, defaultPrefetchChunks, Tracing.getTracer().getCurrentSpan()); + } + + private AbstractReadContext(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks, + Span span) { this.session = session; this.rpc = rpc; this.defaultPrefetchChunks = defaultPrefetchChunks; + this.span = span; } @Override @@ -948,7 +1007,7 @@ private ResultSet executeQueryInternal( final int prefetchChunks = readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks; ResumableStreamIterator stream = - new ResumableStreamIterator(MAX_BUFFERED_CHUNKS) { + new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, QUERY) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks); @@ -1000,6 +1059,7 @@ public final void invalidate() { @Override public void close() { + span.end(); synchronized (lock) { isClosed = true; } @@ -1046,7 +1106,7 @@ private ResultSet readInternal( final int prefetchChunks = readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks; ResumableStreamIterator stream = - new ResumableStreamIterator(MAX_BUFFERED_CHUNKS) { + new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, READ) { @Override CloseableIterator startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks); @@ -1103,34 +1163,51 @@ void backoffSleep(Context context, long backoffMillis) { private final SessionImpl session; private final Sleeper sleeper; + private final Span span; private TransactionContextImpl txn; private volatile boolean isValid = true; TransactionRunnerImpl( SessionImpl session, SpannerRpc rpc, Sleeper sleeper, int defaultPrefetchChunks) { - ByteString transactionId = session.readyTransactionId; - session.readyTransactionId = null; this.session = session; this.sleeper = sleeper; - this.txn = new TransactionContextImpl(session, transactionId, rpc, defaultPrefetchChunks); + this.span = Tracing.getTracer().getCurrentSpan(); + ByteString transactionId = session.readyTransactionId; + session.readyTransactionId = null; + this.txn = new TransactionContextImpl(session, transactionId, rpc, defaultPrefetchChunks, + span); } TransactionRunnerImpl(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks) { this(session, rpc, new Sleeper(), defaultPrefetchChunks); } - + @Nullable @Override public T run(TransactionCallable callable) { + try { + return runInternal(callable); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; + } finally { + span.end(); + } + } + + private T runInternal(TransactionCallable callable) { BackOff backoff = newBackOff(); final Context context = Context.current(); + int attempt = 0; while (true) { checkState( isValid, "TransactionRunner has been invalidated by a new operation on the session"); checkContext(context); - + attempt++; // TODO(user): When using streaming reads, consider using the first read to begin // the txn. + span.addAnnotation("Starting Transaction Attempt", + ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt))); txn.ensureTxn(); T result; @@ -1141,15 +1218,23 @@ public T run(TransactionCallable callable) { } catch (Exception e) { txnLogger.log(Level.FINE, "User-provided TransactionCallable raised exception", e); if (txn.isAborted()) { + span.addAnnotation("Transaction Attempt Aborted in user operation. Retrying", + ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt))); shouldRollback = false; backoff(context, backoff); continue; } + SpannerException toThrow; if (e instanceof SpannerException) { - throw (SpannerException) e; + toThrow = (SpannerException) e; } else { - throw newSpannerException(ErrorCode.UNKNOWN, e.getMessage(), e); + toThrow = newSpannerException(ErrorCode.UNKNOWN, e.getMessage(), e); } + span.addAnnotation("Transaction Attempt Failed in user operation", + ImmutableMap.builder() + .putAll(TraceUtil.getExceptionAnnotations(toThrow)) + .put("Attempt", AttributeValue.longAttributeValue(attempt)).build()); + throw toThrow; } finally { if (shouldRollback) { txn.rollback(); @@ -1158,10 +1243,20 @@ public T run(TransactionCallable callable) { try { txn.commit(); + span.addAnnotation("Transaction Attempt Succeeded", + ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt))); return result; } catch (AbortedException e) { txnLogger.log(Level.FINE, "Commit aborted", e); + span.addAnnotation("Transaction Attempt Aborted in Commit. Retrying", + ImmutableMap.of("Attempt", AttributeValue.longAttributeValue(attempt))); backoff(context, backoff); + } catch (SpannerException e) { + span.addAnnotation("Transaction Attempt Failed in Commit", + ImmutableMap.builder() + .putAll(TraceUtil.getExceptionAnnotations(e)) + .put("Attempt", AttributeValue.longAttributeValue(attempt)).build()); + throw e; } } } @@ -1178,7 +1273,9 @@ public void invalidate() { private void backoff(Context context, BackOff backoff) { long delay = txn.getRetryDelayInMillis(backoff); - txn = new TransactionContextImpl(session, null, txn.rpc, txn.defaultPrefetchChunks); + txn = new TransactionContextImpl(session, null, txn.rpc, txn.defaultPrefetchChunks, span); + span.addAnnotation("Backing off", + ImmutableMap.of("Delay", AttributeValue.longAttributeValue(delay))); sleeper.backoffSleep(context, delay); } } @@ -1202,19 +1299,31 @@ static class TransactionContextImpl extends AbstractReadContext implements Trans SessionImpl session, @Nullable ByteString transactionId, SpannerRpc rpc, - int defaultPrefetchChunks) { - super(session, rpc, defaultPrefetchChunks); + int defaultPrefetchChunks, + Span span) { + super(session, rpc, defaultPrefetchChunks, span); this.transactionId = transactionId; } void ensureTxn() { if (transactionId == null) { - transactionId = session.beginTransaction(); - txnLogger.log( - Level.FINER, - "Started transaction {0}", - txnLogger.isLoggable(Level.FINER) ? transactionId.asReadOnlyByteBuffer() : null); + span.addAnnotation("Creating Transaction"); + try { + transactionId = session.beginTransaction(); + span.addAnnotation("Transaction Creation Done", ImmutableMap.of("Id", + AttributeValue.stringAttributeValue(transactionId.toStringUtf8()))); + txnLogger.log( + Level.FINER, + "Started transaction {0}", + txnLogger.isLoggable(Level.FINER) ? transactionId.asReadOnlyByteBuffer() : null); + } catch (SpannerException e) { + span.addAnnotation("Transaction Creation Failed", TraceUtil.getExceptionAnnotations(e)); + throw e; + } } else { + span.addAnnotation("Transaction Initialized", + ImmutableMap.of("Id", AttributeValue.stringAttributeValue( + transactionId.toStringUtf8()))); txnLogger.log( Level.FINER, "Using prepared transaction {0}", @@ -1223,6 +1332,7 @@ void ensureTxn() { } void commit() { + span.addAnnotation("Starting Commit"); CommitRequest.Builder builder = CommitRequest.newBuilder().setSession(session.getName()).setTransactionId(transactionId); synchronized (lock) { @@ -1235,20 +1345,29 @@ void commit() { mutations = null; } final CommitRequest commitRequest = builder.build(); - CommitResponse commitResponse = - runWithRetries( - new Callable() { - @Override - public CommitResponse call() throws Exception { - return rpc.commit(commitRequest, session.options); - } - }); + Span opSpan = tracer.spanBuilder(COMMIT).startSpan(); + try (Scope s = tracer.withSpan(opSpan)) { + CommitResponse commitResponse = + runWithRetries( + new Callable() { + @Override + public CommitResponse call() throws Exception { + return rpc.commit(commitRequest, session.options); + } + }); - if (!commitResponse.hasCommitTimestamp()) { - throw newSpannerException( - ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName()); + if (!commitResponse.hasCommitTimestamp()) { + throw newSpannerException( + ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName()); + } + commitTimestamp = Timestamp.fromProto(commitResponse.getCommitTimestamp()); + opSpan.end(); + } catch (RuntimeException e) { + span.addAnnotation("Commit Failed", TraceUtil.getExceptionAnnotations(e)); + TraceUtil.endSpanWithFailure(opSpan, e); + throw e; } - commitTimestamp = Timestamp.fromProto(commitResponse.getCommitTimestamp()); + span.addAnnotation("Commit Done"); } Timestamp commitTimestamp() { @@ -1281,14 +1400,17 @@ void rollback() { // Note that we're not retrying this request since we don't particularly care about the // response. Normally, the next thing that will happen is that we will make a fresh // transaction attempt, which should implicitly abort this one. + span.addAnnotation("Starting Rollback"); rpc.rollback( RollbackRequest.newBuilder() .setSession(session.getName()) .setTransactionId(transactionId) .build(), session.options); + span.addAnnotation("Rollback Done"); } catch (SpannerException e) { txnLogger.log(Level.FINE, "Exception during rollback", e); + span.addAnnotation("Rollback Failed", TraceUtil.getExceptionAnnotations(e)); } } @@ -1480,36 +1602,44 @@ private void initTransaction() { if (transactionId != null) { return; } - TransactionOptions.Builder options = TransactionOptions.newBuilder(); - bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true); - final BeginTransactionRequest request = - BeginTransactionRequest.newBuilder() - .setSession(session.getName()) - .setOptions(options) - .build(); - Transaction transaction = - runWithRetries( - new Callable() { - @Override - public Transaction call() throws Exception { - return rpc.beginTransaction(request, session.options); - } - }); - if (!transaction.hasReadTimestamp()) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field"); - } - if (transaction.getId().isEmpty()) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.INTERNAL, "Missing expected transaction.id metadata field"); - } + span.addAnnotation("Creating Transaction"); try { - timestamp = Timestamp.fromProto(transaction.getReadTimestamp()); - } catch (IllegalArgumentException e) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e); + TransactionOptions.Builder options = TransactionOptions.newBuilder(); + bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true); + final BeginTransactionRequest request = + BeginTransactionRequest.newBuilder() + .setSession(session.getName()) + .setOptions(options) + .build(); + Transaction transaction = + runWithRetries( + new Callable() { + @Override + public Transaction call() throws Exception { + return rpc.beginTransaction(request, session.options); + } + }); + if (!transaction.hasReadTimestamp()) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field"); + } + if (transaction.getId().isEmpty()) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.INTERNAL, "Missing expected transaction.id metadata field"); + } + try { + timestamp = Timestamp.fromProto(transaction.getReadTimestamp()); + } catch (IllegalArgumentException e) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e); + } + transactionId = transaction.getId(); + span.addAnnotation("Transaction Creation Done", + TraceUtil.getTransactionAnnotations(transaction)); + } catch (SpannerException e) { + span.addAnnotation("Transaction Creation Failed", TraceUtil.getExceptionAnnotations(e)); + throw e; } - transactionId = transaction.getId(); } } } @@ -2185,6 +2315,7 @@ abstract static class ResumableStreamIterator extends AbstractIterator buffer = new LinkedList<>(); private final int maxBufferSize; + private final Span span; private CloseableIterator stream; private ByteString resumeToken; private boolean finished; @@ -2195,15 +2326,17 @@ abstract static class ResumableStreamIterator extends AbstractIterator= 0); this.maxBufferSize = maxBufferSize; + this.span = tracer.spanBuilder(streamName).startSpan(); } abstract CloseableIterator startStream(@Nullable ByteString resumeToken); @Override public void close(@Nullable String message) { + span.end(); if (stream != null) { stream.close(message); } @@ -2215,6 +2348,10 @@ protected PartialResultSet computeNext() { while (true) { // Eagerly start stream before consuming any buffered items. if (stream == null) { + span.addAnnotation("Starting/Resuming stream", + ImmutableMap.of("ResumeToken", + AttributeValue.stringAttributeValue( + resumeToken == null ? "null" : resumeToken.toStringUtf8()))); stream = checkNotNull(startStream(resumeToken)); } // Buffer contains items up to a resume token or has reached capacity: flush. @@ -2251,6 +2388,8 @@ protected PartialResultSet computeNext() { } } catch (SpannerException e) { if (safeToRetry && e.isRetryable()) { + span.addAnnotation("Stream broken. Safe to retry", + TraceUtil.getExceptionAnnotations(e)); logger.log(Level.FINE, "Retryable exception, will sleep and retry", e); // Truncate any items in the buffer before the last retry token. while (!buffer.isEmpty() && buffer.getLast().getResumeToken().isEmpty()) { @@ -2258,9 +2397,17 @@ protected PartialResultSet computeNext() { } assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken); stream = null; - backoffSleep(context, backOff); + try (Scope s = tracer.withSpan(span)) { + backoffSleep(context, backOff); + } continue; } + span.addAnnotation("Stream broken. Not safe to retry"); + TraceUtil.endSpanWithFailure(span, e); + throw e; + } catch (RuntimeException e) { + span.addAnnotation("Stream broken. Not safe to retry"); + TraceUtil.endSpanWithFailure(span, e); throw e; } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java new file mode 100644 index 000000000000..d6b939471d2a --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java @@ -0,0 +1,109 @@ +/* + * Copyright 2017 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.cloud.Timestamp; +import com.google.common.collect.ImmutableMap; +import com.google.spanner.v1.Transaction; + +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.EndSpanOptions; +import io.opencensus.trace.Span; +import io.opencensus.trace.Status; + +import java.util.Map; + +/** + * Utility methods for tracing. + */ +class TraceUtil { + + static Map getTransactionAnnotations(Transaction t) { + return ImmutableMap.of("Id", + AttributeValue.stringAttributeValue(t.getId().toStringUtf8()), + "Timestamp", + AttributeValue.stringAttributeValue(Timestamp.fromProto(t.getReadTimestamp()).toString())); + } + + static ImmutableMap getExceptionAnnotations(RuntimeException e) { + if (e instanceof SpannerException) { + return ImmutableMap.of("Status", + AttributeValue.stringAttributeValue(((SpannerException) e).getErrorCode().toString())); + } + return ImmutableMap.of(); + } + + static ImmutableMap getExceptionAnnotations(SpannerException e) { + return ImmutableMap.of("Status", + AttributeValue.stringAttributeValue(e.getErrorCode().toString())); + } + + static void endSpanWithFailure(Span span, Exception e) { + if (e instanceof SpannerException) { + endSpanWithFailure(span, (SpannerException) e); + } else { + span.end(EndSpanOptions.builder() + .setStatus(Status.INTERNAL.withDescription(e.getMessage())) + .build()); + } + } + + static void endSpanWithFailure(Span span, SpannerException e) { + span.end(EndSpanOptions.builder() + .setStatus(getOpenCensusStatus(e.getErrorCode()).withDescription(e.getMessage())) + .build()); + } + + private static Status getOpenCensusStatus(ErrorCode code) { + switch (code) { + case ABORTED: + return Status.ABORTED; + case ALREADY_EXISTS: + return Status.ALREADY_EXISTS; + case CANCELLED: + return Status.CANCELLED; + case DATA_LOSS: + return Status.DATA_LOSS; + case DEADLINE_EXCEEDED: + return Status.DEADLINE_EXCEEDED; + case FAILED_PRECONDITION: + return Status.FAILED_PRECONDITION; + case INTERNAL: + return Status.INTERNAL; + case INVALID_ARGUMENT: + return Status.INVALID_ARGUMENT; + case NOT_FOUND: + return Status.NOT_FOUND; + case OUT_OF_RANGE: + return Status.OUT_OF_RANGE; + case PERMISSION_DENIED: + return Status.PERMISSION_DENIED; + case RESOURCE_EXHAUSTED: + return Status.RESOURCE_EXHAUSTED; + case UNAUTHENTICATED: + return Status.UNAUTHENTICATED; + case UNAVAILABLE: + return Status.UNAVAILABLE; + case UNIMPLEMENTED: + return Status.UNIMPLEMENTED; + case UNKNOWN: + default: + return Status.UNKNOWN; + } + } + +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java index a9dbeff5ada0..8be6d9d928d1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java @@ -75,12 +75,16 @@ import io.grpc.ForwardingClientCallListener; import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.ServiceDescriptor; import io.grpc.Status; import io.grpc.auth.MoreCallCredentials; import io.grpc.stub.AbstractStub; import io.grpc.stub.ClientCallStreamObserver; import io.grpc.stub.ClientCalls; import io.grpc.stub.ClientResponseObserver; +import io.opencensus.trace.Tracing; + +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; @@ -95,6 +99,11 @@ /** Implementation of Cloud Spanner remote calls using gRPC. */ public class GrpcSpannerRpc implements SpannerRpc { + + static { + setupTracingConfig(); + } + private static final Logger logger = Logger.getLogger(GrpcSpannerRpc.class.getName()); private static final Metadata.Key API_CLIENT_KEY = @@ -495,6 +504,31 @@ private T pick(@Nullable Long hint, List elements) { return elements.get((int) index); } + /** + * This is a one time setup for grpcz pages. This adds all of the methods to the Tracing + * environment required to show a consistent set of methods relating to Cloud Bigtable on the + * grpcz page. If HBase artifacts are present, this will add tracing metadata for HBase methods. + */ + private static void setupTracingConfig() { + List descriptors = new ArrayList<>(); + addDescriptor(descriptors, SpannerGrpc.getServiceDescriptor()); + addDescriptor(descriptors, DatabaseAdminGrpc.getServiceDescriptor()); + addDescriptor(descriptors, InstanceAdminGrpc.getServiceDescriptor()); + + Tracing.getExportComponent().getSampledSpanStore().registerSpanNamesForCollection(descriptors); + } + + /** + * Reads a list of {@link MethodDescriptor}s from a {@link ServiceDescriptor} and creates a list + * of Open Census tags. + */ + private static void addDescriptor(List descriptors, ServiceDescriptor serviceDescriptor) { + for (MethodDescriptor method : serviceDescriptor.getMethods()) { + // This is added by a grpc ClientInterceptor + descriptors.add("Sent." + method.getFullMethodName().replace('/', '.')); + } + } + private static class ResultSetStreamObserver implements ClientResponseObserver, StreamingCall { private final ResultStreamConsumer consumer; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java index 46932923630c..a8234ac5ae8e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java @@ -98,7 +98,7 @@ public void setUp() { private void initWithLimit(int maxBufferSize) { iterator = - new SpannerImpl.ResumableStreamIterator(maxBufferSize) { + new SpannerImpl.ResumableStreamIterator(maxBufferSize, "") { @Override SpannerImpl.CloseableIterator startStream( @Nullable ByteString resumeToken) { From 027e3fb5bf71ca2f6adece95c296e08be4c7e0d3 Mon Sep 17 00:00:00 2001 From: Vikas Kedia Date: Fri, 1 Dec 2017 16:12:26 -0800 Subject: [PATCH 04/20] Fix version of findbugs --- google-cloud-core/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-core/pom.xml b/google-cloud-core/pom.xml index 5a67ee331a87..86005c415be3 100644 --- a/google-cloud-core/pom.xml +++ b/google-cloud-core/pom.xml @@ -53,6 +53,7 @@ com.google.code.findbugs jsr305 + 3.0.1 org.easymock From 2804e1bb4b251f58f92ac74ad9e48c12cffae18b Mon Sep 17 00:00:00 2001 From: Vikas Kedia Date: Fri, 15 Dec 2017 16:23:50 -0800 Subject: [PATCH 05/20] Made changes per code review comments --- google-cloud-spanner/pom.xml | 7 +- .../com/google/cloud/spanner/ErrorCode.java | 4 ++ .../com/google/cloud/spanner/TraceUtil.java | 65 ++++--------------- .../cloud/spanner/spi/v1/GrpcSpannerRpc.java | 4 +- 4 files changed, 26 insertions(+), 54 deletions(-) diff --git a/google-cloud-spanner/pom.xml b/google-cloud-spanner/pom.xml index df36ca264f1e..68b9e97bcaf6 100644 --- a/google-cloud-spanner/pom.xml +++ b/google-cloud-spanner/pom.xml @@ -185,7 +185,12 @@ io.opencensus opencensus-api - 0.6.0 + 0.10.0 + + + io.opencensus + opencensus-contrib-grpc-util + 0.10.0 diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorCode.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorCode.java index 6082e33fe538..3679a5357f3b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorCode.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/ErrorCode.java @@ -65,6 +65,10 @@ int getCode() { return this.code.value(); } + Status getGrpcStatus() { + return this.code.toStatus(); + } + /** * Returns the error code represents by {@code name}, or {@code defaultValue} if {@code name} does * not map to a known code. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java index d6b939471d2a..847ab8eb6697 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TraceUtil.java @@ -20,8 +20,8 @@ import com.google.common.collect.ImmutableMap; import com.google.spanner.v1.Transaction; +import io.opencensus.contrib.grpc.util.StatusConverter; import io.opencensus.trace.AttributeValue; -import io.opencensus.trace.EndSpanOptions; import io.opencensus.trace.Span; import io.opencensus.trace.Status; @@ -31,14 +31,14 @@ * Utility methods for tracing. */ class TraceUtil { - + static Map getTransactionAnnotations(Transaction t) { return ImmutableMap.of("Id", AttributeValue.stringAttributeValue(t.getId().toStringUtf8()), "Timestamp", AttributeValue.stringAttributeValue(Timestamp.fromProto(t.getReadTimestamp()).toString())); } - + static ImmutableMap getExceptionAnnotations(RuntimeException e) { if (e instanceof SpannerException) { return ImmutableMap.of("Status", @@ -46,64 +46,25 @@ static ImmutableMap getExceptionAnnotations(RuntimeExcep } return ImmutableMap.of(); } - + static ImmutableMap getExceptionAnnotations(SpannerException e) { return ImmutableMap.of("Status", AttributeValue.stringAttributeValue(e.getErrorCode().toString())); } - + static void endSpanWithFailure(Span span, Exception e) { if (e instanceof SpannerException) { - endSpanWithFailure(span, (SpannerException) e); + endSpanWithFailure(span, (SpannerException) e); } else { - span.end(EndSpanOptions.builder() - .setStatus(Status.INTERNAL.withDescription(e.getMessage())) - .build()); + span.setStatus(Status.INTERNAL.withDescription(e.getMessage())) + span.end() + } } - + static void endSpanWithFailure(Span span, SpannerException e) { - span.end(EndSpanOptions.builder() - .setStatus(getOpenCensusStatus(e.getErrorCode()).withDescription(e.getMessage())) - .build()); - } - - private static Status getOpenCensusStatus(ErrorCode code) { - switch (code) { - case ABORTED: - return Status.ABORTED; - case ALREADY_EXISTS: - return Status.ALREADY_EXISTS; - case CANCELLED: - return Status.CANCELLED; - case DATA_LOSS: - return Status.DATA_LOSS; - case DEADLINE_EXCEEDED: - return Status.DEADLINE_EXCEEDED; - case FAILED_PRECONDITION: - return Status.FAILED_PRECONDITION; - case INTERNAL: - return Status.INTERNAL; - case INVALID_ARGUMENT: - return Status.INVALID_ARGUMENT; - case NOT_FOUND: - return Status.NOT_FOUND; - case OUT_OF_RANGE: - return Status.OUT_OF_RANGE; - case PERMISSION_DENIED: - return Status.PERMISSION_DENIED; - case RESOURCE_EXHAUSTED: - return Status.RESOURCE_EXHAUSTED; - case UNAUTHENTICATED: - return Status.UNAUTHENTICATED; - case UNAVAILABLE: - return Status.UNAVAILABLE; - case UNIMPLEMENTED: - return Status.UNIMPLEMENTED; - case UNKNOWN: - default: - return Status.UNKNOWN; - } + span.setStatus(StatusConverter.fromGrpcStatus(e.getErrorCode().getGrpcStatus()) + .withDescription(e.getMessage())); + span.end() } - } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java index 8be6d9d928d1..fe079ed45f30 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcSpannerRpc.java @@ -508,6 +508,8 @@ private T pick(@Nullable Long hint, List elements) { * This is a one time setup for grpcz pages. This adds all of the methods to the Tracing * environment required to show a consistent set of methods relating to Cloud Bigtable on the * grpcz page. If HBase artifacts are present, this will add tracing metadata for HBase methods. + * + * TODO: Remove this when we depend on gRPC 1.8 */ private static void setupTracingConfig() { List descriptors = new ArrayList<>(); @@ -528,7 +530,7 @@ private static void addDescriptor(List descriptors, ServiceDescriptor se descriptors.add("Sent." + method.getFullMethodName().replace('/', '.')); } } - + private static class ResultSetStreamObserver implements ClientResponseObserver, StreamingCall { private final ResultStreamConsumer consumer; From 9d06238bed3e97b3424d84cd1c077f468ef53d73 Mon Sep 17 00:00:00 2001 From: Vikas Kedia Date: Fri, 15 Dec 2017 17:19:33 -0800 Subject: [PATCH 06/20] Fixed dependencies --- google-cloud-bom/pom.xml | 13 +++++++++++- google-cloud-core/pom.xml | 1 - google-cloud-spanner/pom.xml | 20 +++++++++++++++++-- .../com/google/cloud/spanner/TraceUtil.java | 6 +++--- 4 files changed, 33 insertions(+), 7 deletions(-) diff --git a/google-cloud-bom/pom.xml b/google-cloud-bom/pom.xml index 897947e9cfaa..4f095308d86a 100644 --- a/google-cloud-bom/pom.xml +++ b/google-cloud-bom/pom.xml @@ -138,6 +138,7 @@ 20.0 1.23.0 3.4.0 + 0.9.0 0.32.1-alpha-SNAPSHOT 0.32.1-beta-SNAPSHOT @@ -872,7 +873,7 @@ com.google.code.findbugs jsr305 - 3.0.0 + 3.0.1 com.google.protobuf @@ -979,6 +980,16 @@ netty-tcnative-boringssl-static ${nettyssl.version} + + io.opencensus + opencensus-api + ${opencensus.version} + + + io.opencensus + opencensus-contrib-grpc-util + ${opencensus.version} +