From f093cafd9fdf96608a5fde36c4f3083c9399e880 Mon Sep 17 00:00:00 2001 From: Dino Oliva Date: Tue, 18 Dec 2018 12:06:27 -0800 Subject: [PATCH 01/14] Adds OpenCensus context propagation to Publisher and Subscriber. --- .../google-cloud-pubsub/pom.xml | 6 + .../cloud/pubsub/v1/OpenCensusUtil.java | 173 ++++++++++++++++++ .../com/google/cloud/pubsub/v1/Publisher.java | 4 +- .../google/cloud/pubsub/v1/Subscriber.java | 2 +- 4 files changed, 182 insertions(+), 3 deletions(-) create mode 100644 google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java diff --git a/google-cloud-clients/google-cloud-pubsub/pom.xml b/google-cloud-clients/google-cloud-pubsub/pom.xml index 429e53b8df10..afba0650a5f5 100644 --- a/google-cloud-clients/google-cloud-pubsub/pom.xml +++ b/google-cloud-clients/google-cloud-pubsub/pom.xml @@ -16,6 +16,7 @@ google-cloud-pubsub + 0.15.1 @@ -46,6 +47,11 @@ io.grpc grpc-auth + + io.opencensus + opencensus-api + ${opencensus.version} + com.google.auto.value auto-value diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java new file mode 100644 index 000000000000..b23672977ccb --- /dev/null +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java @@ -0,0 +1,173 @@ +/* Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.cloud.ServiceOptions; +import com.google.errorprone.annotations.MustBeClosed; +import com.google.pubsub.v1.PubsubMessage; + +import io.opencensus.common.Scope; +import io.opencensus.tags.propagation.TagContextBinarySerializer; +import io.opencensus.tags.propagation.TagContextDeserializationException; +import io.opencensus.tags.propagation.TagContextSerializationException; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; +import io.opencensus.trace.Link; +import io.opencensus.trace.Span; +import io.opencensus.trace.SpanId; +import io.opencensus.trace.SpanContext; +import io.opencensus.trace.TraceId; +import io.opencensus.trace.TraceOptions; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; +import io.opencensus.trace.samplers.Samplers; + +import java.util.Base64; +import java.util.logging.Level; +import java.util.logging.Logger; + +final class OpenCensusUtil { + private static final Logger logger = Logger.getLogger(OpenCensusUtil.class.getName()); + + public static final String OPEN_CENSUS_TAG_CONTEXT = "OpenCensusTagContext"; + public static final String OPEN_CENSUS_TRACE_CONTEXT = "OpenCensusTraceContext"; + + private static final Tagger tagger = Tags.getTagger(); + private static final TagContextBinarySerializer serializer = + Tags.getTagPropagationComponent().getBinarySerializer(); + + private static final TraceOptions SAMPLED = TraceOptions.builder().setIsSampled(true).build(); + private static final Tracer tracer = Tracing.getTracer(); + + // Used in Publisher. + static PubsubMessage putOpenCensusAttributes(PubsubMessage message) { + return PubsubMessage.newBuilder(message) + .putAttributes( + OpenCensusUtil.OPEN_CENSUS_TRACE_CONTEXT, + OpenCensusUtil.encodeSpanContext(tracer.getCurrentSpan().getContext())) + .putAttributes( + OpenCensusUtil.OPEN_CENSUS_TAG_CONTEXT, + OpenCensusUtil.encodeTagContext(tagger.getCurrentTagContext())) + .build(); + } + + // Used in Subscriber. + static MessageReceiver createOpenCensusMessageReceiver(MessageReceiver receiver) { + return new OpenCensusMessageReceiver(receiver); + } + + private static String encodeSpanContext(SpanContext ctxt) { + TraceId traceId = ctxt.getTraceId(); + SpanId spanId = ctxt.getSpanId(); + TraceOptions traceOpts = ctxt.getTraceOptions(); + return "traceid=" + traceId.toLowerBase16() + + "&spanid=" + spanId.toLowerBase16() + + "&traceopt=" + (traceOpts.isSampled() ? "t&" : "f&"); + } + + private static String encodeTagContext(TagContext tags) { + try { + byte[] encodedTags = serializer.toByteArray(tags); + return Base64.getEncoder().encodeToString(encodedTags); + } + catch (TagContextSerializationException exn) { + logger.log(Level.INFO, "OpenCensus: Tag Context Serialization Exception: " + exn); + return ""; + } + } + + private static Scope createScopedTagContext(String encodedTags) { + try { + TagContext tags = serializer.fromByteArray(Base64.getDecoder().decode(encodedTags)); + return tagger.withTagContext(tags); + } catch (TagContextDeserializationException exn) { + logger.log(Level.INFO, "OpenCensus: Tag Context Deserialization Exception: " + exn); + return tagger.withTagContext(tagger.getCurrentTagContext()); + } + } + + @MustBeClosed + private static Scope createScopedSpan(String name) { + return tracer + .spanBuilderWithExplicitParent(name, tracer.getCurrentSpan()) + .setRecordEvents(true) + // TODO(dpo): set to default. + .setSampler(Samplers.alwaysSample()) + .startScopedSpan(); + } + + private static void addParentLink(String encodedParentSpanContext) { + tracer.getCurrentSpan().addLink(Link.fromSpanContext( + createSpanContext(encodedParentSpanContext), Link.Type.PARENT_LINKED_SPAN)); + } + + private static SpanContext createSpanContext(String encodedSpanContext) { + String traceId = getTraceId(encodedSpanContext); + String spanId = getSpanId(encodedSpanContext); + String traceOpt = getTraceOpt(encodedSpanContext); + return SpanContext.create( + TraceId.fromLowerBase16(traceId), + SpanId.fromLowerBase16(spanId), + traceOpt.equals("t") ? SAMPLED : TraceOptions.DEFAULT); + } + + private static String getTraceId(String encodedSpan) { + return lookupKey("traceid=", encodedSpan); + } + + private static String getSpanId(String encodedSpan) { + return lookupKey("spanid=", encodedSpan); + } + + private static String getTraceOpt(String encodedSpan) { + return lookupKey("traceopt=", encodedSpan); + } + + // encodedSpan = (key=value&)* + private static String lookupKey(String key, String encodedSpan) { + int start = encodedSpan.indexOf(key, 0); + if (start == -1) { + return ""; + } + start += key.length(); + int end = encodedSpan.indexOf("&", start); + if (end == -1) { + return ""; + } + return encodedSpan.substring(start, end); + } + + private static final class OpenCensusMessageReceiver implements MessageReceiver { + private final MessageReceiver receiver; + + private OpenCensusMessageReceiver(MessageReceiver receiver) { + this.receiver = receiver; + } + + @Override + public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { + String encodedTagContext = message.getAttributesOrDefault(OPEN_CENSUS_TAG_CONTEXT, ""); + String encodedSpanContext = message.getAttributesOrDefault(OPEN_CENSUS_TRACE_CONTEXT, ""); + try ( + Scope spanScope = createScopedSpan("receiver"); + Scope statsScope = createScopedTagContext(encodedTagContext)) { + addParentLink(encodedSpanContext); + receiver.receiveMessage(message, consumer); + } + } + } +} diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 557a483073de..d5182a4b5536 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -187,11 +187,11 @@ public String getTopicNameString() { * @param message the message to publish. * @return the message ID wrapped in a future. */ - public ApiFuture publish(PubsubMessage message) { + public ApiFuture publish(PubsubMessage originalMessage) { if (shutdown.get()) { throw new IllegalStateException("Cannot publish on a shut-down publisher."); } - + PubsubMessage message = OpenCensusUtil.putOpenCensusAttributes(originalMessage); final int messageSize = message.getSerializedSize(); OutstandingBatch batchToSend = null; SettableApiFuture publishResult = SettableApiFuture.create(); diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 41d60c3a2bc9..21ca7d9062d4 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -123,7 +123,7 @@ public class Subscriber extends AbstractApiService { private ScheduledFuture ackDeadlineUpdater; private Subscriber(Builder builder) { - receiver = builder.receiver; + receiver = OpenCensusUtil.createOpenCensusMessageReceiver(builder.receiver); flowControlSettings = builder.flowControlSettings; subscriptionName = builder.subscriptionName; From 69af4b8bf79364ff0a81ec762bd13a680414f7a5 Mon Sep 17 00:00:00 2001 From: Dino Oliva Date: Thu, 20 Dec 2018 09:19:32 -0800 Subject: [PATCH 02/14] Updates OpenCensus attribute keys so that they will be propagated by CPS. --- .../google/cloud/pubsub/v1/OpenCensusUtil.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java index b23672977ccb..5e94655ce814 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java @@ -43,8 +43,8 @@ final class OpenCensusUtil { private static final Logger logger = Logger.getLogger(OpenCensusUtil.class.getName()); - public static final String OPEN_CENSUS_TAG_CONTEXT = "OpenCensusTagContext"; - public static final String OPEN_CENSUS_TRACE_CONTEXT = "OpenCensusTraceContext"; + public static final String TAG_CONTEXT_KEY = "googclient_OpenCensusTagContextKey"; + public static final String TRACE_CONTEXT_KEY = "googclient_OpenCensusTraceContextKey"; private static final Tagger tagger = Tags.getTagger(); private static final TagContextBinarySerializer serializer = @@ -54,14 +54,15 @@ final class OpenCensusUtil { private static final Tracer tracer = Tracing.getTracer(); // Used in Publisher. + // TODO(dpo): add configuration support to control adding these attributes. static PubsubMessage putOpenCensusAttributes(PubsubMessage message) { return PubsubMessage.newBuilder(message) .putAttributes( - OpenCensusUtil.OPEN_CENSUS_TRACE_CONTEXT, - OpenCensusUtil.encodeSpanContext(tracer.getCurrentSpan().getContext())) + TRACE_CONTEXT_KEY, + encodeSpanContext(tracer.getCurrentSpan().getContext())) .putAttributes( - OpenCensusUtil.OPEN_CENSUS_TAG_CONTEXT, - OpenCensusUtil.encodeTagContext(tagger.getCurrentTagContext())) + TAG_CONTEXT_KEY, + encodeTagContext(tagger.getCurrentTagContext())) .build(); } @@ -160,8 +161,8 @@ private OpenCensusMessageReceiver(MessageReceiver receiver) { @Override public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { - String encodedTagContext = message.getAttributesOrDefault(OPEN_CENSUS_TAG_CONTEXT, ""); - String encodedSpanContext = message.getAttributesOrDefault(OPEN_CENSUS_TRACE_CONTEXT, ""); + String encodedTagContext = message.getAttributesOrDefault(TAG_CONTEXT_KEY, ""); + String encodedSpanContext = message.getAttributesOrDefault(TRACE_CONTEXT_KEY, ""); try ( Scope spanScope = createScopedSpan("receiver"); Scope statsScope = createScopedTagContext(encodedTagContext)) { From 8b37de834e3f08c1e57b84a8e1cf3162f1ab80dd Mon Sep 17 00:00:00 2001 From: Dino Oliva Date: Tue, 5 Feb 2019 14:23:05 -0800 Subject: [PATCH 03/14] Addresses reviewer comments by fixing build files and using only defined annotations. --- .../google-cloud-pubsub/pom.xml | 1 - .../cloud/pubsub/v1/OpenCensusUtil.java | 47 +++++++++++++------ 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/pom.xml b/google-cloud-clients/google-cloud-pubsub/pom.xml index afba0650a5f5..4d39b879d23d 100644 --- a/google-cloud-clients/google-cloud-pubsub/pom.xml +++ b/google-cloud-clients/google-cloud-pubsub/pom.xml @@ -16,7 +16,6 @@ google-cloud-pubsub - 0.15.1 diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java index 5e94655ce814..6a4b4270fe6c 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java @@ -54,16 +54,21 @@ final class OpenCensusUtil { private static final Tracer tracer = Tracing.getTracer(); // Used in Publisher. - // TODO(dpo): add configuration support to control adding these attributes. + // TODO: consider adding configuration support to control adding these attributes. static PubsubMessage putOpenCensusAttributes(PubsubMessage message) { - return PubsubMessage.newBuilder(message) - .putAttributes( - TRACE_CONTEXT_KEY, - encodeSpanContext(tracer.getCurrentSpan().getContext())) - .putAttributes( - TAG_CONTEXT_KEY, - encodeTagContext(tagger.getCurrentTagContext())) - .build(); + PubsubMessage.Builder builder = PubsubMessage.newBuilder(message); + String encodedSpanContext = encodeSpanContext(tracer.getCurrentSpan().getContext()); + String encodedTagContext = encodeTagContext(tagger.getCurrentTagContext()); + if (encodedSpanContext.isEmpty() && encodedTagContext.isEmpty()) { + return message; + } + if (!encodedSpanContext.isEmpty()) { + builder.putAttributes(TRACE_CONTEXT_KEY, encodedSpanContext); + } + if (!encodedTagContext.isEmpty()) { + builder.putAttributes(TAG_CONTEXT_KEY, encodedTagContext); + } + return builder.build(); } // Used in Subscriber. @@ -75,6 +80,9 @@ private static String encodeSpanContext(SpanContext ctxt) { TraceId traceId = ctxt.getTraceId(); SpanId spanId = ctxt.getSpanId(); TraceOptions traceOpts = ctxt.getTraceOptions(); + if (traceOpts.isSampled()) { + return ""; + } return "traceid=" + traceId.toLowerBase16() + "&spanid=" + spanId.toLowerBase16() + "&traceopt=" + (traceOpts.isSampled() ? "t&" : "f&"); @@ -106,8 +114,6 @@ private static Scope createScopedSpan(String name) { return tracer .spanBuilderWithExplicitParent(name, tracer.getCurrentSpan()) .setRecordEvents(true) - // TODO(dpo): set to default. - .setSampler(Samplers.alwaysSample()) .startScopedSpan(); } @@ -152,6 +158,7 @@ private static String lookupKey(String key, String encodedSpan) { return encodedSpan.substring(start, end); } + // class private static final class OpenCensusMessageReceiver implements MessageReceiver { private final MessageReceiver receiver; @@ -162,10 +169,22 @@ private OpenCensusMessageReceiver(MessageReceiver receiver) { @Override public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { String encodedTagContext = message.getAttributesOrDefault(TAG_CONTEXT_KEY, ""); + if (encodedTagContext.isEmpty()) { + addTraceScope(message, consumer); + return; + } + try (Scope statsScope = createScopedTagContext(encodedTagContext)) { + addTraceScope(message, consumer); + } + } + + private void addTraceScope(PubsubMessage message, AckReplyConsumer consumer) { String encodedSpanContext = message.getAttributesOrDefault(TRACE_CONTEXT_KEY, ""); - try ( - Scope spanScope = createScopedSpan("receiver"); - Scope statsScope = createScopedTagContext(encodedTagContext)) { + if (encodedSpanContext.isEmpty()) { + receiver.receiveMessage(message, consumer); + return; + } + try (Scope spanScope = createScopedSpan("receiver")) { addParentLink(encodedSpanContext); receiver.receiveMessage(message, consumer); } From a95158598a208cc09f0d9ee05af2752d27420cb7 Mon Sep 17 00:00:00 2001 From: Dino Oliva Date: Mon, 11 Feb 2019 18:13:39 -0800 Subject: [PATCH 04/14] Updates build dependencies and copyright date. --- google-cloud-clients/google-cloud-pubsub/pom.xml | 5 ----- .../java/com/google/cloud/pubsub/v1/OpenCensusUtil.java | 6 +++++- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/pom.xml b/google-cloud-clients/google-cloud-pubsub/pom.xml index 4d39b879d23d..429e53b8df10 100644 --- a/google-cloud-clients/google-cloud-pubsub/pom.xml +++ b/google-cloud-clients/google-cloud-pubsub/pom.xml @@ -46,11 +46,6 @@ io.grpc grpc-auth - - io.opencensus - opencensus-api - ${opencensus.version} - com.google.auto.value auto-value diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java index 6a4b4270fe6c..a119477b57b6 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java @@ -1,4 +1,4 @@ -/* Copyright 2018 Google Inc. +/* Copyright 2019 Google Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,6 +40,10 @@ import java.util.logging.Level; import java.util.logging.Logger; +/** + * Utilities for propagating OpenCensus {@link Tags} and {@link Spans} from publishers to + * subscribers. + */ final class OpenCensusUtil { private static final Logger logger = Logger.getLogger(OpenCensusUtil.class.getName()); From 1e8fbce0626bc0d82ede374603a6d9a439832ff9 Mon Sep 17 00:00:00 2001 From: Dino Oliva Date: Tue, 12 Feb 2019 09:37:00 -0800 Subject: [PATCH 05/14] Fixes typo. --- .../main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java index a119477b57b6..dd9135505acb 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java @@ -41,8 +41,8 @@ import java.util.logging.Logger; /** - * Utilities for propagating OpenCensus {@link Tags} and {@link Spans} from publishers to - * subscribers. + * Utilities for propagating OpenCensus {@link TagContext} and {@link SpanContext} from publishers + * to subscribers. */ final class OpenCensusUtil { private static final Logger logger = Logger.getLogger(OpenCensusUtil.class.getName()); From 290fb81a4df39821bb1a0fed9d58867e30a7dbfe Mon Sep 17 00:00:00 2001 From: Dino Oliva Date: Thu, 14 Feb 2019 08:49:24 -0800 Subject: [PATCH 06/14] Removes encoding of OpenCensus tags. Will re-enable once text encoding spec has been finalized (https://github.com/census-instrumentation/opencensus-specs/issues/65). --- .../cloud/pubsub/v1/OpenCensusUtil.java | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java index dd9135505acb..1dab7bf8c75d 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java @@ -92,25 +92,16 @@ private static String encodeSpanContext(SpanContext ctxt) { + "&traceopt=" + (traceOpts.isSampled() ? "t&" : "f&"); } + // TODO: update this code once the text encoding of tags has been resolved + // (https://github.com/census-instrumentation/opencensus-specs/issues/65). private static String encodeTagContext(TagContext tags) { - try { - byte[] encodedTags = serializer.toByteArray(tags); - return Base64.getEncoder().encodeToString(encodedTags); - } - catch (TagContextSerializationException exn) { - logger.log(Level.INFO, "OpenCensus: Tag Context Serialization Exception: " + exn); - return ""; - } + return ""; } + // TODO: update this code once the text encoding of tags has been resolved + // (https://github.com/census-instrumentation/opencensus-specs/issues/65). private static Scope createScopedTagContext(String encodedTags) { - try { - TagContext tags = serializer.fromByteArray(Base64.getDecoder().decode(encodedTags)); - return tagger.withTagContext(tags); - } catch (TagContextDeserializationException exn) { - logger.log(Level.INFO, "OpenCensus: Tag Context Deserialization Exception: " + exn); - return tagger.withTagContext(tagger.getCurrentTagContext()); - } + return tagger.withTagContext(tagger.getCurrentTagContext()); } @MustBeClosed From 065c723873bc1fabd0926fb1193ff8ad157e7e63 Mon Sep 17 00:00:00 2001 From: Dino Oliva Date: Fri, 15 Feb 2019 15:47:18 -0800 Subject: [PATCH 07/14] Updates encoding of SpanContext to use W3C specified encoding; Also preserves sampling decision from the publisher in the subscriber. --- .../cloud/pubsub/v1/OpenCensusUtil.java | 81 ++++++++----------- 1 file changed, 35 insertions(+), 46 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java index 1dab7bf8c75d..d4927388b606 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java @@ -26,6 +26,10 @@ import io.opencensus.tags.TagContext; import io.opencensus.tags.Tagger; import io.opencensus.tags.Tags; +import io.opencensus.trace.propagation.SpanContextParseException; +import io.opencensus.trace.propagation.TextFormat; +import io.opencensus.trace.propagation.TextFormat.Getter; +import io.opencensus.trace.propagation.TextFormat.Setter; import io.opencensus.trace.Link; import io.opencensus.trace.Span; import io.opencensus.trace.SpanId; @@ -49,6 +53,7 @@ final class OpenCensusUtil { public static final String TAG_CONTEXT_KEY = "googclient_OpenCensusTagContextKey"; public static final String TRACE_CONTEXT_KEY = "googclient_OpenCensusTraceContextKey"; + private static final String TRACEPARENT_KEY = "traceparent"; private static final Tagger tagger = Tags.getTagger(); private static final TagContextBinarySerializer serializer = @@ -56,6 +61,8 @@ final class OpenCensusUtil { private static final TraceOptions SAMPLED = TraceOptions.builder().setIsSampled(true).build(); private static final Tracer tracer = Tracing.getTracer(); + private static final TextFormat traceContextTextFormat = + Tracing.getPropagationComponent().getTraceContextFormat(); // Used in Publisher. // TODO: consider adding configuration support to control adding these attributes. @@ -80,16 +87,26 @@ static MessageReceiver createOpenCensusMessageReceiver(MessageReceiver receiver) return new OpenCensusMessageReceiver(receiver); } + private static final Setter setter = new Setter() { + @Override + public void put(StringBuilder carrier, String key, String value) { + if (key.equals(TRACEPARENT_KEY)) { + carrier.append(value); + } + } + }; + + private static final Getter getter = new Getter() { + @Override + public String get(String carrier, String key) { + return key.equals(TRACEPARENT_KEY) ? carrier : null; + } + }; + private static String encodeSpanContext(SpanContext ctxt) { - TraceId traceId = ctxt.getTraceId(); - SpanId spanId = ctxt.getSpanId(); - TraceOptions traceOpts = ctxt.getTraceOptions(); - if (traceOpts.isSampled()) { - return ""; - } - return "traceid=" + traceId.toLowerBase16() - + "&spanid=" + spanId.toLowerBase16() - + "&traceopt=" + (traceOpts.isSampled() ? "t&" : "f&"); + StringBuilder builder = new StringBuilder(); + traceContextTextFormat.inject(ctxt, builder, setter); + return builder.toString(); } // TODO: update this code once the text encoding of tags has been resolved @@ -109,48 +126,20 @@ private static Scope createScopedSpan(String name) { return tracer .spanBuilderWithExplicitParent(name, tracer.getCurrentSpan()) .setRecordEvents(true) + // Note: we preserve the sampling decision from the publisher. + .setSampler(Samplers.alwaysSample()) .startScopedSpan(); } private static void addParentLink(String encodedParentSpanContext) { - tracer.getCurrentSpan().addLink(Link.fromSpanContext( - createSpanContext(encodedParentSpanContext), Link.Type.PARENT_LINKED_SPAN)); - } - - private static SpanContext createSpanContext(String encodedSpanContext) { - String traceId = getTraceId(encodedSpanContext); - String spanId = getSpanId(encodedSpanContext); - String traceOpt = getTraceOpt(encodedSpanContext); - return SpanContext.create( - TraceId.fromLowerBase16(traceId), - SpanId.fromLowerBase16(spanId), - traceOpt.equals("t") ? SAMPLED : TraceOptions.DEFAULT); - } - - private static String getTraceId(String encodedSpan) { - return lookupKey("traceid=", encodedSpan); - } - - private static String getSpanId(String encodedSpan) { - return lookupKey("spanid=", encodedSpan); - } - - private static String getTraceOpt(String encodedSpan) { - return lookupKey("traceopt=", encodedSpan); - } - - // encodedSpan = (key=value&)* - private static String lookupKey(String key, String encodedSpan) { - int start = encodedSpan.indexOf(key, 0); - if (start == -1) { - return ""; - } - start += key.length(); - int end = encodedSpan.indexOf("&", start); - if (end == -1) { - return ""; + try { + SpanContext ctxt = traceContextTextFormat.extract(encodedParentSpanContext, getter); + tracer.getCurrentSpan().addLink(Link.fromSpanContext( + ctxt, + Link.Type.PARENT_LINKED_SPAN)); + } catch (SpanContextParseException exn) { + logger.log(Level.INFO, "OpenCensus: Trace Context Deserialization Exception: " + exn); } - return encodedSpan.substring(start, end); } // class From 9169f317dcd6ccb459fed9f96d864c18518f09c3 Mon Sep 17 00:00:00 2001 From: Dino Oliva Date: Mon, 25 Feb 2019 11:31:37 -0800 Subject: [PATCH 08/14] Adds unit test for OpenCensusUtil. --- .../cloud/pubsub/v1/OpenCensusUtilTest.java | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java new file mode 100644 index 000000000000..27719ec36497 --- /dev/null +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java @@ -0,0 +1,149 @@ +/* + * Copyright 2019 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import static com.google.cloud.pubsub.v1.OpenCensusUtil.MESSAGE_RECEIVER_SPAN_NAME; +import static com.google.cloud.pubsub.v1.OpenCensusUtil.TAG_CONTEXT_KEY; +import static com.google.cloud.pubsub.v1.OpenCensusUtil.TRACE_CONTEXT_KEY; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; + +import io.opencensus.common.Scope; +import io.opencensus.tags.TagContext; +import io.opencensus.tags.TagKey; +import io.opencensus.tags.TagValue; +import io.opencensus.tags.Tagger; +import io.opencensus.tags.Tags; +import io.opencensus.trace.Link; +import io.opencensus.trace.SpanContext; +import io.opencensus.trace.Tracer; +import io.opencensus.trace.Tracing; +import io.opencensus.trace.export.RunningSpanStore; +import io.opencensus.trace.export.RunningSpanStore.Filter; +import io.opencensus.trace.export.SpanData; + +import java.util.Collection; +import java.util.List; + +import org.junit.Test; +import org.junit.rules.TestName; + +/** Tests for {@link OpenCensusUtil}. */ +public class OpenCensusUtilTest { + private static final Tagger tagger = Tags.getTagger(); + private static final Tracer tracer = Tracing.getTracer(); + private static final TagKey TEST_TAG_KEY = TagKey.create("TEST_TAG_KEY"); + private static final TagValue TEST_TAG_VAL = TagValue.create("TEST_TAG_VAL"); + private static final String TEST_PARENT_LINK_NAME = "TEST_PARENT_LINK"; + + // Verifies that trace contexts propagated as an attribute are set as the parent link in the + // message receiver and that the tag context is not change (for now). + @Test + public void testOpenCensusMessageReceiver() throws Exception { + PubsubMessage message; + SpanContext publisherContext; + try ( + Scope traceScope = OpenCensusUtil.createScopedSpan(TEST_PARENT_LINK_NAME); + Scope tagScope = createScopeTags()) { + message = OpenCensusUtil.putOpenCensusAttributes(generatePubsubMessage(500)); + publisherContext = tracer.getCurrentSpan().getContext(); + } + MessageReceiver receiver = + OpenCensusUtil.createOpenCensusMessageReceiver( + new TestMessageReceiver(publisherContext, tagger.getCurrentTagContext())); + receiver.receiveMessage(message, new NoOpAckReplyConsumer()); + } + + // Verifies that the current span context is added as an attribute and that (for now) the tag + // context is not added as an attribute. + @Test + public void testPutOpenCensusAttributes() { + try ( + Scope traceScope = OpenCensusUtil.createScopedSpan("PublisherTestRoot"); + Scope tagScope = createScopeTags()) { + PubsubMessage originalMessage = generatePubsubMessage(500); + assertEquals("", originalMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, "")); + assertEquals("", originalMessage.getAttributesOrDefault(TAG_CONTEXT_KEY, "")); + + PubsubMessage attributedMessage = OpenCensusUtil.putOpenCensusAttributes(originalMessage); + String encodedSpanContext = + OpenCensusUtil.encodeSpanContext(tracer.getCurrentSpan().getContext()); + assertNotEquals("", encodedSpanContext); + assertEquals( + encodedSpanContext, + attributedMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, "")); + assertEquals("", attributedMessage.getAttributesOrDefault(TAG_CONTEXT_KEY, "")); + } + } + + private static PubsubMessage generatePubsubMessage(int size) { + byte[] bytes = new byte[size]; + for (int i = 0; i < size; i++) { + bytes[i] = (byte) (120 + i % 20); + } + return PubsubMessage.newBuilder().setData(ByteString.copyFrom(bytes)).build(); + } + + private static Scope createScopeTags() { + return tagger.currentBuilder().put(TEST_TAG_KEY, TEST_TAG_VAL).buildScoped(); + } + + private static final class NoOpAckReplyConsumer implements AckReplyConsumer { + @Override + public void ack() { + } + + @Override + public void nack() { + } + } + + private static final class TestMessageReceiver implements MessageReceiver { + private static final RunningSpanStore runningSpanStore = + Tracing.getExportComponent().getRunningSpanStore(); + private static final Filter RECEIVER_FILTER = Filter.create(MESSAGE_RECEIVER_SPAN_NAME, 0); + + SpanContext parentLinkedSpan; + TagContext originalTagContext; + + private TestMessageReceiver(SpanContext parentLinkedSpan, TagContext originalTagContext) { + this.parentLinkedSpan = parentLinkedSpan; + this.originalTagContext = originalTagContext; + } + + @Override + public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { + assertEquals(originalTagContext, tagger.getCurrentTagContext()); + Collection spanDatas = runningSpanStore.getRunningSpans(RECEIVER_FILTER); + assertEquals(spanDatas.size(), 1); + for (SpanData spanData : spanDatas) { + List links = spanData.getLinks().getLinks(); + assertEquals(links.size(), 1); + Link link = links.get(0); + assertEquals(Link.Type.PARENT_LINKED_SPAN, link.getType()); + assertEquals(parentLinkedSpan.getTraceId(), link.getTraceId()); + assertEquals(parentLinkedSpan.getSpanId(), link.getSpanId()); + } + consumer.ack(); + } + } +} From 21b2abf93ff4881b0add36aaff9af69e17d985d4 Mon Sep 17 00:00:00 2001 From: Dino Oliva Date: Mon, 25 Feb 2019 11:31:54 -0800 Subject: [PATCH 09/14] Adds unit test for OpenCensusUtil. --- google-cloud-clients/google-cloud-pubsub/pom.xml | 6 ++++++ .../com/google/cloud/pubsub/v1/OpenCensusUtil.java | 13 +++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/pom.xml b/google-cloud-clients/google-cloud-pubsub/pom.xml index 429e53b8df10..8d5866e005b3 100644 --- a/google-cloud-clients/google-cloud-pubsub/pom.xml +++ b/google-cloud-clients/google-cloud-pubsub/pom.xml @@ -78,6 +78,12 @@ grpc-google-iam-v1 test + + io.opencensus + opencensus-impl + ${opencensus.version} + runtime + com.google.api diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java index d4927388b606..278d0c9cf21b 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub.v1; import com.google.cloud.ServiceOptions; +import com.google.common.annotations.VisibleForTesting; import com.google.errorprone.annotations.MustBeClosed; import com.google.pubsub.v1.PubsubMessage; @@ -53,6 +54,7 @@ final class OpenCensusUtil { public static final String TAG_CONTEXT_KEY = "googclient_OpenCensusTagContextKey"; public static final String TRACE_CONTEXT_KEY = "googclient_OpenCensusTraceContextKey"; + @VisibleForTesting static final String MESSAGE_RECEIVER_SPAN_NAME = "OpenCensusMessageReceiver"; private static final String TRACEPARENT_KEY = "traceparent"; private static final Tagger tagger = Tags.getTagger(); @@ -103,7 +105,8 @@ public String get(String carrier, String key) { } }; - private static String encodeSpanContext(SpanContext ctxt) { + @VisibleForTesting + static String encodeSpanContext(SpanContext ctxt) { StringBuilder builder = new StringBuilder(); traceContextTextFormat.inject(ctxt, builder, setter); return builder.toString(); @@ -121,8 +124,9 @@ private static Scope createScopedTagContext(String encodedTags) { return tagger.withTagContext(tagger.getCurrentTagContext()); } + @VisibleForTesting @MustBeClosed - private static Scope createScopedSpan(String name) { + static Scope createScopedSpan(String name) { return tracer .spanBuilderWithExplicitParent(name, tracer.getCurrentSpan()) .setRecordEvents(true) @@ -142,7 +146,8 @@ private static void addParentLink(String encodedParentSpanContext) { } } - // class + // Wrapper class for {@link MessageReceiver} that decodes any received trace and tag contexts + // and puts them in scope. private static final class OpenCensusMessageReceiver implements MessageReceiver { private final MessageReceiver receiver; @@ -168,7 +173,7 @@ private void addTraceScope(PubsubMessage message, AckReplyConsumer consumer) { receiver.receiveMessage(message, consumer); return; } - try (Scope spanScope = createScopedSpan("receiver")) { + try (Scope spanScope = createScopedSpan(MESSAGE_RECEIVER_SPAN_NAME)) { addParentLink(encodedSpanContext); receiver.receiveMessage(message, consumer); } From 4dc84259dd0fa9187c9327ed312102e5b5ad9dc3 Mon Sep 17 00:00:00 2001 From: Dino Oliva Date: Mon, 4 Mar 2019 15:45:57 -0800 Subject: [PATCH 10/14] Updates OpenCensus integration to use a generic MessageTransform. --- .../cloud/pubsub/v1/OpenCensusUtil.java | 58 ++++++++++--------- .../com/google/cloud/pubsub/v1/Publisher.java | 26 ++++++++- .../google/cloud/pubsub/v1/Subscriber.java | 2 +- .../cloud/pubsub/v1/OpenCensusUtilTest.java | 9 +-- 4 files changed, 61 insertions(+), 34 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java index 278d0c9cf21b..a5044aa06779 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java @@ -15,6 +15,7 @@ package com.google.cloud.pubsub.v1; +import com.google.api.core.ApiFunction; import com.google.cloud.ServiceOptions; import com.google.common.annotations.VisibleForTesting; import com.google.errorprone.annotations.MustBeClosed; @@ -49,7 +50,7 @@ * Utilities for propagating OpenCensus {@link TagContext} and {@link SpanContext} from publishers * to subscribers. */ -final class OpenCensusUtil { +public class OpenCensusUtil { private static final Logger logger = Logger.getLogger(OpenCensusUtil.class.getName()); public static final String TAG_CONTEXT_KEY = "googclient_OpenCensusTagContextKey"; @@ -66,28 +67,29 @@ final class OpenCensusUtil { private static final TextFormat traceContextTextFormat = Tracing.getPropagationComponent().getTraceContextFormat(); - // Used in Publisher. - // TODO: consider adding configuration support to control adding these attributes. - static PubsubMessage putOpenCensusAttributes(PubsubMessage message) { - PubsubMessage.Builder builder = PubsubMessage.newBuilder(message); - String encodedSpanContext = encodeSpanContext(tracer.getCurrentSpan().getContext()); - String encodedTagContext = encodeTagContext(tagger.getCurrentTagContext()); - if (encodedSpanContext.isEmpty() && encodedTagContext.isEmpty()) { - return message; - } - if (!encodedSpanContext.isEmpty()) { - builder.putAttributes(TRACE_CONTEXT_KEY, encodedSpanContext); - } - if (!encodedTagContext.isEmpty()) { - builder.putAttributes(TAG_CONTEXT_KEY, encodedTagContext); - } - return builder.build(); - } - - // Used in Subscriber. - static MessageReceiver createOpenCensusMessageReceiver(MessageReceiver receiver) { - return new OpenCensusMessageReceiver(receiver); - } + /** + * Propagates active OpenCensus trace and tag contexts from the Publisher by adding them as + * attributes to the {@link PubsubMessage}. + */ + public static final ApiFunction OPEN_CENSUS_MESSAGE_TRANSFORM = + new ApiFunction() { + @Override + public PubsubMessage apply(PubsubMessage message) { + PubsubMessage.Builder builder = PubsubMessage.newBuilder(message); + String encodedSpanContext = encodeSpanContext(tracer.getCurrentSpan().getContext()); + String encodedTagContext = encodeTagContext(tagger.getCurrentTagContext()); + if (encodedSpanContext.isEmpty() && encodedTagContext.isEmpty()) { + return message; + } + if (!encodedSpanContext.isEmpty()) { + builder.putAttributes(TRACE_CONTEXT_KEY, encodedSpanContext); + } + if (!encodedTagContext.isEmpty()) { + builder.putAttributes(TAG_CONTEXT_KEY, encodedTagContext); + } + return builder.build(); + } + }; private static final Setter setter = new Setter() { @Override @@ -146,12 +148,14 @@ private static void addParentLink(String encodedParentSpanContext) { } } - // Wrapper class for {@link MessageReceiver} that decodes any received trace and tag contexts - // and puts them in scope. - private static final class OpenCensusMessageReceiver implements MessageReceiver { + /** + * Wrapper class for {@link MessageReceiver} that decodes any received trace and tag contexts + * and puts them in scope. + */ + public static class OpenCensusMessageReceiver implements MessageReceiver { private final MessageReceiver receiver; - private OpenCensusMessageReceiver(MessageReceiver receiver) { + public OpenCensusMessageReceiver(MessageReceiver receiver) { this.receiver = receiver; } diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index d5182a4b5536..24496daf3cb1 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub.v1; +import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; @@ -95,6 +96,7 @@ public class Publisher { private final List closeables; private final MessageWaiter messagesWaiter; private ScheduledFuture currentAlarmFuture; + private final ApiFunction messageTransform; /** The maximum number of messages in one request. Defined by the API. */ public static long getApiMaxRequestElementCount() { @@ -110,6 +112,7 @@ private Publisher(Builder builder) throws IOException { topicName = builder.topicName; this.batchingSettings = builder.batchingSettings; + this.messageTransform = builder.messageTransform; messagesBatch = new LinkedList<>(); messagesBatchLock = new ReentrantLock(); @@ -187,11 +190,12 @@ public String getTopicNameString() { * @param message the message to publish. * @return the message ID wrapped in a future. */ - public ApiFuture publish(PubsubMessage originalMessage) { + public ApiFuture publish(PubsubMessage message) { if (shutdown.get()) { throw new IllegalStateException("Cannot publish on a shut-down publisher."); } - PubsubMessage message = OpenCensusUtil.putOpenCensusAttributes(originalMessage); + + message = messageTransform.apply(message); final int messageSize = message.getSerializedSize(); OutstandingBatch batchToSend = null; SettableApiFuture publishResult = SettableApiFuture.create(); @@ -528,6 +532,14 @@ public static final class Builder { CredentialsProvider credentialsProvider = TopicAdminSettings.defaultCredentialsProviderBuilder().build(); + ApiFunction messageTransform = + new ApiFunction () { + @Override + public PubsubMessage apply(PubsubMessage input) { + return input; + } + }; + private Builder(String topic) { this.topicName = Preconditions.checkNotNull(topic); } @@ -610,6 +622,16 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) { return this; } + /** + * Gives the ability to set an {@link ApiFunction} that will transform the {@link PubsubMessage} + * before it is sent + */ + public Builder setTransform(ApiFunction messageTransform) { + this.messageTransform = + Preconditions.checkNotNull(messageTransform, "The messageTransform cannnot be null."); + return this; + } + public Publisher build() throws IOException { return new Publisher(this); } diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 21ca7d9062d4..41d60c3a2bc9 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -123,7 +123,7 @@ public class Subscriber extends AbstractApiService { private ScheduledFuture ackDeadlineUpdater; private Subscriber(Builder builder) { - receiver = OpenCensusUtil.createOpenCensusMessageReceiver(builder.receiver); + receiver = builder.receiver; flowControlSettings = builder.flowControlSettings; subscriptionName = builder.subscriptionName; diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java index 27719ec36497..042a892e9139 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java @@ -17,6 +17,7 @@ package com.google.cloud.pubsub.v1; import static com.google.cloud.pubsub.v1.OpenCensusUtil.MESSAGE_RECEIVER_SPAN_NAME; +import static com.google.cloud.pubsub.v1.OpenCensusUtil.OPEN_CENSUS_MESSAGE_TRANSFORM; import static com.google.cloud.pubsub.v1.OpenCensusUtil.TAG_CONTEXT_KEY; import static com.google.cloud.pubsub.v1.OpenCensusUtil.TRACE_CONTEXT_KEY; @@ -64,11 +65,11 @@ public void testOpenCensusMessageReceiver() throws Exception { try ( Scope traceScope = OpenCensusUtil.createScopedSpan(TEST_PARENT_LINK_NAME); Scope tagScope = createScopeTags()) { - message = OpenCensusUtil.putOpenCensusAttributes(generatePubsubMessage(500)); + message = OPEN_CENSUS_MESSAGE_TRANSFORM.apply(generatePubsubMessage(500)); publisherContext = tracer.getCurrentSpan().getContext(); } MessageReceiver receiver = - OpenCensusUtil.createOpenCensusMessageReceiver( + new OpenCensusUtil.OpenCensusMessageReceiver( new TestMessageReceiver(publisherContext, tagger.getCurrentTagContext())); receiver.receiveMessage(message, new NoOpAckReplyConsumer()); } @@ -76,7 +77,7 @@ public void testOpenCensusMessageReceiver() throws Exception { // Verifies that the current span context is added as an attribute and that (for now) the tag // context is not added as an attribute. @Test - public void testPutOpenCensusAttributes() { + public void testOpenCensusMessageTransformer() { try ( Scope traceScope = OpenCensusUtil.createScopedSpan("PublisherTestRoot"); Scope tagScope = createScopeTags()) { @@ -84,7 +85,7 @@ public void testPutOpenCensusAttributes() { assertEquals("", originalMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, "")); assertEquals("", originalMessage.getAttributesOrDefault(TAG_CONTEXT_KEY, "")); - PubsubMessage attributedMessage = OpenCensusUtil.putOpenCensusAttributes(originalMessage); + PubsubMessage attributedMessage = OPEN_CENSUS_MESSAGE_TRANSFORM.apply(originalMessage); String encodedSpanContext = OpenCensusUtil.encodeSpanContext(tracer.getCurrentSpan().getContext()); assertNotEquals("", encodedSpanContext); From 2f11262ea297a926b7aa65a9dfdad7ff4c01ab1f Mon Sep 17 00:00:00 2001 From: Dino Oliva Date: Mon, 11 Mar 2019 09:50:35 -0700 Subject: [PATCH 11/14] Removes now-unused private constant. --- .../src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java index a5044aa06779..70745f024095 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java @@ -62,7 +62,6 @@ public class OpenCensusUtil { private static final TagContextBinarySerializer serializer = Tags.getTagPropagationComponent().getBinarySerializer(); - private static final TraceOptions SAMPLED = TraceOptions.builder().setIsSampled(true).build(); private static final Tracer tracer = Tracing.getTracer(); private static final TextFormat traceContextTextFormat = Tracing.getPropagationComponent().getTraceContextFormat(); From 6ea2f706e943249445e3b9e90bfafc8c4cf076a1 Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Thu, 14 Mar 2019 09:14:40 -0400 Subject: [PATCH 12/14] Update pom.xml --- google-cloud-clients/google-cloud-pubsub/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-clients/google-cloud-pubsub/pom.xml b/google-cloud-clients/google-cloud-pubsub/pom.xml index 8d5866e005b3..babc15bebc42 100644 --- a/google-cloud-clients/google-cloud-pubsub/pom.xml +++ b/google-cloud-clients/google-cloud-pubsub/pom.xml @@ -82,7 +82,7 @@ io.opencensus opencensus-impl ${opencensus.version} - runtime + test From 67961f3b4941b3390995641540abe4d21713889a Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Tue, 19 Mar 2019 12:30:57 -0400 Subject: [PATCH 13/14] Marking setTransform as BetaApi --- .../src/main/java/com/google/cloud/pubsub/v1/Publisher.java | 1 + 1 file changed, 1 insertion(+) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 24496daf3cb1..df6e5a5ea25c 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -626,6 +626,7 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) { * Gives the ability to set an {@link ApiFunction} that will transform the {@link PubsubMessage} * before it is sent */ + @BetaApi public Builder setTransform(ApiFunction messageTransform) { this.messageTransform = Preconditions.checkNotNull(messageTransform, "The messageTransform cannnot be null."); From 8135358e743583844d0990d44a7a4450f0aeae16 Mon Sep 17 00:00:00 2001 From: Dino Oliva Date: Tue, 19 Mar 2019 17:24:39 -0700 Subject: [PATCH 14/14] Fixes for formatting issues. --- .../cloud/pubsub/v1/OpenCensusUtil.java | 56 ++++++++----------- .../com/google/cloud/pubsub/v1/Publisher.java | 2 +- .../cloud/pubsub/v1/OpenCensusUtilTest.java | 23 ++------ 3 files changed, 30 insertions(+), 51 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java index 70745f024095..018c63452640 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/OpenCensusUtil.java @@ -16,33 +16,23 @@ package com.google.cloud.pubsub.v1; import com.google.api.core.ApiFunction; -import com.google.cloud.ServiceOptions; import com.google.common.annotations.VisibleForTesting; import com.google.errorprone.annotations.MustBeClosed; import com.google.pubsub.v1.PubsubMessage; - import io.opencensus.common.Scope; -import io.opencensus.tags.propagation.TagContextBinarySerializer; -import io.opencensus.tags.propagation.TagContextDeserializationException; -import io.opencensus.tags.propagation.TagContextSerializationException; import io.opencensus.tags.TagContext; import io.opencensus.tags.Tagger; import io.opencensus.tags.Tags; -import io.opencensus.trace.propagation.SpanContextParseException; -import io.opencensus.trace.propagation.TextFormat; -import io.opencensus.trace.propagation.TextFormat.Getter; -import io.opencensus.trace.propagation.TextFormat.Setter; +import io.opencensus.tags.propagation.TagContextBinarySerializer; import io.opencensus.trace.Link; -import io.opencensus.trace.Span; -import io.opencensus.trace.SpanId; import io.opencensus.trace.SpanContext; -import io.opencensus.trace.TraceId; -import io.opencensus.trace.TraceOptions; import io.opencensus.trace.Tracer; import io.opencensus.trace.Tracing; +import io.opencensus.trace.propagation.SpanContextParseException; +import io.opencensus.trace.propagation.TextFormat; +import io.opencensus.trace.propagation.TextFormat.Getter; +import io.opencensus.trace.propagation.TextFormat.Setter; import io.opencensus.trace.samplers.Samplers; - -import java.util.Base64; import java.util.logging.Level; import java.util.logging.Logger; @@ -90,21 +80,23 @@ public PubsubMessage apply(PubsubMessage message) { } }; - private static final Setter setter = new Setter() { - @Override - public void put(StringBuilder carrier, String key, String value) { - if (key.equals(TRACEPARENT_KEY)) { - carrier.append(value); + private static final Setter setter = + new Setter() { + @Override + public void put(StringBuilder carrier, String key, String value) { + if (key.equals(TRACEPARENT_KEY)) { + carrier.append(value); + } } - } - }; + }; - private static final Getter getter = new Getter() { - @Override - public String get(String carrier, String key) { - return key.equals(TRACEPARENT_KEY) ? carrier : null; - } - }; + private static final Getter getter = + new Getter() { + @Override + public String get(String carrier, String key) { + return key.equals(TRACEPARENT_KEY) ? carrier : null; + } + }; @VisibleForTesting static String encodeSpanContext(SpanContext ctxt) { @@ -139,17 +131,15 @@ static Scope createScopedSpan(String name) { private static void addParentLink(String encodedParentSpanContext) { try { SpanContext ctxt = traceContextTextFormat.extract(encodedParentSpanContext, getter); - tracer.getCurrentSpan().addLink(Link.fromSpanContext( - ctxt, - Link.Type.PARENT_LINKED_SPAN)); + tracer.getCurrentSpan().addLink(Link.fromSpanContext(ctxt, Link.Type.PARENT_LINKED_SPAN)); } catch (SpanContextParseException exn) { logger.log(Level.INFO, "OpenCensus: Trace Context Deserialization Exception: " + exn); } } /** - * Wrapper class for {@link MessageReceiver} that decodes any received trace and tag contexts - * and puts them in scope. + * Wrapper class for {@link MessageReceiver} that decodes any received trace and tag contexts and + * puts them in scope. */ public static class OpenCensusMessageReceiver implements MessageReceiver { private final MessageReceiver receiver; diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 24496daf3cb1..ac0074aa48c6 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -533,7 +533,7 @@ public static final class Builder { TopicAdminSettings.defaultCredentialsProviderBuilder().build(); ApiFunction messageTransform = - new ApiFunction () { + new ApiFunction() { @Override public PubsubMessage apply(PubsubMessage input) { return input; diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java index 042a892e9139..6f74d5917c76 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/OpenCensusUtilTest.java @@ -20,14 +20,11 @@ import static com.google.cloud.pubsub.v1.OpenCensusUtil.OPEN_CENSUS_MESSAGE_TRANSFORM; import static com.google.cloud.pubsub.v1.OpenCensusUtil.TAG_CONTEXT_KEY; import static com.google.cloud.pubsub.v1.OpenCensusUtil.TRACE_CONTEXT_KEY; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertTrue; import com.google.protobuf.ByteString; import com.google.pubsub.v1.PubsubMessage; - import io.opencensus.common.Scope; import io.opencensus.tags.TagContext; import io.opencensus.tags.TagKey; @@ -41,12 +38,9 @@ import io.opencensus.trace.export.RunningSpanStore; import io.opencensus.trace.export.RunningSpanStore.Filter; import io.opencensus.trace.export.SpanData; - import java.util.Collection; import java.util.List; - import org.junit.Test; -import org.junit.rules.TestName; /** Tests for {@link OpenCensusUtil}. */ public class OpenCensusUtilTest { @@ -62,8 +56,7 @@ public class OpenCensusUtilTest { public void testOpenCensusMessageReceiver() throws Exception { PubsubMessage message; SpanContext publisherContext; - try ( - Scope traceScope = OpenCensusUtil.createScopedSpan(TEST_PARENT_LINK_NAME); + try (Scope traceScope = OpenCensusUtil.createScopedSpan(TEST_PARENT_LINK_NAME); Scope tagScope = createScopeTags()) { message = OPEN_CENSUS_MESSAGE_TRANSFORM.apply(generatePubsubMessage(500)); publisherContext = tracer.getCurrentSpan().getContext(); @@ -78,8 +71,7 @@ public void testOpenCensusMessageReceiver() throws Exception { // context is not added as an attribute. @Test public void testOpenCensusMessageTransformer() { - try ( - Scope traceScope = OpenCensusUtil.createScopedSpan("PublisherTestRoot"); + try (Scope traceScope = OpenCensusUtil.createScopedSpan("PublisherTestRoot"); Scope tagScope = createScopeTags()) { PubsubMessage originalMessage = generatePubsubMessage(500); assertEquals("", originalMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, "")); @@ -88,10 +80,9 @@ public void testOpenCensusMessageTransformer() { PubsubMessage attributedMessage = OPEN_CENSUS_MESSAGE_TRANSFORM.apply(originalMessage); String encodedSpanContext = OpenCensusUtil.encodeSpanContext(tracer.getCurrentSpan().getContext()); - assertNotEquals("", encodedSpanContext); + assertNotEquals("", encodedSpanContext); assertEquals( - encodedSpanContext, - attributedMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, "")); + encodedSpanContext, attributedMessage.getAttributesOrDefault(TRACE_CONTEXT_KEY, "")); assertEquals("", attributedMessage.getAttributesOrDefault(TAG_CONTEXT_KEY, "")); } } @@ -110,12 +101,10 @@ private static Scope createScopeTags() { private static final class NoOpAckReplyConsumer implements AckReplyConsumer { @Override - public void ack() { - } + public void ack() {} @Override - public void nack() { - } + public void nack() {} } private static final class TestMessageReceiver implements MessageReceiver {