From 3129545e3317e552ffba965c6f02a07bed6079b7 Mon Sep 17 00:00:00 2001 From: Andrew Kent Date: Wed, 8 Oct 2025 18:37:04 -0600 Subject: [PATCH] instrumentation for oai streaming --- .../OpenAIInstrumentationExample.java | 16 ++- .../openai/otel/StreamListener.java | 12 ++ .../openai/BraintrustOpenAITest.java | 129 ++++++++++++++++++ 3 files changed, 155 insertions(+), 2 deletions(-) diff --git a/examples/src/main/java/dev/braintrust/examples/OpenAIInstrumentationExample.java b/examples/src/main/java/dev/braintrust/examples/OpenAIInstrumentationExample.java index eca96e8..33f04ab 100644 --- a/examples/src/main/java/dev/braintrust/examples/OpenAIInstrumentationExample.java +++ b/examples/src/main/java/dev/braintrust/examples/OpenAIInstrumentationExample.java @@ -4,6 +4,7 @@ import com.openai.client.okhttp.OpenAIOkHttpClient; import com.openai.models.ChatModel; import com.openai.models.chat.completions.ChatCompletionCreateParams; +import com.openai.models.chat.completions.ChatCompletionStreamOptions; import dev.braintrust.config.BraintrustConfig; import dev.braintrust.instrumentation.openai.BraintrustOpenAI; import dev.braintrust.trace.BraintrustTracing; @@ -23,7 +24,6 @@ public static void main(String[] args) throws Exception { var rootSpan = tracer.spanBuilder("openai-java-instrumentation-example").startSpan(); try (var ignored = rootSpan.makeCurrent()) { chatCompletionsExample(openAIClient); - // streaming instrumentation coming soon // chatCompletionsStreamingExample(openAIClient); } finally { rootSpan.end(); @@ -57,11 +57,23 @@ private static void chatCompletionsStreamingExample(OpenAIClient openAIClient) { .addSystemMessage("You are a helpful assistant") .addUserMessage("What is the capital of France?") .temperature(0.0) + .streamOptions( + ChatCompletionStreamOptions.builder().includeUsage(true).build()) .build(); System.out.println("\n~~~ STREAMING RESPONSE:"); try (var stream = openAIClient.chat().completions().createStreaming(request)) { - stream.stream().forEach(System.out::print); + stream.stream() + .forEach( + chunk -> { + if (!chunk.choices().isEmpty()) { + chunk.choices() + .get(0) + .delta() + .content() + .ifPresent(System.out::print); + } + }); } System.out.println("\n"); } diff --git a/src/main/java/dev/braintrust/instrumentation/openai/otel/StreamListener.java b/src/main/java/dev/braintrust/instrumentation/openai/otel/StreamListener.java index 3a0c164..cc57436 100644 --- a/src/main/java/dev/braintrust/instrumentation/openai/otel/StreamListener.java +++ b/src/main/java/dev/braintrust/instrumentation/openai/otel/StreamListener.java @@ -5,11 +5,14 @@ package dev.braintrust.instrumentation.openai.otel; +import com.fasterxml.jackson.databind.ObjectMapper; import com.openai.models.chat.completions.ChatCompletion; import com.openai.models.chat.completions.ChatCompletionChunk; import com.openai.models.chat.completions.ChatCompletionCreateParams; +import com.openai.models.chat.completions.ChatCompletionMessage; import com.openai.models.completions.CompletionUsage; import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import java.util.ArrayList; @@ -17,8 +20,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import javax.annotation.Nullable; +import lombok.SneakyThrows; final class StreamListener { + private static final ObjectMapper JSON_MAPPER = + new com.fasterxml.jackson.databind.ObjectMapper(); private final Context context; private final ChatCompletionCreateParams request; @@ -51,6 +57,7 @@ final class StreamListener { hasEnded = new AtomicBoolean(); } + @SneakyThrows void onChunk(ChatCompletionChunk chunk) { model = chunk.model(); responseId = chunk.id(); @@ -68,6 +75,11 @@ void onChunk(ChatCompletionChunk chunk) { buffer.append(choice.delta()); if (choice.finishReason().isPresent()) { buffer.finishReason = choice.finishReason().get().toString(); + Span.fromContext(context) + .setAttribute( + "braintrust.output_json", + JSON_MAPPER.writeValueAsString( + new ChatCompletionMessage[] {buffer.toChoice().message()})); // message has ended, let's emit ChatCompletionEventsHelper.emitCompletionLogEvent( diff --git a/src/test/java/dev/braintrust/instrumentation/openai/BraintrustOpenAITest.java b/src/test/java/dev/braintrust/instrumentation/openai/BraintrustOpenAITest.java index af71876..434adab 100644 --- a/src/test/java/dev/braintrust/instrumentation/openai/BraintrustOpenAITest.java +++ b/src/test/java/dev/braintrust/instrumentation/openai/BraintrustOpenAITest.java @@ -11,6 +11,7 @@ import com.openai.client.okhttp.OpenAIOkHttpClient; import com.openai.models.ChatModel; import com.openai.models.chat.completions.ChatCompletionCreateParams; +import com.openai.models.chat.completions.ChatCompletionStreamOptions; import dev.braintrust.config.BraintrustConfig; import dev.braintrust.trace.BraintrustTracing; import io.opentelemetry.api.GlobalOpenTelemetry; @@ -166,4 +167,132 @@ void testWrapOpenAi() { "chatcmpl-test123", span.getAttributes().get(AttributeKey.stringKey("gen_ai.response.id"))); } + + @Test + @SneakyThrows + void testWrapOpenAiStreaming() { + // Mock the OpenAI API streaming response + String streamingResponse = + """ + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]} + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"The"},"finish_reason":null}]} + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":" capital"},"finish_reason":null}]} + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":" of"},"finish_reason":null}]} + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":" France"},"finish_reason":null}]} + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":" is"},"finish_reason":null}]} + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":" Paris"},"finish_reason":null}]} + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"."},"finish_reason":null}]} + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]} + + data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[],"usage":{"prompt_tokens":20,"completion_tokens":8,"total_tokens":28}} + + data: [DONE] + + """; + + wireMock.stubFor( + post(urlEqualTo("/chat/completions")) + .willReturn( + aResponse() + .withStatus(200) + .withHeader("Content-Type", "text/event-stream") + .withBody(streamingResponse))); + + var openTelemetry = (OpenTelemetrySdk) BraintrustTracing.of(config, true); + + // Create OpenAI client pointing to WireMock server + OpenAIClient openAIClient = + OpenAIOkHttpClient.builder() + .baseUrl("http://localhost:" + wireMock.getPort()) + .apiKey("test-api-key") + .build(); + + // Wrap with Braintrust instrumentation + openAIClient = BraintrustOpenAI.wrapOpenAI(openTelemetry, openAIClient); + + var request = + ChatCompletionCreateParams.builder() + .model(ChatModel.GPT_4O_MINI) + .addSystemMessage("You are a helpful assistant") + .addUserMessage("What is the capital of France?") + .temperature(0.0) + .streamOptions( + ChatCompletionStreamOptions.builder().includeUsage(true).build()) + .build(); + + // Consume the stream + StringBuilder fullResponse = new StringBuilder(); + try (var stream = openAIClient.chat().completions().createStreaming(request)) { + stream.stream() + .forEach( + chunk -> { + if (!chunk.choices().isEmpty()) { + chunk.choices() + .get(0) + .delta() + .content() + .ifPresent(fullResponse::append); + } + }); + } + + // Verify the response + assertEquals("The capital of France is Paris.", fullResponse.toString()); + wireMock.verify(1, postRequestedFor(urlEqualTo("/chat/completions"))); + + // Verify spans were exported + assertTrue( + openTelemetry + .getSdkTracerProvider() + .forceFlush() + .join(10, TimeUnit.SECONDS) + .isSuccess()); + var spanData = + getExportedBraintrustSpans().get(config.getBraintrustParentValue().orElseThrow()); + assertNotNull(spanData); + assertEquals(1, spanData.size()); + var span = spanData.get(0); + + // Verify span attributes + assertEquals("openai", span.getAttributes().get(AttributeKey.stringKey("gen_ai.system"))); + assertEquals( + "gpt-4o-mini", + span.getAttributes().get(AttributeKey.stringKey("gen_ai.request.model"))); + assertEquals( + "gpt-4o-mini", + span.getAttributes().get(AttributeKey.stringKey("gen_ai.response.model"))); + assertEquals( + "[stop]", + span.getAttributes() + .get(AttributeKey.stringArrayKey("gen_ai.response.finish_reasons")) + .toString()); + assertEquals( + "chat", span.getAttributes().get(AttributeKey.stringKey("gen_ai.operation.name"))); + assertEquals( + "chatcmpl-test123", + span.getAttributes().get(AttributeKey.stringKey("gen_ai.response.id"))); + + // Verify usage metrics were captured + assertEquals( + 20L, span.getAttributes().get(AttributeKey.longKey("gen_ai.usage.input_tokens"))); + assertEquals( + 8L, span.getAttributes().get(AttributeKey.longKey("gen_ai.usage.output_tokens"))); + + // Verify output JSON was captured + String outputJson = + span.getAttributes().get(AttributeKey.stringKey("braintrust.output_json")); + assertNotNull(outputJson); + var outputMessages = JSON_MAPPER.readTree(outputJson); + assertEquals(1, outputMessages.size()); + var messageZero = outputMessages.get(0); + assertEquals("The capital of France is Paris.", messageZero.get("content").asText()); + } }