Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ subprojects {
protobufVersion = "3.22.3"
kafkaVersion = "3.2.3"
micrometerVersion = "1.10.6"
micrometerTracingVersion = "1.1.1"
lombokVersion = "1.18.26"
junitVersion = "4.13.2"
hamcrestVersion = "2.2"
Expand Down
11 changes: 11 additions & 0 deletions micrometer-tracing/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
dependencies {
api project(":processor")

api "org.apache.kafka:kafka-clients:$kafkaVersion"
api "io.micrometer:micrometer-tracing:$micrometerTracingVersion"

itImplementation project(":testing")
itImplementation "io.micrometer:micrometer-tracing-bridge-brave:$micrometerTracingVersion"
itImplementation "io.micrometer:micrometer-tracing-bridge-otel:$micrometerTracingVersion"
itImplementation "io.opentelemetry.instrumentation:opentelemetry-kafka-clients-2.6:1.26.0-alpha"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright 2020 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import static org.junit.Assert.assertEquals;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import com.linecorp.decaton.client.DecatonClientBuilder.DefaultKafkaProducerSupplier;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.testing.KafkaClusterRule;
import com.linecorp.decaton.testing.TestUtils;
import com.linecorp.decaton.testing.processor.ProcessedRecord;
import com.linecorp.decaton.testing.processor.ProcessingGuarantee;
import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType;
import com.linecorp.decaton.testing.processor.ProcessorTestSuite;
import com.linecorp.decaton.testing.processor.ProducedRecord;
import com.linecorp.decaton.testing.processor.ProducerAdaptor;

import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.otel.bridge.OtelCurrentTraceContext;
import io.micrometer.tracing.otel.bridge.OtelPropagator;
import io.micrometer.tracing.otel.bridge.OtelTracer;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.ContextPropagators;
import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry;
import io.opentelemetry.sdk.OpenTelemetrySdk;

public class MicrometerTracingOtelBridgeTest {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way do we need to add separate integration tests for each tracer impls?

I think testing with just one tracer impl (to assert the facade usage) is sufficient from Decaton's point of view, because Decaton's responsibility is just using micrometer's tracing facade correctly, so I don't think checking if traces are actually propagated in each tracer is meaningful.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you say, from decaton's point of view, it may be enough to test with one tracer.
(It is the responsibility of micrometer-tracing to verify operation with multiple tracers.)

So, shall I delete MicrometerTracingBraveBridgeTest ?
Because it is similar to the decaton-brave module.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall I delete MicrometerTracingBraveBridgeTest

Sounds good.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed: c2e5189


private OpenTelemetry openTelemetry;
private io.opentelemetry.api.trace.Tracer otelTracer;
private KafkaTelemetry otelKafkaTelemetry;
private Tracer tracer;
private String retryTopic;

@ClassRule
public static KafkaClusterRule rule = new KafkaClusterRule();

@Before
public void setUp() {
openTelemetry = OpenTelemetrySdk.builder()
.setPropagators(ContextPropagators.create(
W3CTraceContextPropagator.getInstance()))
.buildAndRegisterGlobal();
otelTracer = openTelemetry.getTracerProvider().get("io.micrometer.micrometer-tracing");
otelKafkaTelemetry = KafkaTelemetry.create(openTelemetry);
tracer = new OtelTracer(otelTracer, new OtelCurrentTraceContext(), event -> {
});
retryTopic = rule.admin().createRandomTopic(3, 3);
}

@After
public void tearDown() {
rule.admin().deleteTopics(true, retryTopic);
}

@Test(timeout = 30000)
public void testTracePropagation() throws Exception {
// scenario:
// * half of arrived tasks are retried once
// * after retried (i.e. retryCount() > 0), no more retry
final DefaultKafkaProducerSupplier producerSupplier = new DefaultKafkaProducerSupplier();
ProcessorTestSuite
.builder(rule)
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
if (ctx.metadata().retryCount() == 0 && ThreadLocalRandom.current().nextBoolean()) {
ctx.retry();
}
}))
// micrometer-tracing does not yet have kafka producer support, so use openTelemetry directly
.producerSupplier(bootstrapServers -> new WithParentSpanProducer<>(
otelKafkaTelemetry.wrap(TestUtils.producer(bootstrapServers)), otelTracer))
.retryConfig(RetryConfig.builder()
.retryTopic(retryTopic)
.backoff(Duration.ofMillis(10))
.producerSupplier(config -> new WithParentSpanProducer<>(
otelKafkaTelemetry.wrap(producerSupplier.getProducer(config)),
otelTracer))
.build())
.tracingProvider(new MicrometerTracingProvider(
tracer, new OtelPropagator(openTelemetry.getPropagators(), otelTracer)))
// If we retry tasks, there's no guarantee about ordering nor serial processing
.excludeSemantics(GuaranteeType.PROCESS_ORDERING, GuaranteeType.SERIAL_PROCESSING)
.customSemantics(new TracePropagationGuarantee(tracer))
.build()
.run();
}

