From 287bf40fea649dc2a476334f6c59f5618fd70b12 Mon Sep 17 00:00:00 2001 From: Andrew Kent Date: Wed, 8 Oct 2025 19:34:26 -0600 Subject: [PATCH] anthropic streaming example --- .../AnthropicInstrumentationExample.java | 12 +- .../otel/InstrumentedMessageService.java | 83 +++++++++- .../anthropic/otel/StreamListener.java | 154 ++++++++++++++++++ .../otel/TracingStreamedResponse.java | 83 ++++++++++ .../anthropic/BraintrustAnthropicTest.java | 137 ++++++++++++++++ 5 files changed, 459 insertions(+), 10 deletions(-) create mode 100644 src/main/java/dev/braintrust/instrumentation/anthropic/otel/StreamListener.java create mode 100644 src/main/java/dev/braintrust/instrumentation/anthropic/otel/TracingStreamedResponse.java diff --git a/examples/src/main/java/dev/braintrust/examples/AnthropicInstrumentationExample.java b/examples/src/main/java/dev/braintrust/examples/AnthropicInstrumentationExample.java index de0e071..94018fb 100644 --- a/examples/src/main/java/dev/braintrust/examples/AnthropicInstrumentationExample.java +++ b/examples/src/main/java/dev/braintrust/examples/AnthropicInstrumentationExample.java @@ -29,7 +29,6 @@ public static void main(String[] args) throws Exception { var rootSpan = tracer.spanBuilder("anthropic-java-instrumentation-example").startSpan(); try (var ignored = rootSpan.makeCurrent()) { messagesApiExample(anthropicClient); - // streaming instrumentation coming soon // messagesStreamingExample(anthropicClient); } finally { rootSpan.end(); @@ -71,7 +70,16 @@ private static void messagesStreamingExample(AnthropicClient anthropicClient) { System.out.println("\n~~~ STREAMING RESPONSE:"); try (var stream = anthropicClient.messages().createStreaming(request)) { - stream.stream().forEach(System.out::print); + stream.stream() + .forEach( + event -> { + if (event.contentBlockDelta().isPresent()) { + var delta = event.contentBlockDelta().get().delta(); + if (delta.text().isPresent()) { + System.out.print(delta.text().get().text()); + } + } + }); } System.out.println("\n"); } diff --git a/src/main/java/dev/braintrust/instrumentation/anthropic/otel/InstrumentedMessageService.java b/src/main/java/dev/braintrust/instrumentation/anthropic/otel/InstrumentedMessageService.java index 9746783..21d631a 100644 --- a/src/main/java/dev/braintrust/instrumentation/anthropic/otel/InstrumentedMessageService.java +++ b/src/main/java/dev/braintrust/instrumentation/anthropic/otel/InstrumentedMessageService.java @@ -1,9 +1,11 @@ package dev.braintrust.instrumentation.anthropic.otel; import com.anthropic.core.RequestOptions; +import com.anthropic.core.http.StreamResponse; import com.anthropic.models.messages.Message; import com.anthropic.models.messages.MessageCreateParams; import com.anthropic.models.messages.MessageParam; +import com.anthropic.models.messages.RawMessageStreamEvent; import com.anthropic.models.messages.TextBlockParam; import com.anthropic.services.blocking.MessageService; import com.fasterxml.jackson.databind.ObjectMapper; @@ -47,15 +49,31 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl String methodName = method.getName(); Class[] parameterTypes = method.getParameterTypes(); - if (methodName.equals("create")) { - if (parameterTypes.length >= 1 && parameterTypes[0] == MessageCreateParams.class) { - if (parameterTypes.length == 1) { - return create((MessageCreateParams) args[0], RequestOptions.none()); - } else if (parameterTypes.length == 2 - && parameterTypes[1] == RequestOptions.class) { - return create((MessageCreateParams) args[0], (RequestOptions) args[1]); + switch (methodName) { + case "create": + if (parameterTypes.length >= 1 && parameterTypes[0] == MessageCreateParams.class) { + if (parameterTypes.length == 1) { + return create((MessageCreateParams) args[0], RequestOptions.none()); + } else if (parameterTypes.length == 2 + && parameterTypes[1] == RequestOptions.class) { + return create((MessageCreateParams) args[0], (RequestOptions) args[1]); + } } - } + break; + case "createStreaming": + if (parameterTypes.length >= 1 && parameterTypes[0] == MessageCreateParams.class) { + if (parameterTypes.length == 1) { + return createStreaming( + (MessageCreateParams) args[0], RequestOptions.none()); + } else if (parameterTypes.length == 2 + && parameterTypes[1] == RequestOptions.class) { + return createStreaming( + (MessageCreateParams) args[0], (RequestOptions) args[1]); + } + } + break; + default: + // fallthrough } return super.invoke(proxy, method, args); @@ -98,6 +116,55 @@ private Message create(MessageCreateParams inputMessage, RequestOptions requestO return outputMessage; } + private StreamResponse createStreaming( + MessageCreateParams inputMessage, RequestOptions requestOptions) { + Context parentContext = Context.current(); + if (!instrumenter.shouldStart(parentContext, inputMessage)) { + return createStreamingWithLogs(parentContext, inputMessage, requestOptions, false); + } + + Context context = instrumenter.start(parentContext, inputMessage); + try (Scope ignored = context.makeCurrent()) { + return createStreamingWithLogs(context, inputMessage, requestOptions, true); + } catch (Throwable t) { + instrumenter.end(context, inputMessage, null, t); + throw t; + } + } + + @SneakyThrows + private StreamResponse createStreamingWithLogs( + Context context, + MessageCreateParams inputMessage, + RequestOptions requestOptions, + boolean newSpan) { + List inputMessages = new ArrayList<>(inputMessage.messages()); + // Put system in the input message so the backend will pick it up in the LLM display + if (inputMessage.system().isPresent()) { + inputMessages.add( + 0, + MessageParam.builder() + .role(MessageParam.Role.of("system")) + .content(inputMessage.system().get().asString()) + .build()); + } + Span.fromContext(context) + .setAttribute( + "braintrust.input_json", JSON_MAPPER.writeValueAsString(inputMessages)); + + StreamResponse result = + delegate.createStreaming(inputMessage, requestOptions); + return new TracingStreamedResponse( + result, + new StreamListener( + context, + inputMessage, + instrumenter, + eventLogger, + captureMessageContent, + newSpan)); + } + private static String contentToString(MessageCreateParams.System content) { if (content.isString()) { return content.asString(); diff --git a/src/main/java/dev/braintrust/instrumentation/anthropic/otel/StreamListener.java b/src/main/java/dev/braintrust/instrumentation/anthropic/otel/StreamListener.java new file mode 100644 index 0000000..d73b3c4 --- /dev/null +++ b/src/main/java/dev/braintrust/instrumentation/anthropic/otel/StreamListener.java @@ -0,0 +1,154 @@ +package dev.braintrust.instrumentation.anthropic.otel; + +import com.anthropic.models.messages.Message; +import com.anthropic.models.messages.MessageCreateParams; +import com.anthropic.models.messages.MessageDeltaUsage; +import com.anthropic.models.messages.Model; +import com.anthropic.models.messages.RawMessageStreamEvent; +import com.anthropic.models.messages.Usage; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.annotation.Nullable; +import lombok.SneakyThrows; + +final class StreamListener { + private static final ObjectMapper JSON_MAPPER = new ObjectMapper(); + + private final Context context; + private final MessageCreateParams request; + private final Instrumenter instrumenter; + private final Logger eventLogger; + private final boolean captureMessageContent; + private final boolean newSpan; + private final AtomicBoolean hasEnded; + + private final StringBuilder contentBuilder = new StringBuilder(); + + @Nullable private Usage usage; + @Nullable private MessageDeltaUsage deltaUsage; + @Nullable private Model model; + @Nullable private String responseId; + @Nullable private String stopReason; + + StreamListener( + Context context, + MessageCreateParams request, + Instrumenter instrumenter, + Logger eventLogger, + boolean captureMessageContent, + boolean newSpan) { + this.context = context; + this.request = request; + this.instrumenter = instrumenter; + this.eventLogger = eventLogger; + this.captureMessageContent = captureMessageContent; + this.newSpan = newSpan; + hasEnded = new AtomicBoolean(); + } + + @SneakyThrows + void onEvent(RawMessageStreamEvent event) { + // Handle message_start event + if (event.messageStart().isPresent()) { + var messageStart = event.messageStart().get(); + model = messageStart.message().model(); + responseId = messageStart.message().id(); + if (messageStart.message().usage() != null) { + usage = messageStart.message().usage(); + } + } + + // Handle content_block_delta event - accumulate text + if (event.contentBlockDelta().isPresent()) { + var delta = event.contentBlockDelta().get(); + if (delta.delta().text().isPresent()) { + contentBuilder.append(delta.delta().text().get().text()); + } + } + + // Handle message_delta event + if (event.messageDelta().isPresent()) { + var messageDelta = event.messageDelta().get(); + if (messageDelta.delta().stopReason().isPresent()) { + stopReason = messageDelta.delta().stopReason().get().toString(); + } + if (messageDelta.usage() != null) { + deltaUsage = messageDelta.usage(); + } + } + + // Handle content_block_stop - write output + if (event.contentBlockStop().isPresent()) { + ArrayNode outputArray = JSON_MAPPER.createArrayNode(); + ObjectNode message = JSON_MAPPER.createObjectNode(); + message.put("role", "assistant"); + message.put("content", contentBuilder.toString()); + outputArray.add(message); + + Span.fromContext(context) + .setAttribute( + "braintrust.output_json", JSON_MAPPER.writeValueAsString(outputArray)); + } + } + + void endSpan(@Nullable Throwable error) { + // Use an atomic operation since close() type of methods are exposed to the user + // and can come from any thread. + if (!hasEnded.compareAndSet(false, true)) { + return; + } + + if (!newSpan) { + return; + } + + if (model == null || responseId == null) { + // Only happens if we got no events, so we have no response. + instrumenter.end(context, request, null, error); + return; + } + + // Set response attributes directly on the span since building a valid Message is complex + // The content was already written to braintrust.output_json in onEvent + Span span = Span.fromContext(context); + + // Set model and response ID + if (model != null) { + span.setAttribute("gen_ai.response.model", model.asString()); + } + if (responseId != null) { + span.setAttribute("gen_ai.response.id", responseId); + } + if (stopReason != null) { + span.setAttribute( + AttributeKey.stringArrayKey("gen_ai.response.finish_reasons"), + Arrays.asList(stopReason)); + } + + // Set usage metrics - combine from both message_start and message_delta + // message_start has input_tokens, message_delta has final output_tokens + if (usage != null) { + span.setAttribute("gen_ai.usage.input_tokens", usage.inputTokens()); + } + if (deltaUsage != null) { + // message_delta may also have input_tokens, prefer it if present + deltaUsage + .inputTokens() + .ifPresent(tokens -> span.setAttribute("gen_ai.usage.input_tokens", tokens)); + span.setAttribute("gen_ai.usage.output_tokens", deltaUsage.outputTokens()); + } else if (usage != null) { + // Fallback to usage from message_start for output_tokens if no delta + span.setAttribute("gen_ai.usage.output_tokens", usage.outputTokens()); + } + + instrumenter.end(context, request, null, error); + } +} diff --git a/src/main/java/dev/braintrust/instrumentation/anthropic/otel/TracingStreamedResponse.java b/src/main/java/dev/braintrust/instrumentation/anthropic/otel/TracingStreamedResponse.java new file mode 100644 index 0000000..312fd87 --- /dev/null +++ b/src/main/java/dev/braintrust/instrumentation/anthropic/otel/TracingStreamedResponse.java @@ -0,0 +1,83 @@ +package dev.braintrust.instrumentation.anthropic.otel; + +import com.anthropic.core.http.StreamResponse; +import com.anthropic.models.messages.RawMessageStreamEvent; +import java.util.Comparator; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import javax.annotation.Nullable; + +final class TracingStreamedResponse implements StreamResponse { + + private final StreamResponse delegate; + private final StreamListener listener; + + TracingStreamedResponse( + StreamResponse delegate, StreamListener listener) { + this.delegate = delegate; + this.listener = listener; + } + + @Override + public Stream stream() { + return StreamSupport.stream(new TracingSpliterator(delegate.stream().spliterator()), false); + } + + @Override + public void close() { + listener.endSpan(null); + delegate.close(); + } + + private class TracingSpliterator implements Spliterator { + + private final Spliterator delegateSpliterator; + + private TracingSpliterator(Spliterator delegateSpliterator) { + this.delegateSpliterator = delegateSpliterator; + } + + @Override + public boolean tryAdvance(Consumer action) { + boolean eventReceived = + delegateSpliterator.tryAdvance( + event -> { + listener.onEvent(event); + action.accept(event); + }); + if (!eventReceived) { + listener.endSpan(null); + } + return eventReceived; + } + + @Override + @Nullable + public Spliterator trySplit() { + // do not support parallelism to reliably catch the last event + return null; + } + + @Override + public long estimateSize() { + return delegateSpliterator.estimateSize(); + } + + @Override + public long getExactSizeIfKnown() { + return delegateSpliterator.getExactSizeIfKnown(); + } + + @Override + public int characteristics() { + return delegateSpliterator.characteristics(); + } + + @Override + public Comparator getComparator() { + return delegateSpliterator.getComparator(); + } + } +} diff --git a/src/test/java/dev/braintrust/instrumentation/anthropic/BraintrustAnthropicTest.java b/src/test/java/dev/braintrust/instrumentation/anthropic/BraintrustAnthropicTest.java index 176644a..718c9b4 100644 --- a/src/test/java/dev/braintrust/instrumentation/anthropic/BraintrustAnthropicTest.java +++ b/src/test/java/dev/braintrust/instrumentation/anthropic/BraintrustAnthropicTest.java @@ -174,4 +174,141 @@ void testWrapAnthropic() { assertEquals(8, messageZero.get("usage").get("output_tokens").asInt()); assertEquals(20, messageZero.get("usage").get("input_tokens").asInt()); } + + @Test + @SneakyThrows + void testWrapAnthropicStreaming() { + // Mock the Anthropic streaming API response + String streamingResponse = + """ + event: message_start + data: {"type":"message_start","message":{"id":"msg_test123","type":"message","role":"assistant","model":"claude-3-5-haiku-20241022","content":[],"stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":20,"output_tokens":1}}} + + event: content_block_start + data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"The"}} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" capital"}} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" of"}} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" France"}} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" is"}} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" Paris"}} + + event: content_block_delta + data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"."}} + + event: content_block_stop + data: {"type":"content_block_stop","index":0} + + event: message_delta + data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":8}} + + event: message_stop + data: {"type":"message_stop"} + + """; + + wireMock.stubFor( + post(urlEqualTo("/v1/messages")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "text/event-stream") + .withBody(streamingResponse))); + + var openTelemetry = (OpenTelemetrySdk) BraintrustTracing.of(config, true); + + // Create Anthropic client pointing to WireMock server + AnthropicClient anthropicClient = + AnthropicOkHttpClient.builder() + .baseUrl("http://localhost:" + wireMock.getPort()) + .apiKey("test-api-key") + .build(); + + // Wrap with Braintrust instrumentation + anthropicClient = BraintrustAnthropic.wrap(openTelemetry, anthropicClient); + + var request = + MessageCreateParams.builder() + .model(Model.CLAUDE_3_5_HAIKU_20241022) + .system("You are a helpful assistant") + .addUserMessage("What is the capital of France?") + .maxTokens(50) + .temperature(0.0) + .build(); + + // Consume the stream + StringBuilder fullResponse = new StringBuilder(); + try (var stream = anthropicClient.messages().createStreaming(request)) { + stream.stream() + .forEach( + event -> { + if (event.contentBlockDelta().isPresent()) { + var delta = event.contentBlockDelta().get().delta(); + if (delta.text().isPresent()) { + fullResponse.append(delta.text().get().text()); + } + } + }); + } + + // Verify the response + assertEquals("The capital of France is Paris.", fullResponse.toString()); + wireMock.verify(1, postRequestedFor(urlEqualTo("/v1/messages"))); + + // Verify spans were exported + assertTrue( + openTelemetry + .getSdkTracerProvider() + .forceFlush() + .join(10, TimeUnit.SECONDS) + .isSuccess()); + var spanData = + getExportedBraintrustSpans().get(config.getBraintrustParentValue().orElseThrow()); + assertNotNull(spanData); + assertEquals(1, spanData.size()); + var span = spanData.get(0); + + // Verify standard GenAI attributes + assertEquals( + "anthropic", span.getAttributes().get(AttributeKey.stringKey("gen_ai.system"))); + assertEquals( + "claude-3-5-haiku-20241022", + span.getAttributes().get(AttributeKey.stringKey("gen_ai.request.model"))); + assertEquals( + "claude-3-5-haiku-20241022", + span.getAttributes().get(AttributeKey.stringKey("gen_ai.response.model"))); + assertEquals( + "chat", span.getAttributes().get(AttributeKey.stringKey("gen_ai.operation.name"))); + assertEquals( + "msg_test123", + span.getAttributes().get(AttributeKey.stringKey("gen_ai.response.id"))); + + // Verify usage metrics were captured from streaming + assertEquals( + 20L, span.getAttributes().get(AttributeKey.longKey("gen_ai.usage.input_tokens"))); + assertEquals( + 8L, span.getAttributes().get(AttributeKey.longKey("gen_ai.usage.output_tokens"))); + + // Verify output JSON was captured + String outputJson = + span.getAttributes().get(AttributeKey.stringKey("braintrust.output_json")); + assertNotNull(outputJson); + var outputMessages = JSON_MAPPER.readTree(outputJson); + assertEquals(1, outputMessages.size()); + var messageZero = outputMessages.get(0); + assertEquals("assistant", messageZero.get("role").asText()); + assertEquals("The capital of France is Paris.", messageZero.get("content").asText()); + } }