From 5de5950ebb7558bae700a609d463a831274a602e Mon Sep 17 00:00:00 2001 From: ryosuke-hasebe Date: Tue, 16 May 2023 15:50:56 +0900 Subject: [PATCH 1/2] Support micrometer-tracing --- build.gradle | 1 + micrometer-tracing/build.gradle | 12 + .../MicrometerTracingBraveBridgeTest.java | 93 ++++++++ .../MicrometerTracingOtelBridgeTest.java | 205 ++++++++++++++++++ .../runtime/TracePropagationGuarantee.java | 54 +++++ .../MicrometerProcessorTraceHandle.java | 41 ++++ .../runtime/MicrometerRecordTraceHandle.java | 36 +++ .../runtime/MicrometerTraceHandle.java | 37 ++++ .../runtime/MicrometerTracingProvider.java | 64 ++++++ settings.gradle | 1 + 10 files changed, 544 insertions(+) create mode 100644 micrometer-tracing/build.gradle create mode 100644 micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingBraveBridgeTest.java create mode 100644 micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingOtelBridgeTest.java create mode 100644 micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/TracePropagationGuarantee.java create mode 100644 micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerProcessorTraceHandle.java create mode 100644 micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerRecordTraceHandle.java create mode 100644 micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTraceHandle.java create mode 100644 micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTracingProvider.java diff --git a/build.gradle b/build.gradle index bdf6b9ac..709fdbd6 100644 --- a/build.gradle +++ b/build.gradle @@ -38,6 +38,7 @@ subprojects { protobufVersion = "3.22.3" kafkaVersion = "3.2.3" micrometerVersion = "1.10.6" + micrometerTracingVersion = "1.1.1" lombokVersion = "1.18.26" junitVersion = "4.13.2" hamcrestVersion = "2.2" diff --git a/micrometer-tracing/build.gradle b/micrometer-tracing/build.gradle new file mode 100644 index 00000000..8a485297 --- /dev/null +++ b/micrometer-tracing/build.gradle @@ -0,0 +1,12 @@ +dependencies { + api project(":processor") + + api "org.apache.kafka:kafka-clients:$kafkaVersion" + api "io.micrometer:micrometer-tracing:$micrometerTracingVersion" + + itImplementation project(":testing") + itImplementation "io.micrometer:micrometer-tracing-bridge-brave:$micrometerTracingVersion" + itImplementation "io.micrometer:micrometer-tracing-bridge-otel:$micrometerTracingVersion" + itImplementation "io.zipkin.brave:brave-instrumentation-kafka-clients:5.15.1" + itImplementation "io.opentelemetry.instrumentation:opentelemetry-kafka-clients-2.6:1.26.0-alpha" +} diff --git a/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingBraveBridgeTest.java b/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingBraveBridgeTest.java new file mode 100644 index 00000000..7c419fd7 --- /dev/null +++ b/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingBraveBridgeTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2020 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import com.linecorp.decaton.client.DecatonClientBuilder.DefaultKafkaProducerSupplier; +import com.linecorp.decaton.testing.KafkaClusterRule; +import com.linecorp.decaton.testing.TestUtils; +import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType; +import com.linecorp.decaton.testing.processor.ProcessorTestSuite; + +import brave.Tracing; +import brave.kafka.clients.KafkaTracing; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext; +import io.micrometer.tracing.brave.bridge.BravePropagator; +import io.micrometer.tracing.brave.bridge.BraveTracer; + +public class MicrometerTracingBraveBridgeTest { + + private Tracing braveTracing; + private KafkaTracing braveKafkaTracing; + private Tracer tracer; + private String retryTopic; + + @ClassRule + public static KafkaClusterRule rule = new KafkaClusterRule(); + + @Before + public void setUp() { + braveTracing = Tracing.newBuilder().build(); + braveKafkaTracing = KafkaTracing.create(braveTracing); + tracer = new BraveTracer(braveTracing.tracer(), + new BraveCurrentTraceContext(braveTracing.currentTraceContext())); + retryTopic = rule.admin().createRandomTopic(3, 3); + } + + @After + public void tearDown() { + rule.admin().deleteTopics(true, retryTopic); + } + + @Test(timeout = 30000) + public void testTracePropagation() throws Exception { + // scenario: + // * half of arrived tasks are retried once + // * after retried (i.e. retryCount() > 0), no more retry + final DefaultKafkaProducerSupplier producerSupplier = new DefaultKafkaProducerSupplier(); + ProcessorTestSuite + .builder(rule) + .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> { + if (ctx.metadata().retryCount() == 0 && ThreadLocalRandom.current().nextBoolean()) { + ctx.retry(); + } + })) + // micrometer-tracing does not yet have kafka producer support, so use brave directly + .producerSupplier( + bootstrapServers -> braveKafkaTracing.producer(TestUtils.producer(bootstrapServers))) + .retryConfig(RetryConfig.builder() + .retryTopic(retryTopic) + .backoff(Duration.ofMillis(10)) + .producerSupplier(config -> braveKafkaTracing.producer( + producerSupplier.getProducer(config))) + .build()) + .tracingProvider(new MicrometerTracingProvider(tracer, new BravePropagator(braveTracing))) + // If we retry tasks, there's no guarantee about ordering nor serial processing + .excludeSemantics(GuaranteeType.PROCESS_ORDERING, GuaranteeType.SERIAL_PROCESSING) + .customSemantics(new TracePropagationGuarantee(tracer)) + .build() + .run(); + } +} diff --git a/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingOtelBridgeTest.java b/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingOtelBridgeTest.java new file mode 100644 index 00000000..e9d0aa01 --- /dev/null +++ b/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingOtelBridgeTest.java @@ -0,0 +1,205 @@ +/* + * Copyright 2020 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import com.linecorp.decaton.client.DecatonClientBuilder.DefaultKafkaProducerSupplier; +import com.linecorp.decaton.testing.KafkaClusterRule; +import com.linecorp.decaton.testing.TestUtils; +import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType; +import com.linecorp.decaton.testing.processor.ProcessorTestSuite; + +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.otel.bridge.OtelCurrentTraceContext; +import io.micrometer.tracing.otel.bridge.OtelPropagator; +import io.micrometer.tracing.otel.bridge.OtelTracer; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Scope; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; + +public class MicrometerTracingOtelBridgeTest { + + private OpenTelemetry openTelemetry; + private io.opentelemetry.api.trace.Tracer otelTracer; + private KafkaTelemetry otelKafkaTelemetry; + private Tracer tracer; + private String retryTopic; + + @ClassRule + public static KafkaClusterRule rule = new KafkaClusterRule(); + + @Before + public void setUp() { + openTelemetry = OpenTelemetrySdk.builder() + .setPropagators(ContextPropagators.create( + W3CTraceContextPropagator.getInstance())) + .buildAndRegisterGlobal(); + otelTracer = openTelemetry.getTracerProvider().get("io.micrometer.micrometer-tracing"); + otelKafkaTelemetry = KafkaTelemetry.create(openTelemetry); + tracer = new OtelTracer(otelTracer, new OtelCurrentTraceContext(), event -> { + }); + retryTopic = rule.admin().createRandomTopic(3, 3); + } + + @After + public void tearDown() { + rule.admin().deleteTopics(true, retryTopic); + } + + @Test(timeout = 30000) + public void testTracePropagation() throws Exception { + // scenario: + // * half of arrived tasks are retried once + // * after retried (i.e. retryCount() > 0), no more retry + final DefaultKafkaProducerSupplier producerSupplier = new DefaultKafkaProducerSupplier(); + ProcessorTestSuite + .builder(rule) + .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> { + if (ctx.metadata().retryCount() == 0 && ThreadLocalRandom.current().nextBoolean()) { + ctx.retry(); + } + })) + // micrometer-tracing does not yet have kafka producer support, so use openTelemetry directly + .producerSupplier(bootstrapServers -> new WithParentSpanProducer<>( + otelKafkaTelemetry.wrap(TestUtils.producer(bootstrapServers)), otelTracer)) + .retryConfig(RetryConfig.builder() + .retryTopic(retryTopic) + .backoff(Duration.ofMillis(10)) + .producerSupplier(config -> new WithParentSpanProducer<>( + otelKafkaTelemetry.wrap(producerSupplier.getProducer(config)), + otelTracer)) + .build()) + .tracingProvider(new MicrometerTracingProvider( + tracer, new OtelPropagator(openTelemetry.getPropagators(), otelTracer))) + // If we retry tasks, there's no guarantee about ordering nor serial processing + .excludeSemantics(GuaranteeType.PROCESS_ORDERING, GuaranteeType.SERIAL_PROCESSING) + .customSemantics(new TracePropagationGuarantee(tracer)) + .build() + .run(); + } + + private static class WithParentSpanProducer implements Producer { + private final Producer delegate; + private final io.opentelemetry.api.trace.Tracer otelTracer; + + WithParentSpanProducer(Producer delegate, io.opentelemetry.api.trace.Tracer otelTracer) { + this.delegate = delegate; + this.otelTracer = otelTracer; + } + + @Override + public Future send(ProducerRecord record) { + return this.send(record, null); + } + + // Since the context of parent is injected in Callback of `producer.send`, create the span manually. + // ref: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/8a15975dcacda48375cae62e98fe7551fb192d1f/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java#L262-L264 + @Override + public Future send(ProducerRecord record, Callback callback) { + final Span span = otelTracer.spanBuilder("test span").startSpan(); + try (final Scope scope = span.makeCurrent()) { + return delegate.send(record, callback); + } finally { + span.end(); + } + } + + @Override + public void initTransactions() { + delegate.initTransactions(); + } + + @Override + public void beginTransaction() throws ProducerFencedException { + delegate.beginTransaction(); + } + + @Deprecated + @Override + public void sendOffsetsToTransaction(Map offsets, + String consumerGroupId) throws ProducerFencedException { + delegate.sendOffsetsToTransaction(offsets, consumerGroupId); + } + + @Override + public void sendOffsetsToTransaction(Map offsets, + ConsumerGroupMetadata groupMetadata) + throws ProducerFencedException { + delegate.sendOffsetsToTransaction(offsets, groupMetadata); + } + + @Override + public void commitTransaction() throws ProducerFencedException { + delegate.commitTransaction(); + } + + @Override + public void abortTransaction() throws ProducerFencedException { + delegate.abortTransaction(); + } + + @Override + public void flush() { + delegate.flush(); + } + + @Override + public List partitionsFor(String topic) { + return delegate.partitionsFor(topic); + } + + @Override + public Map metrics() { + return delegate.metrics(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public void close(Duration timeout) { + delegate.close(); + } + } +} diff --git a/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/TracePropagationGuarantee.java b/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/TracePropagationGuarantee.java new file mode 100644 index 00000000..6427c03e --- /dev/null +++ b/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/TracePropagationGuarantee.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import static org.junit.Assert.assertEquals; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.linecorp.decaton.processor.TaskMetadata; +import com.linecorp.decaton.testing.processor.ProcessedRecord; +import com.linecorp.decaton.testing.processor.ProcessingGuarantee; +import com.linecorp.decaton.testing.processor.ProducedRecord; + +import io.micrometer.tracing.Tracer; + +public class TracePropagationGuarantee implements ProcessingGuarantee { + private final Map producedTraceIds = new ConcurrentHashMap<>(); + private final Map consumedTraceIds = new ConcurrentHashMap<>(); + private final Tracer tracer; + + public TracePropagationGuarantee(Tracer tracer) { + this.tracer = tracer; + } + + @Override + public void onProduce(ProducedRecord record) { + producedTraceIds.put(record.task().getId(), tracer.currentTraceContext().context().traceId()); + } + + @Override + public void onProcess(TaskMetadata metadata, ProcessedRecord record) { + consumedTraceIds.put(record.task().getId(), tracer.currentTraceContext().context().traceId()); + } + + @Override + public void doAssert() { + assertEquals(producedTraceIds, consumedTraceIds); + } +} diff --git a/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerProcessorTraceHandle.java b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerProcessorTraceHandle.java new file mode 100644 index 00000000..c38c39cc --- /dev/null +++ b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerProcessorTraceHandle.java @@ -0,0 +1,41 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import com.linecorp.decaton.processor.tracing.TracingProvider.ProcessorTraceHandle; + +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; + +final class MicrometerProcessorTraceHandle extends MicrometerTraceHandle implements ProcessorTraceHandle { + private Tracer.SpanInScope scope; + + MicrometerProcessorTraceHandle(Tracer tracer, Span span) { + super(tracer, span); + } + + @Override + public void processingStart() { + scope = tracer.withSpan(span); + } + + @Override + public void processingReturn() { + span.event("return"); + scope.close(); + } +} diff --git a/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerRecordTraceHandle.java b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerRecordTraceHandle.java new file mode 100644 index 00000000..eb7a77f6 --- /dev/null +++ b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerRecordTraceHandle.java @@ -0,0 +1,36 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import com.linecorp.decaton.processor.DecatonProcessor; +import com.linecorp.decaton.processor.tracing.TracingProvider; + +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; + +final class MicrometerRecordTraceHandle extends MicrometerTraceHandle + implements TracingProvider.RecordTraceHandle { + MicrometerRecordTraceHandle(Tracer tracer, Span span) { + super(tracer, span); + } + + @Override + public MicrometerProcessorTraceHandle childFor(DecatonProcessor processor) { + final Span childSpan = tracer.nextSpan(span).name(processor.name()).start(); + return new MicrometerProcessorTraceHandle(tracer, childSpan); + } +} diff --git a/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTraceHandle.java b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTraceHandle.java new file mode 100644 index 00000000..455b3903 --- /dev/null +++ b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTraceHandle.java @@ -0,0 +1,37 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import com.linecorp.decaton.processor.tracing.TracingProvider.TraceHandle; + +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; + +class MicrometerTraceHandle implements TraceHandle { + protected final Tracer tracer; + protected final Span span; + + MicrometerTraceHandle(Tracer tracer, Span span) { + this.tracer = tracer; + this.span = span; + } + + @Override + public void processingCompletion() { + span.end(); + } +} diff --git a/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTracingProvider.java b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTracingProvider.java new file mode 100644 index 00000000..bf4cee44 --- /dev/null +++ b/micrometer-tracing/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerTracingProvider.java @@ -0,0 +1,64 @@ +/* + * Copyright 2023 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +import com.linecorp.decaton.processor.tracing.TracingProvider; + +import io.micrometer.common.lang.Nullable; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.propagation.Propagator; + +public class MicrometerTracingProvider implements TracingProvider { + private final Tracer tracer; + private final Propagator propagator; + + public MicrometerTracingProvider(Tracer tracer, Propagator propagator) { + this.tracer = tracer; + this.propagator = propagator; + } + + @Override + public MicrometerRecordTraceHandle traceFor(ConsumerRecord record, String subscriptionId) { + return new MicrometerRecordTraceHandle( + tracer, + propagator.extract(record.headers(), GETTER) + .name("decaton").tag("subscriptionId", subscriptionId).start() + ); + } + + private static final Propagator.Getter GETTER = new Propagator.Getter() { + @Override + public String get(Headers carrier, String key) { + return lastStringHeader(carrier, key); + } + + @Nullable + private String lastStringHeader(Headers headers, String key) { + final Header header = headers.lastHeader(key); + if (header == null || header.value() == null) { + return null; + } + return new String(header.value(), UTF_8); + } + }; +} diff --git a/settings.gradle b/settings.gradle index 5b794a37..86b0889b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -10,3 +10,4 @@ include ":docs" include ":testing" include ":benchmark" include ":brave" +include ":micrometer-tracing" From c2e5189c761179ea0f298c5594a37f80da4d8d88 Mon Sep 17 00:00:00 2001 From: ryosuke-hasebe Date: Mon, 22 May 2023 12:54:52 +0900 Subject: [PATCH 2/2] Improve test code --- micrometer-tracing/build.gradle | 1 - .../MicrometerTracingBraveBridgeTest.java | 93 --------------- .../MicrometerTracingOtelBridgeTest.java | 107 ++++++------------ .../runtime/TracePropagationGuarantee.java | 54 --------- .../testing/processor/ProducerAdaptor.java | 2 +- 5 files changed, 37 insertions(+), 220 deletions(-) delete mode 100644 micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingBraveBridgeTest.java delete mode 100644 micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/TracePropagationGuarantee.java diff --git a/micrometer-tracing/build.gradle b/micrometer-tracing/build.gradle index 8a485297..9517fae4 100644 --- a/micrometer-tracing/build.gradle +++ b/micrometer-tracing/build.gradle @@ -7,6 +7,5 @@ dependencies { itImplementation project(":testing") itImplementation "io.micrometer:micrometer-tracing-bridge-brave:$micrometerTracingVersion" itImplementation "io.micrometer:micrometer-tracing-bridge-otel:$micrometerTracingVersion" - itImplementation "io.zipkin.brave:brave-instrumentation-kafka-clients:5.15.1" itImplementation "io.opentelemetry.instrumentation:opentelemetry-kafka-clients-2.6:1.26.0-alpha" } diff --git a/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingBraveBridgeTest.java b/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingBraveBridgeTest.java deleted file mode 100644 index 7c419fd7..00000000 --- a/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingBraveBridgeTest.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 2020 LINE Corporation - * - * LINE Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.decaton.processor.runtime; - -import java.time.Duration; -import java.util.concurrent.ThreadLocalRandom; - -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; - -import com.linecorp.decaton.client.DecatonClientBuilder.DefaultKafkaProducerSupplier; -import com.linecorp.decaton.testing.KafkaClusterRule; -import com.linecorp.decaton.testing.TestUtils; -import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType; -import com.linecorp.decaton.testing.processor.ProcessorTestSuite; - -import brave.Tracing; -import brave.kafka.clients.KafkaTracing; -import io.micrometer.tracing.Tracer; -import io.micrometer.tracing.brave.bridge.BraveCurrentTraceContext; -import io.micrometer.tracing.brave.bridge.BravePropagator; -import io.micrometer.tracing.brave.bridge.BraveTracer; - -public class MicrometerTracingBraveBridgeTest { - - private Tracing braveTracing; - private KafkaTracing braveKafkaTracing; - private Tracer tracer; - private String retryTopic; - - @ClassRule - public static KafkaClusterRule rule = new KafkaClusterRule(); - - @Before - public void setUp() { - braveTracing = Tracing.newBuilder().build(); - braveKafkaTracing = KafkaTracing.create(braveTracing); - tracer = new BraveTracer(braveTracing.tracer(), - new BraveCurrentTraceContext(braveTracing.currentTraceContext())); - retryTopic = rule.admin().createRandomTopic(3, 3); - } - - @After - public void tearDown() { - rule.admin().deleteTopics(true, retryTopic); - } - - @Test(timeout = 30000) - public void testTracePropagation() throws Exception { - // scenario: - // * half of arrived tasks are retried once - // * after retried (i.e. retryCount() > 0), no more retry - final DefaultKafkaProducerSupplier producerSupplier = new DefaultKafkaProducerSupplier(); - ProcessorTestSuite - .builder(rule) - .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> { - if (ctx.metadata().retryCount() == 0 && ThreadLocalRandom.current().nextBoolean()) { - ctx.retry(); - } - })) - // micrometer-tracing does not yet have kafka producer support, so use brave directly - .producerSupplier( - bootstrapServers -> braveKafkaTracing.producer(TestUtils.producer(bootstrapServers))) - .retryConfig(RetryConfig.builder() - .retryTopic(retryTopic) - .backoff(Duration.ofMillis(10)) - .producerSupplier(config -> braveKafkaTracing.producer( - producerSupplier.getProducer(config))) - .build()) - .tracingProvider(new MicrometerTracingProvider(tracer, new BravePropagator(braveTracing))) - // If we retry tasks, there's no guarantee about ordering nor serial processing - .excludeSemantics(GuaranteeType.PROCESS_ORDERING, GuaranteeType.SERIAL_PROCESSING) - .customSemantics(new TracePropagationGuarantee(tracer)) - .build() - .run(); - } -} diff --git a/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingOtelBridgeTest.java b/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingOtelBridgeTest.java index e9d0aa01..0a2f0977 100644 --- a/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingOtelBridgeTest.java +++ b/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerTracingOtelBridgeTest.java @@ -16,33 +16,33 @@ package com.linecorp.decaton.processor.runtime; +import static org.junit.Assert.assertEquals; + import java.time.Duration; -import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; -import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.ProducerFencedException; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import com.linecorp.decaton.client.DecatonClientBuilder.DefaultKafkaProducerSupplier; +import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.testing.KafkaClusterRule; import com.linecorp.decaton.testing.TestUtils; +import com.linecorp.decaton.testing.processor.ProcessedRecord; +import com.linecorp.decaton.testing.processor.ProcessingGuarantee; import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType; import com.linecorp.decaton.testing.processor.ProcessorTestSuite; +import com.linecorp.decaton.testing.processor.ProducedRecord; +import com.linecorp.decaton.testing.processor.ProducerAdaptor; import io.micrometer.tracing.Tracer; import io.micrometer.tracing.otel.bridge.OtelCurrentTraceContext; @@ -117,18 +117,42 @@ tracer, new OtelPropagator(openTelemetry.getPropagators(), otelTracer))) .run(); } - private static class WithParentSpanProducer implements Producer { - private final Producer delegate; + private static class TracePropagationGuarantee implements ProcessingGuarantee { + private final Map producedTraceIds = new ConcurrentHashMap<>(); + private final Map consumedTraceIds = new ConcurrentHashMap<>(); + private final Tracer tracer; + + TracePropagationGuarantee(Tracer tracer) { + this.tracer = tracer; + } + + @Override + public void onProduce(ProducedRecord record) { + producedTraceIds.put(record.task().getId(), tracer.currentTraceContext().context().traceId()); + } + + @Override + public void onProcess(TaskMetadata metadata, ProcessedRecord record) { + consumedTraceIds.put(record.task().getId(), tracer.currentTraceContext().context().traceId()); + } + + @Override + public void doAssert() { + assertEquals(producedTraceIds, consumedTraceIds); + } + } + + private static class WithParentSpanProducer extends ProducerAdaptor { private final io.opentelemetry.api.trace.Tracer otelTracer; WithParentSpanProducer(Producer delegate, io.opentelemetry.api.trace.Tracer otelTracer) { - this.delegate = delegate; + super(delegate); this.otelTracer = otelTracer; } @Override public Future send(ProducerRecord record) { - return this.send(record, null); + return send(record, null); } // Since the context of parent is injected in Callback of `producer.send`, create the span manually. @@ -142,64 +166,5 @@ public Future send(ProducerRecord record, Callback callbac span.end(); } } - - @Override - public void initTransactions() { - delegate.initTransactions(); - } - - @Override - public void beginTransaction() throws ProducerFencedException { - delegate.beginTransaction(); - } - - @Deprecated - @Override - public void sendOffsetsToTransaction(Map offsets, - String consumerGroupId) throws ProducerFencedException { - delegate.sendOffsetsToTransaction(offsets, consumerGroupId); - } - - @Override - public void sendOffsetsToTransaction(Map offsets, - ConsumerGroupMetadata groupMetadata) - throws ProducerFencedException { - delegate.sendOffsetsToTransaction(offsets, groupMetadata); - } - - @Override - public void commitTransaction() throws ProducerFencedException { - delegate.commitTransaction(); - } - - @Override - public void abortTransaction() throws ProducerFencedException { - delegate.abortTransaction(); - } - - @Override - public void flush() { - delegate.flush(); - } - - @Override - public List partitionsFor(String topic) { - return delegate.partitionsFor(topic); - } - - @Override - public Map metrics() { - return delegate.metrics(); - } - - @Override - public void close() { - delegate.close(); - } - - @Override - public void close(Duration timeout) { - delegate.close(); - } } } diff --git a/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/TracePropagationGuarantee.java b/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/TracePropagationGuarantee.java deleted file mode 100644 index 6427c03e..00000000 --- a/micrometer-tracing/src/it/java/com/linecorp/decaton/processor/runtime/TracePropagationGuarantee.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2020 LINE Corporation - * - * LINE Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.decaton.processor.runtime; - -import static org.junit.Assert.assertEquals; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import com.linecorp.decaton.processor.TaskMetadata; -import com.linecorp.decaton.testing.processor.ProcessedRecord; -import com.linecorp.decaton.testing.processor.ProcessingGuarantee; -import com.linecorp.decaton.testing.processor.ProducedRecord; - -import io.micrometer.tracing.Tracer; - -public class TracePropagationGuarantee implements ProcessingGuarantee { - private final Map producedTraceIds = new ConcurrentHashMap<>(); - private final Map consumedTraceIds = new ConcurrentHashMap<>(); - private final Tracer tracer; - - public TracePropagationGuarantee(Tracer tracer) { - this.tracer = tracer; - } - - @Override - public void onProduce(ProducedRecord record) { - producedTraceIds.put(record.task().getId(), tracer.currentTraceContext().context().traceId()); - } - - @Override - public void onProcess(TaskMetadata metadata, ProcessedRecord record) { - consumedTraceIds.put(record.task().getId(), tracer.currentTraceContext().context().traceId()); - } - - @Override - public void doAssert() { - assertEquals(producedTraceIds, consumedTraceIds); - } -} diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducerAdaptor.java b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducerAdaptor.java index d78cb282..2298e09b 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducerAdaptor.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProducerAdaptor.java @@ -33,7 +33,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; -abstract class ProducerAdaptor implements Producer { +public abstract class ProducerAdaptor implements Producer { protected final Producer delegate; protected ProducerAdaptor(Producer delegate) { this.delegate = delegate;