private static class TracePropagationGuarantee implements ProcessingGuarantee {
private final Map<String, String> producedTraceIds = new ConcurrentHashMap<>();
private final Map<String, String> consumedTraceIds = new ConcurrentHashMap<>();
private final Tracer tracer;

TracePropagationGuarantee(Tracer tracer) {
this.tracer = tracer;
}

@Override
public void onProduce(ProducedRecord record) {
producedTraceIds.put(record.task().getId(), tracer.currentTraceContext().context().traceId());
}

@Override
public void onProcess(TaskMetadata metadata, ProcessedRecord record) {
consumedTraceIds.put(record.task().getId(), tracer.currentTraceContext().context().traceId());
}

@Override
public void doAssert() {
assertEquals(producedTraceIds, consumedTraceIds);
}
}

private static class WithParentSpanProducer<K, V> extends ProducerAdaptor<K, V> {
private final io.opentelemetry.api.trace.Tracer otelTracer;

WithParentSpanProducer(Producer<K, V> delegate, io.opentelemetry.api.trace.Tracer otelTracer) {
super(delegate);
this.otelTracer = otelTracer;
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
return send(record, null);
}

// Since the context of parent is injected in Callback of `producer.send`, create the span manually.
// ref: https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/8a15975dcacda48375cae62e98fe7551fb192d1f/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/KafkaTelemetry.java#L262-L264
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is my understanding below correct?

  • As you mentioned, otel KafkaTelemetry executes producer callback inside the parent context, which no traceId is associated unless we explicitly provide some
  • ProcessingGuarantee's onProduce is executed in producer callback. Hence, we need to create a span manually to supply some traceId

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, right.

final Span span = otelTracer.spanBuilder("test span").startSpan();
try (final Scope scope = span.makeCurrent()) {
return delegate.send(record, callback);
} finally {
span.end();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import com.linecorp.decaton.processor.tracing.TracingProvider.ProcessorTraceHandle;

import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;

final class MicrometerProcessorTraceHandle extends MicrometerTraceHandle implements ProcessorTraceHandle {
private Tracer.SpanInScope scope;

MicrometerProcessorTraceHandle(Tracer tracer, Span span) {
super(tracer, span);
}

@Override
public void processingStart() {
scope = tracer.withSpan(span);
}

@Override
public void processingReturn() {
span.event("return");
scope.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.tracing.TracingProvider;

import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;

final class MicrometerRecordTraceHandle extends MicrometerTraceHandle
implements TracingProvider.RecordTraceHandle {
MicrometerRecordTraceHandle(Tracer tracer, Span span) {
super(tracer, span);
}

@Override
public MicrometerProcessorTraceHandle childFor(DecatonProcessor<?> processor) {
final Span childSpan = tracer.nextSpan(span).name(processor.name()).start();
return new MicrometerProcessorTraceHandle(tracer, childSpan);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import com.linecorp.decaton.processor.tracing.TracingProvider.TraceHandle;

import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;

class MicrometerTraceHandle implements TraceHandle {
protected final Tracer tracer;
protected final Span span;

MicrometerTraceHandle(Tracer tracer, Span span) {
this.tracer = tracer;
this.span = span;
}

@Override
public void processingCompletion() {
span.end();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2023 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import com.linecorp.decaton.processor.tracing.TracingProvider;

import io.micrometer.common.lang.Nullable;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;

public class MicrometerTracingProvider implements TracingProvider {
private final Tracer tracer;
private final Propagator propagator;

public MicrometerTracingProvider(Tracer tracer, Propagator propagator) {
this.tracer = tracer;
this.propagator = propagator;
}

@Override
public MicrometerRecordTraceHandle traceFor(ConsumerRecord<?, ?> record, String subscriptionId) {
return new MicrometerRecordTraceHandle(
tracer,
propagator.extract(record.headers(), GETTER)
.name("decaton").tag("subscriptionId", subscriptionId).start()
);
}

private static final Propagator.Getter<Headers> GETTER = new Propagator.Getter<Headers>() {
@Override
public String get(Headers carrier, String key) {
return lastStringHeader(carrier, key);
}

@Nullable
private String lastStringHeader(Headers headers, String key) {
final Header header = headers.lastHeader(key);
if (header == null || header.value() == null) {
return null;
}
return new String(header.value(), UTF_8);
}
};
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ include ":docs"
include ":testing"
include ":benchmark"
include ":brave"
include ":micrometer-tracing"
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

abstract class ProducerAdaptor<K, V> implements Producer<K, V> {
public abstract class ProducerAdaptor<K, V> implements Producer<K, V> {
protected final Producer<K, V> delegate;
protected ProducerAdaptor(Producer<K, V> delegate) {
this.delegate = delegate;
Expand Down