Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public static void main(String[] args) throws Exception {
var rootSpan = tracer.spanBuilder("anthropic-java-instrumentation-example").startSpan();
try (var ignored = rootSpan.makeCurrent()) {
messagesApiExample(anthropicClient);
// streaming instrumentation coming soon
// messagesStreamingExample(anthropicClient);
} finally {
rootSpan.end();
Expand Down Expand Up @@ -71,7 +70,16 @@ private static void messagesStreamingExample(AnthropicClient anthropicClient) {

System.out.println("\n~~~ STREAMING RESPONSE:");
try (var stream = anthropicClient.messages().createStreaming(request)) {
stream.stream().forEach(System.out::print);
stream.stream()
.forEach(
event -> {
if (event.contentBlockDelta().isPresent()) {
var delta = event.contentBlockDelta().get().delta();
if (delta.text().isPresent()) {
System.out.print(delta.text().get().text());
}
}
});
}
System.out.println("\n");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package dev.braintrust.instrumentation.anthropic.otel;

import com.anthropic.core.RequestOptions;
import com.anthropic.core.http.StreamResponse;
import com.anthropic.models.messages.Message;
import com.anthropic.models.messages.MessageCreateParams;
import com.anthropic.models.messages.MessageParam;
import com.anthropic.models.messages.RawMessageStreamEvent;
import com.anthropic.models.messages.TextBlockParam;
import com.anthropic.services.blocking.MessageService;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -47,15 +49,31 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();

if (methodName.equals("create")) {
if (parameterTypes.length >= 1 && parameterTypes[0] == MessageCreateParams.class) {
if (parameterTypes.length == 1) {
return create((MessageCreateParams) args[0], RequestOptions.none());
} else if (parameterTypes.length == 2
&& parameterTypes[1] == RequestOptions.class) {
return create((MessageCreateParams) args[0], (RequestOptions) args[1]);
switch (methodName) {
case "create":
if (parameterTypes.length >= 1 && parameterTypes[0] == MessageCreateParams.class) {
if (parameterTypes.length == 1) {
return create((MessageCreateParams) args[0], RequestOptions.none());
} else if (parameterTypes.length == 2
&& parameterTypes[1] == RequestOptions.class) {
return create((MessageCreateParams) args[0], (RequestOptions) args[1]);
}
}
}
break;
case "createStreaming":
if (parameterTypes.length >= 1 && parameterTypes[0] == MessageCreateParams.class) {
if (parameterTypes.length == 1) {
return createStreaming(
(MessageCreateParams) args[0], RequestOptions.none());
} else if (parameterTypes.length == 2
&& parameterTypes[1] == RequestOptions.class) {
return createStreaming(
(MessageCreateParams) args[0], (RequestOptions) args[1]);
}
}
break;
default:
// fallthrough
}

return super.invoke(proxy, method, args);
Expand Down Expand Up @@ -98,6 +116,55 @@ private Message create(MessageCreateParams inputMessage, RequestOptions requestO
return outputMessage;
}

private StreamResponse<RawMessageStreamEvent> createStreaming(
MessageCreateParams inputMessage, RequestOptions requestOptions) {
Context parentContext = Context.current();
if (!instrumenter.shouldStart(parentContext, inputMessage)) {
return createStreamingWithLogs(parentContext, inputMessage, requestOptions, false);
}

Context context = instrumenter.start(parentContext, inputMessage);
try (Scope ignored = context.makeCurrent()) {
return createStreamingWithLogs(context, inputMessage, requestOptions, true);
} catch (Throwable t) {
instrumenter.end(context, inputMessage, null, t);
throw t;
}
}

@SneakyThrows
private StreamResponse<RawMessageStreamEvent> createStreamingWithLogs(
Context context,
MessageCreateParams inputMessage,
RequestOptions requestOptions,
boolean newSpan) {
List<MessageParam> inputMessages = new ArrayList<>(inputMessage.messages());
// Put system in the input message so the backend will pick it up in the LLM display
if (inputMessage.system().isPresent()) {
inputMessages.add(
0,
MessageParam.builder()
.role(MessageParam.Role.of("system"))
.content(inputMessage.system().get().asString())
.build());
}
Span.fromContext(context)
.setAttribute(
"braintrust.input_json", JSON_MAPPER.writeValueAsString(inputMessages));

StreamResponse<RawMessageStreamEvent> result =
delegate.createStreaming(inputMessage, requestOptions);
return new TracingStreamedResponse(
result,
new StreamListener(
context,
inputMessage,
instrumenter,
eventLogger,
captureMessageContent,
newSpan));
}

private static String contentToString(MessageCreateParams.System content) {
if (content.isString()) {
return content.asString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package dev.braintrust.instrumentation.anthropic.otel;

import com.anthropic.models.messages.Message;
import com.anthropic.models.messages.MessageCreateParams;
import com.anthropic.models.messages.MessageDeltaUsage;
import com.anthropic.models.messages.Model;
import com.anthropic.models.messages.RawMessageStreamEvent;
import com.anthropic.models.messages.Usage;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import lombok.SneakyThrows;

final class StreamListener {
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

private final Context context;
private final MessageCreateParams request;
private final Instrumenter<MessageCreateParams, Message> instrumenter;
private final Logger eventLogger;
private final boolean captureMessageContent;
private final boolean newSpan;
private final AtomicBoolean hasEnded;

private final StringBuilder contentBuilder = new StringBuilder();

@Nullable private Usage usage;
@Nullable private MessageDeltaUsage deltaUsage;
@Nullable private Model model;
@Nullable private String responseId;
@Nullable private String stopReason;

StreamListener(
Context context,
MessageCreateParams request,
Instrumenter<MessageCreateParams, Message> instrumenter,
Logger eventLogger,
boolean captureMessageContent,
boolean newSpan) {
this.context = context;
this.request = request;
this.instrumenter = instrumenter;
this.eventLogger = eventLogger;
this.captureMessageContent = captureMessageContent;
this.newSpan = newSpan;
hasEnded = new AtomicBoolean();
}

@SneakyThrows
void onEvent(RawMessageStreamEvent event) {
// Handle message_start event
if (event.messageStart().isPresent()) {
var messageStart = event.messageStart().get();
model = messageStart.message().model();
responseId = messageStart.message().id();
if (messageStart.message().usage() != null) {
usage = messageStart.message().usage();
}
}

// Handle content_block_delta event - accumulate text
if (event.contentBlockDelta().isPresent()) {
var delta = event.contentBlockDelta().get();
if (delta.delta().text().isPresent()) {
contentBuilder.append(delta.delta().text().get().text());
}
}

// Handle message_delta event
if (event.messageDelta().isPresent()) {
var messageDelta = event.messageDelta().get();
if (messageDelta.delta().stopReason().isPresent()) {
stopReason = messageDelta.delta().stopReason().get().toString();
}
if (messageDelta.usage() != null) {
deltaUsage = messageDelta.usage();
}
}

// Handle content_block_stop - write output
if (event.contentBlockStop().isPresent()) {
ArrayNode outputArray = JSON_MAPPER.createArrayNode();
ObjectNode message = JSON_MAPPER.createObjectNode();
message.put("role", "assistant");
message.put("content", contentBuilder.toString());
outputArray.add(message);

Span.fromContext(context)
.setAttribute(
"braintrust.output_json", JSON_MAPPER.writeValueAsString(outputArray));
}
}

void endSpan(@Nullable Throwable error) {
// Use an atomic operation since close() type of methods are exposed to the user
// and can come from any thread.
if (!hasEnded.compareAndSet(false, true)) {
return;
}

if (!newSpan) {
return;
}

if (model == null || responseId == null) {
// Only happens if we got no events, so we have no response.
instrumenter.end(context, request, null, error);
return;
}

// Set response attributes directly on the span since building a valid Message is complex
// The content was already written to braintrust.output_json in onEvent
Span span = Span.fromContext(context);

// Set model and response ID
if (model != null) {
span.setAttribute("gen_ai.response.model", model.asString());
}
if (responseId != null) {
span.setAttribute("gen_ai.response.id", responseId);
}
if (stopReason != null) {
span.setAttribute(
AttributeKey.stringArrayKey("gen_ai.response.finish_reasons"),
Arrays.asList(stopReason));
}

// Set usage metrics - combine from both message_start and message_delta
// message_start has input_tokens, message_delta has final output_tokens
if (usage != null) {
span.setAttribute("gen_ai.usage.input_tokens", usage.inputTokens());
}
if (deltaUsage != null) {
// message_delta may also have input_tokens, prefer it if present
deltaUsage
.inputTokens()
.ifPresent(tokens -> span.setAttribute("gen_ai.usage.input_tokens", tokens));
span.setAttribute("gen_ai.usage.output_tokens", deltaUsage.outputTokens());
} else if (usage != null) {
// Fallback to usage from message_start for output_tokens if no delta
span.setAttribute("gen_ai.usage.output_tokens", usage.outputTokens());
}

instrumenter.end(context, request, null, error);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package dev.braintrust.instrumentation.anthropic.otel;

import com.anthropic.core.http.StreamResponse;
import com.anthropic.models.messages.RawMessageStreamEvent;
import java.util.Comparator;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;

final class TracingStreamedResponse implements StreamResponse<RawMessageStreamEvent> {

private final StreamResponse<RawMessageStreamEvent> delegate;
private final StreamListener listener;

TracingStreamedResponse(
StreamResponse<RawMessageStreamEvent> delegate, StreamListener listener) {
this.delegate = delegate;
this.listener = listener;
}

@Override
public Stream<RawMessageStreamEvent> stream() {
return StreamSupport.stream(new TracingSpliterator(delegate.stream().spliterator()), false);
}

@Override
public void close() {
listener.endSpan(null);
delegate.close();
}

private class TracingSpliterator implements Spliterator<RawMessageStreamEvent> {

private final Spliterator<RawMessageStreamEvent> delegateSpliterator;

private TracingSpliterator(Spliterator<RawMessageStreamEvent> delegateSpliterator) {
this.delegateSpliterator = delegateSpliterator;
}

@Override
public boolean tryAdvance(Consumer<? super RawMessageStreamEvent> action) {
boolean eventReceived =
delegateSpliterator.tryAdvance(
event -> {
listener.onEvent(event);
action.accept(event);
});
if (!eventReceived) {
listener.endSpan(null);
}
return eventReceived;
}

@Override
@Nullable
public Spliterator<RawMessageStreamEvent> trySplit() {
// do not support parallelism to reliably catch the last event
return null;
}

@Override
public long estimateSize() {
return delegateSpliterator.estimateSize();
}

@Override
public long getExactSizeIfKnown() {
return delegateSpliterator.getExactSizeIfKnown();
}

@Override
public int characteristics() {
return delegateSpliterator.characteristics();
}

@Override
public Comparator<? super RawMessageStreamEvent> getComparator() {
return delegateSpliterator.getComparator();
}
}
}
Loading