diff --git a/google-cloud-firestore/pom.xml b/google-cloud-firestore/pom.xml index 7f98b1e816aa..296e011185ee 100644 --- a/google-cloud-firestore/pom.xml +++ b/google-cloud-firestore/pom.xml @@ -63,6 +63,14 @@ io.grpc grpc-auth + + io.opencensus + opencensus-api + + + io.opencensus + opencensus-contrib-grpc-util + ${project.groupId} google-cloud-core diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java index 4df571b02cea..f932887f1080 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java @@ -20,17 +20,25 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.BidiStreamingCallable; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.firestore.spi.v1beta1.FirestoreRpc; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.firestore.v1beta1.BatchGetDocumentsRequest; import com.google.firestore.v1beta1.BatchGetDocumentsResponse; import com.google.firestore.v1beta1.DatabaseRootName; import com.google.protobuf.ByteString; +import io.grpc.Context; import io.grpc.Status; +import io.opencensus.common.Scope; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.Span; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -52,6 +60,10 @@ class FirestoreImpl implements Firestore { private static final String AUTO_ID_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + private static final Tracer tracer = Tracing.getTracer(); + private static final io.opencensus.trace.Status TOO_MANY_RETRIES_STATUS = + io.opencensus.trace.Status.ABORTED.withDescription("too many retries"); + private final FirestoreRpc firestoreClient; private final FirestoreOptions firestoreOptions; private final ResourcePath databasePath; @@ -126,11 +138,24 @@ ApiFuture> getAll( ApiStreamObserver responseObserver = new ApiStreamObserver() { + int numResponses; + @Override public void onNext(BatchGetDocumentsResponse response) { DocumentReference documentReference; DocumentSnapshot documentSnapshot; + numResponses++; + if (numResponses == 1) { + tracer + .getCurrentSpan() + .addAnnotation("Firestore.BatchGet: First response"); + } else if (numResponses % 100 == 0) { + tracer + .getCurrentSpan() + .addAnnotation("Firestore.BatchGet: Received 100 responses"); + } + switch (response.getResultCase()) { case FOUND: documentReference = @@ -161,11 +186,13 @@ public void onNext(BatchGetDocumentsResponse response) { @Override public void onError(Throwable throwable) { + tracer.getCurrentSpan().addAnnotation("Firestore.BatchGet: Error"); futureList.setException(throwable); } @Override public void onCompleted() { + tracer.getCurrentSpan().addAnnotation("Firestore.BatchGet: Complete"); List documentSnapshots = new ArrayList<>(); for (DocumentReference documentReference : documentReferences) { @@ -187,6 +214,13 @@ public void onCompleted() { request.addDocuments(docRef.getName()); } + tracer + .getCurrentSpan() + .addAnnotation( + "Firestore.BatchGet: Start", + ImmutableMap.of( + "numDocuments", AttributeValue.longAttributeValue(documentReferences.length))); + streamRequest(request.build(), responseObserver, firestoreClient.batchGetDocumentsCallable()); return futureList; @@ -213,11 +247,28 @@ private void runTransaction( final Transaction.Function transactionCallback, final SettableApiFuture resultFuture, final TransactionOptions options) { + // span is intentionally not ended here. It will be ended by runTransactionAttempt on success + // or error. + Span span = tracer.spanBuilder("CloudFirestore.Transaction").startSpan(); + try (Scope s = tracer.withSpan(span)) { + runTransactionAttempt(transactionCallback, resultFuture, options, span); + } + } + + private void runTransactionAttempt( + final Transaction.Function transactionCallback, + final SettableApiFuture resultFuture, + final TransactionOptions options, + final Span span) { final Transaction transaction = new Transaction(this, options.getPreviousTransactionId()); final Executor userCallbackExecutor = - options.getExecutor() != null ? options.getExecutor() : firestoreClient.getExecutor(); + Context.currentContextExecutor( + options.getExecutor() != null ? options.getExecutor() : firestoreClient.getExecutor()); final int attemptsRemaining = options.getNumberOfAttempts() - 1; + span.addAnnotation( + "Start runTransaction", + ImmutableMap.of("attemptsRemaining", AttributeValue.longAttributeValue(attemptsRemaining))); ApiFutures.addCallback( transaction.begin(), @@ -253,6 +304,8 @@ public void onFailure(Throwable throwable) { @Override public void onSuccess(List writeResults) { + span.setStatus(io.opencensus.trace.Status.OK); + span.end(); resultFuture.set(userResult); } }); @@ -279,12 +332,15 @@ public void run() { private void maybeRetry() { if (attemptsRemaining > 0) { - runTransaction( + span.addAnnotation("retrying"); + runTransactionAttempt( transactionCallback, resultFuture, new TransactionOptions( - attemptsRemaining, options.getExecutor(), transaction.getTransactionId())); + attemptsRemaining, options.getExecutor(), transaction.getTransactionId()), + span); } else { + span.setStatus(TOO_MANY_RETRIES_STATUS); rejectTransaction( FirestoreException.serverRejected( Status.ABORTED, "Transaction was cancelled because of too many retries.")); @@ -292,6 +348,10 @@ private void maybeRetry() { } private void rejectTransaction(final Throwable throwable) { + if (throwable instanceof ApiException) { + span.setStatus(TraceUtil.statusFromApiException((ApiException) throwable)); + } + span.end(); if (transaction.isPending()) { ApiFutures.addCallback( transaction.rollback(), diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java index 8ed74713676f..53cc87bf0064 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java @@ -27,6 +27,7 @@ import com.google.api.gax.rpc.ApiStreamObserver; import com.google.cloud.firestore.DocumentChange.Type; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.firestore.v1beta1.Cursor; import com.google.firestore.v1beta1.Document; import com.google.firestore.v1beta1.RunQueryRequest; @@ -39,6 +40,8 @@ import com.google.firestore.v1beta1.Value; import com.google.protobuf.ByteString; import com.google.protobuf.Int32Value; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.Tracing; import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; @@ -922,13 +925,31 @@ private void stream( request.setTransaction(transactionId); } + Tracing.getTracer().getCurrentSpan() + .addAnnotation( + "Firestore.Query: Start", + ImmutableMap.of( + "transactional", AttributeValue.booleanAttributeValue(transactionId != null))); + ApiStreamObserver observer = new ApiStreamObserver() { Instant readTime; + boolean firstResponse; + int numDocuments; @Override public void onNext(RunQueryResponse response) { + if (!firstResponse) { + firstResponse = true; + Tracing.getTracer().getCurrentSpan() + .addAnnotation("Firestore.Query: First response"); + } if (response.hasDocument()) { + numDocuments++; + if (numDocuments % 100 == 0) { + Tracing.getTracer().getCurrentSpan() + .addAnnotation("Firestore.Query: Received 100 documents"); + } Document document = response.getDocument(); QueryDocumentSnapshot documentSnapshot = QueryDocumentSnapshot.fromDocument(firestore, response.getReadTime(), document); @@ -944,11 +965,18 @@ public void onNext(RunQueryResponse response) { @Override public void onError(Throwable throwable) { + Tracing.getTracer().getCurrentSpan().addAnnotation("Firestore.Query: Error"); documentObserver.onError(throwable); } @Override public void onCompleted() { + Tracing.getTracer().getCurrentSpan() + .addAnnotation( + "Firestore.Query: Completed", + ImmutableMap.of( + "numDocuments", + AttributeValue.longAttributeValue(numDocuments))); documentObserver.onCompleted(readTime); } }; diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TraceUtil.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TraceUtil.java new file mode 100644 index 000000000000..1f7c55af4c57 --- /dev/null +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TraceUtil.java @@ -0,0 +1,35 @@ +/* + * Copyright 2018 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore; + +import com.google.api.gax.rpc.ApiException; +import io.opencensus.contrib.grpc.util.StatusConverter; +import io.opencensus.trace.Status; + +/** Census tracing utilities. */ +final class TraceUtil { + private TraceUtil() {} + + public static Status statusFromApiException(ApiException exception) { + if (exception.getStatusCode().getTransportCode() instanceof io.grpc.Status) { + io.grpc.Status grpcStatus = (io.grpc.Status) exception.getStatusCode().getTransportCode(); + return StatusConverter.fromGrpcStatus(grpcStatus); + } + + return Status.UNKNOWN.withDescription(exception.getMessage()); + } +} diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java index b067dc396f3a..4e5746a9e3f4 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java @@ -21,10 +21,13 @@ import com.google.api.core.ApiFutures; import com.google.cloud.firestore.UserDataConverter.EncodingOptions; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.firestore.v1beta1.CommitRequest; import com.google.firestore.v1beta1.CommitResponse; import com.google.firestore.v1beta1.Write; import com.google.protobuf.ByteString; +import io.opencensus.trace.AttributeValue; +import io.opencensus.trace.Tracing; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -575,6 +578,9 @@ private T performDelete( /** Commit the current batch. */ ApiFuture> commit(@Nullable ByteString transactionId) { + Tracing.getTracer().getCurrentSpan().addAnnotation( + "CloudFirestore.Commit", + ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(mutations.size()))); final CommitRequest.Builder request = CommitRequest.newBuilder(); request.setDatabase(firestore.getDatabaseName());