Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.openai.client.okhttp.OpenAIOkHttpClient;
import com.openai.models.ChatModel;
import com.openai.models.chat.completions.ChatCompletionCreateParams;
import com.openai.models.chat.completions.ChatCompletionStreamOptions;
import dev.braintrust.config.BraintrustConfig;
import dev.braintrust.instrumentation.openai.BraintrustOpenAI;
import dev.braintrust.trace.BraintrustTracing;
Expand All @@ -23,7 +24,6 @@ public static void main(String[] args) throws Exception {
var rootSpan = tracer.spanBuilder("openai-java-instrumentation-example").startSpan();
try (var ignored = rootSpan.makeCurrent()) {
chatCompletionsExample(openAIClient);
// streaming instrumentation coming soon
// chatCompletionsStreamingExample(openAIClient);
} finally {
rootSpan.end();
Expand Down Expand Up @@ -57,11 +57,23 @@ private static void chatCompletionsStreamingExample(OpenAIClient openAIClient) {
.addSystemMessage("You are a helpful assistant")
.addUserMessage("What is the capital of France?")
.temperature(0.0)
.streamOptions(
ChatCompletionStreamOptions.builder().includeUsage(true).build())
.build();

System.out.println("\n~~~ STREAMING RESPONSE:");
try (var stream = openAIClient.chat().completions().createStreaming(request)) {
stream.stream().forEach(System.out::print);
stream.stream()
.forEach(
chunk -> {
if (!chunk.choices().isEmpty()) {
chunk.choices()
.get(0)
.delta()
.content()
.ifPresent(System.out::print);
}
});
}
System.out.println("\n");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@

package dev.braintrust.instrumentation.openai.otel;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.openai.models.chat.completions.ChatCompletion;
import com.openai.models.chat.completions.ChatCompletionChunk;
import com.openai.models.chat.completions.ChatCompletionCreateParams;
import com.openai.models.chat.completions.ChatCompletionMessage;
import com.openai.models.completions.CompletionUsage;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.SneakyThrows;

final class StreamListener {
private static final ObjectMapper JSON_MAPPER =
new com.fasterxml.jackson.databind.ObjectMapper();

private final Context context;
private final ChatCompletionCreateParams request;
Expand Down Expand Up @@ -51,6 +57,7 @@ final class StreamListener {
hasEnded = new AtomicBoolean();
}

@SneakyThrows
void onChunk(ChatCompletionChunk chunk) {
model = chunk.model();
responseId = chunk.id();
Expand All @@ -68,6 +75,11 @@ void onChunk(ChatCompletionChunk chunk) {
buffer.append(choice.delta());
if (choice.finishReason().isPresent()) {
buffer.finishReason = choice.finishReason().get().toString();
Span.fromContext(context)
.setAttribute(
"braintrust.output_json",
JSON_MAPPER.writeValueAsString(
new ChatCompletionMessage[] {buffer.toChoice().message()}));

// message has ended, let's emit
ChatCompletionEventsHelper.emitCompletionLogEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.openai.client.okhttp.OpenAIOkHttpClient;
import com.openai.models.ChatModel;
import com.openai.models.chat.completions.ChatCompletionCreateParams;
import com.openai.models.chat.completions.ChatCompletionStreamOptions;
import dev.braintrust.config.BraintrustConfig;
import dev.braintrust.trace.BraintrustTracing;
import io.opentelemetry.api.GlobalOpenTelemetry;
Expand Down Expand Up @@ -166,4 +167,132 @@ void testWrapOpenAi() {
"chatcmpl-test123",
span.getAttributes().get(AttributeKey.stringKey("gen_ai.response.id")));
}

@Test
@SneakyThrows
void testWrapOpenAiStreaming() {
// Mock the OpenAI API streaming response
String streamingResponse =
"""
data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}

data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"The"},"finish_reason":null}]}

data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":" capital"},"finish_reason":null}]}

data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":" of"},"finish_reason":null}]}

data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":" France"},"finish_reason":null}]}

data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":" is"},"finish_reason":null}]}

data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":" Paris"},"finish_reason":null}]}

data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"."},"finish_reason":null}]}

data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: {"id":"chatcmpl-test123","object":"chat.completion.chunk","created":1677652288,"model":"gpt-4o-mini","choices":[],"usage":{"prompt_tokens":20,"completion_tokens":8,"total_tokens":28}}

data: [DONE]

""";

wireMock.stubFor(
post(urlEqualTo("/chat/completions"))
.willReturn(
aResponse()
.withStatus(200)
.withHeader("Content-Type", "text/event-stream")
.withBody(streamingResponse)));

var openTelemetry = (OpenTelemetrySdk) BraintrustTracing.of(config, true);

// Create OpenAI client pointing to WireMock server
OpenAIClient openAIClient =
OpenAIOkHttpClient.builder()
.baseUrl("http://localhost:" + wireMock.getPort())
.apiKey("test-api-key")
.build();

// Wrap with Braintrust instrumentation
openAIClient = BraintrustOpenAI.wrapOpenAI(openTelemetry, openAIClient);

var request =
ChatCompletionCreateParams.builder()
.model(ChatModel.GPT_4O_MINI)
.addSystemMessage("You are a helpful assistant")
.addUserMessage("What is the capital of France?")
.temperature(0.0)
.streamOptions(
ChatCompletionStreamOptions.builder().includeUsage(true).build())
.build();

// Consume the stream
StringBuilder fullResponse = new StringBuilder();
try (var stream = openAIClient.chat().completions().createStreaming(request)) {
stream.stream()
.forEach(
chunk -> {
if (!chunk.choices().isEmpty()) {
chunk.choices()
.get(0)
.delta()
.content()
.ifPresent(fullResponse::append);
}
});
}

// Verify the response
assertEquals("The capital of France is Paris.", fullResponse.toString());
wireMock.verify(1, postRequestedFor(urlEqualTo("/chat/completions")));

// Verify spans were exported
assertTrue(
openTelemetry
.getSdkTracerProvider()
.forceFlush()
.join(10, TimeUnit.SECONDS)
.isSuccess());
var spanData =
getExportedBraintrustSpans().get(config.getBraintrustParentValue().orElseThrow());
assertNotNull(spanData);
assertEquals(1, spanData.size());
var span = spanData.get(0);

// Verify span attributes
assertEquals("openai", span.getAttributes().get(AttributeKey.stringKey("gen_ai.system")));
assertEquals(
"gpt-4o-mini",
span.getAttributes().get(AttributeKey.stringKey("gen_ai.request.model")));
assertEquals(
"gpt-4o-mini",
span.getAttributes().get(AttributeKey.stringKey("gen_ai.response.model")));
assertEquals(
"[stop]",
span.getAttributes()
.get(AttributeKey.stringArrayKey("gen_ai.response.finish_reasons"))
.toString());
assertEquals(
"chat", span.getAttributes().get(AttributeKey.stringKey("gen_ai.operation.name")));
assertEquals(
"chatcmpl-test123",
span.getAttributes().get(AttributeKey.stringKey("gen_ai.response.id")));

// Verify usage metrics were captured
assertEquals(
20L, span.getAttributes().get(AttributeKey.longKey("gen_ai.usage.input_tokens")));
assertEquals(
8L, span.getAttributes().get(AttributeKey.longKey("gen_ai.usage.output_tokens")));

// Verify output JSON was captured
String outputJson =
span.getAttributes().get(AttributeKey.stringKey("braintrust.output_json"));
assertNotNull(outputJson);
var outputMessages = JSON_MAPPER.readTree(outputJson);
assertEquals(1, outputMessages.size());
var messageZero = outputMessages.get(0);
assertEquals("The capital of France is Paris.", messageZero.get("content").asText());
}
}