Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions google-cloud-firestore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-auth</artifactId>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-api</artifactId>
</dependency>
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-contrib-grpc-util</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>google-cloud-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,25 @@
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.firestore.spi.v1beta1.FirestoreRpc;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.firestore.v1beta1.BatchGetDocumentsRequest;
import com.google.firestore.v1beta1.BatchGetDocumentsResponse;
import com.google.firestore.v1beta1.DatabaseRootName;
import com.google.protobuf.ByteString;
import io.grpc.Context;
import io.grpc.Status;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -52,6 +60,10 @@ class FirestoreImpl implements Firestore {
private static final String AUTO_ID_ALPHABET =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

private static final Tracer tracer = Tracing.getTracer();
private static final io.opencensus.trace.Status TOO_MANY_RETRIES_STATUS =
io.opencensus.trace.Status.ABORTED.withDescription("too many retries");

private final FirestoreRpc firestoreClient;
private final FirestoreOptions firestoreOptions;
private final ResourcePath databasePath;
Expand Down Expand Up @@ -126,11 +138,24 @@ ApiFuture<List<DocumentSnapshot>> getAll(

ApiStreamObserver<BatchGetDocumentsResponse> responseObserver =
new ApiStreamObserver<BatchGetDocumentsResponse>() {
int numResponses;

This comment was marked as spam.

This comment was marked as spam.


@Override
public void onNext(BatchGetDocumentsResponse response) {
DocumentReference documentReference;
DocumentSnapshot documentSnapshot;

numResponses++;
if (numResponses == 1) {
tracer
.getCurrentSpan()
.addAnnotation("Firestore.BatchGet: First response");
} else if (numResponses % 100 == 0) {
tracer
.getCurrentSpan()
.addAnnotation("Firestore.BatchGet: Received 100 responses");
}

switch (response.getResultCase()) {
case FOUND:
documentReference =
Expand Down Expand Up @@ -161,11 +186,13 @@ public void onNext(BatchGetDocumentsResponse response) {

@Override
public void onError(Throwable throwable) {
tracer.getCurrentSpan().addAnnotation("Firestore.BatchGet: Error");
futureList.setException(throwable);
}

@Override
public void onCompleted() {
tracer.getCurrentSpan().addAnnotation("Firestore.BatchGet: Complete");
List<DocumentSnapshot> documentSnapshots = new ArrayList<>();

for (DocumentReference documentReference : documentReferences) {
Expand All @@ -187,6 +214,13 @@ public void onCompleted() {
request.addDocuments(docRef.getName());
}

tracer
.getCurrentSpan()
.addAnnotation(
"Firestore.BatchGet: Start",
ImmutableMap.of(
"numDocuments", AttributeValue.longAttributeValue(documentReferences.length)));

streamRequest(request.build(), responseObserver, firestoreClient.batchGetDocumentsCallable());

return futureList;
Expand All @@ -213,11 +247,28 @@ private <T> void runTransaction(
final Transaction.Function<T> transactionCallback,
final SettableApiFuture<T> resultFuture,
final TransactionOptions options) {
// span is intentionally not ended here. It will be ended by runTransactionAttempt on success
// or error.
Span span = tracer.spanBuilder("CloudFirestore.Transaction").startSpan();
try (Scope s = tracer.withSpan(span)) {
runTransactionAttempt(transactionCallback, resultFuture, options, span);
}
}

private <T> void runTransactionAttempt(
final Transaction.Function<T> transactionCallback,
final SettableApiFuture<T> resultFuture,
final TransactionOptions options,
final Span span) {
final Transaction transaction = new Transaction(this, options.getPreviousTransactionId());
final Executor userCallbackExecutor =
options.getExecutor() != null ? options.getExecutor() : firestoreClient.getExecutor();
Context.currentContextExecutor(
options.getExecutor() != null ? options.getExecutor() : firestoreClient.getExecutor());

final int attemptsRemaining = options.getNumberOfAttempts() - 1;
span.addAnnotation(
"Start runTransaction",
ImmutableMap.of("attemptsRemaining", AttributeValue.longAttributeValue(attemptsRemaining)));

ApiFutures.addCallback(
transaction.begin(),
Expand Down Expand Up @@ -253,6 +304,8 @@ public void onFailure(Throwable throwable) {

@Override
public void onSuccess(List<WriteResult> writeResults) {
span.setStatus(io.opencensus.trace.Status.OK);
span.end();
resultFuture.set(userResult);
}
});
Expand All @@ -279,19 +332,26 @@ public void run() {

private void maybeRetry() {
if (attemptsRemaining > 0) {
runTransaction(
span.addAnnotation("retrying");
runTransactionAttempt(
transactionCallback,
resultFuture,
new TransactionOptions(
attemptsRemaining, options.getExecutor(), transaction.getTransactionId()));
attemptsRemaining, options.getExecutor(), transaction.getTransactionId()),
span);
} else {
span.setStatus(TOO_MANY_RETRIES_STATUS);
rejectTransaction(
FirestoreException.serverRejected(
Status.ABORTED, "Transaction was cancelled because of too many retries."));
}
}

private void rejectTransaction(final Throwable throwable) {
if (throwable instanceof ApiException) {
span.setStatus(TraceUtil.statusFromApiException((ApiException) throwable));
}
span.end();
if (transaction.isPending()) {
ApiFutures.addCallback(
transaction.rollback(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.cloud.firestore.DocumentChange.Type;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.firestore.v1beta1.Cursor;
import com.google.firestore.v1beta1.Document;
import com.google.firestore.v1beta1.RunQueryRequest;
Expand All @@ -39,6 +40,8 @@
import com.google.firestore.v1beta1.Value;
import com.google.protobuf.ByteString;
import com.google.protobuf.Int32Value;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
Expand Down Expand Up @@ -922,13 +925,31 @@ private void stream(
request.setTransaction(transactionId);
}

Tracing.getTracer().getCurrentSpan()
.addAnnotation(
"Firestore.Query: Start",
ImmutableMap.of(
"transactional", AttributeValue.booleanAttributeValue(transactionId != null)));

ApiStreamObserver<RunQueryResponse> observer =
new ApiStreamObserver<RunQueryResponse>() {
Instant readTime;
boolean firstResponse;
int numDocuments;

@Override
public void onNext(RunQueryResponse response) {
if (!firstResponse) {
firstResponse = true;
Tracing.getTracer().getCurrentSpan()
.addAnnotation("Firestore.Query: First response");
}
if (response.hasDocument()) {
numDocuments++;
if (numDocuments % 100 == 0) {
Tracing.getTracer().getCurrentSpan()
.addAnnotation("Firestore.Query: Received 100 documents");
}
Document document = response.getDocument();
QueryDocumentSnapshot documentSnapshot =
QueryDocumentSnapshot.fromDocument(firestore, response.getReadTime(), document);
Expand All @@ -944,11 +965,18 @@ public void onNext(RunQueryResponse response) {

@Override
public void onError(Throwable throwable) {
Tracing.getTracer().getCurrentSpan().addAnnotation("Firestore.Query: Error");
documentObserver.onError(throwable);
}

@Override
public void onCompleted() {
Tracing.getTracer().getCurrentSpan()
.addAnnotation(
"Firestore.Query: Completed",
ImmutableMap.of(
"numDocuments",
AttributeValue.longAttributeValue(numDocuments)));
documentObserver.onCompleted(readTime);
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.firestore;

import com.google.api.gax.rpc.ApiException;
import io.opencensus.contrib.grpc.util.StatusConverter;
import io.opencensus.trace.Status;

/** Census tracing utilities. */
final class TraceUtil {
private TraceUtil() {}

public static Status statusFromApiException(ApiException exception) {
if (exception.getStatusCode().getTransportCode() instanceof io.grpc.Status) {
io.grpc.Status grpcStatus = (io.grpc.Status) exception.getStatusCode().getTransportCode();
return StatusConverter.fromGrpcStatus(grpcStatus);
}

return Status.UNKNOWN.withDescription(exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.firestore.UserDataConverter.EncodingOptions;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.firestore.v1beta1.CommitRequest;
import com.google.firestore.v1beta1.CommitResponse;
import com.google.firestore.v1beta1.Write;
import com.google.protobuf.ByteString;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -575,6 +578,9 @@ private T performDelete(

/** Commit the current batch. */
ApiFuture<List<WriteResult>> commit(@Nullable ByteString transactionId) {
Tracing.getTracer().getCurrentSpan().addAnnotation(
"CloudFirestore.Commit",
ImmutableMap.of("numDocuments", AttributeValue.longAttributeValue(mutations.size())));

final CommitRequest.Builder request = CommitRequest.newBuilder();
request.setDatabase(firestore.getDatabaseName());
Expand Down