diff --git a/dd-java-agent/agent-jmxfetch/integrations-core b/dd-java-agent/agent-jmxfetch/integrations-core
index 3189af0e0ae..5240f2a7cdc 160000
--- a/dd-java-agent/agent-jmxfetch/integrations-core
+++ b/dd-java-agent/agent-jmxfetch/integrations-core
@@ -1 +1 @@
-Subproject commit 3189af0e0ae840c9a4bab3131662c7fd6b0de7fb
+Subproject commit 5240f2a7cdcabc6ae7787b9191b9189438671f3e
diff --git a/dd-trace-api/build.gradle b/dd-trace-api/build.gradle
index 4175700c6ce..bbf7d4a0953 100644
--- a/dd-trace-api/build.gradle
+++ b/dd-trace-api/build.gradle
@@ -51,6 +51,7 @@ excludedClassesCoverage += [
   'datadog.trace.context.NoopTraceScope',
   'datadog.trace.payloadtags.PayloadTagsData',
   'datadog.trace.payloadtags.PayloadTagsData.PathAndValue',
+  'datadog.trace.api.llmobs.LLMObsTags',
 ]
 
 description = 'dd-trace-api'
diff --git a/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java b/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java
index 04f9738c643..4e41df93738 100644
--- a/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java
+++ b/dd-trace-api/src/main/java/datadog/trace/api/DDSpanTypes.java
@@ -36,6 +36,7 @@ public class DDSpanTypes {
   public static final String PROTOBUF = "protobuf";
 
   public static final String MULE = "mule";
+
   public static final String VALKEY = "valkey";
 
   public static final String WEBSOCKET = "websocket";
diff --git a/dd-trace-core/build.gradle b/dd-trace-core/build.gradle
index cfc50ded09b..409477bac3f 100644
--- a/dd-trace-core/build.gradle
+++ b/dd-trace-core/build.gradle
@@ -37,6 +37,8 @@ excludedClassesCoverage += [
   // Interface with an empty defender method
   'datadog.trace.core.propagation.HttpCodec.Extractor',
   'datadog.trace.core.flare.*',
+  'datadog.trace.llmobs.writer.ddintake.LLMObsSpanMapper',
+  'datadog.trace.llmobs.writer.ddintake.LLMObsSpanMapper.PayloadV1',
   // FIXME(DSM): test coverage needed
   'datadog.trace.core.datastreams.DataStreamContextInjector',
   'datadog.trace.common.sampling.TraceSamplingRules.RuleAdapter',
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java
index dab97edd63a..baa55d5dbf1 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java
@@ -92,6 +92,14 @@ public static Writer createWriter(
             "CI Visibility functionality is limited. Please upgrade to Agent v6.40+ or v7.40+ or enable Agentless mode.");
       }
     }
+    if (DD_AGENT_WRITER_TYPE.equals(configuredType) && config.isLlmObsEnabled()) {
+      if (featuresDiscovery.supportsEvpProxy() || config.isLlmObsAgentlessEnabled()) {
+        configuredType = DD_INTAKE_WRITER_TYPE;
+      } else {
+        log.info("LLM Observability functionality is limited.");
+        // TODO: add supported agent version to this log line for llm obs
+      }
+    }
 
     RemoteWriter remoteWriter;
     if (DD_INTAKE_WRITER_TYPE.equals(configuredType)) {
@@ -117,7 +125,11 @@ public static Writer createWriter(
             createDDIntakeRemoteApi(config, commObjects, featuresDiscovery, TrackType.CITESTCOV);
         builder.addTrack(TrackType.CITESTCOV, coverageApi);
       }
-
+      if (config.isLlmObsEnabled()) {
+        final RemoteApi llmobsApi =
+            createDDIntakeRemoteApi(config, commObjects, featuresDiscovery, TrackType.LLMOBS);
+        builder.addTrack(TrackType.LLMOBS, llmobsApi);
+      }
       remoteWriter = builder.build();
     } else {
       // configuredType == DDAgentWriter
@@ -173,7 +185,14 @@ private static RemoteApi createDDIntakeRemoteApi(
       SharedCommunicationObjects commObjects,
       DDAgentFeaturesDiscovery featuresDiscovery,
       TrackType trackType) {
-    if (featuresDiscovery.supportsEvpProxy() && !config.isCiVisibilityAgentlessEnabled()) {
+    boolean evpProxySupported = featuresDiscovery.supportsEvpProxy();
+    boolean useProxyApi =
+        (evpProxySupported && TrackType.LLMOBS == trackType && !config.isLlmObsAgentlessEnabled())
+            || (evpProxySupported
+                && (TrackType.CITESTCOV == trackType || TrackType.CITESTCYCLE == trackType)
+                && !config.isCiVisibilityAgentlessEnabled());
+
+    if (useProxyApi) {
       return DDEvpProxyApi.builder()
           .httpClient(commObjects.okHttpClient)
           .agentUrl(commObjects.agentUrl)
@@ -181,12 +200,20 @@ private static RemoteApi createDDIntakeRemoteApi(
           .trackType(trackType)
           .compressionEnabled(featuresDiscovery.supportsContentEncodingHeadersWithEvpProxy())
           .build();
-
     } else {
       HttpUrl hostUrl = null;
-      if (config.getCiVisibilityAgentlessUrl() != null) {
+      if (TrackType.LLMOBS == trackType) {
+        // The LLM Observability track only honours its own host override, never the CI one.
+        String llmObsAgentlessUrl = config.getLlMObsAgentlessUrl();
+        if (config.isLlmObsAgentlessEnabled()
+            && llmObsAgentlessUrl != null
+            && !llmObsAgentlessUrl.isEmpty()) {
+          hostUrl = HttpUrl.get(llmObsAgentlessUrl);
+          log.info("Using host URL '{}' to report LLM Obs traces in Agentless mode.", hostUrl);
+        }
+      } else if (config.getCiVisibilityAgentlessUrl() != null) {
         hostUrl = HttpUrl.get(config.getCiVisibilityAgentlessUrl());
         log.info("Using host URL '{}' to report CI Visibility traces in Agentless mode.", hostUrl);
       }
       return DDIntakeApi.builder()
           .hostUrl(hostUrl)
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeMapperDiscovery.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeMapperDiscovery.java
index 6cc38f8a3a6..5123e6fe06e 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeMapperDiscovery.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDIntakeMapperDiscovery.java
@@ -6,6 +6,7 @@
 import datadog.trace.civisibility.writer.ddintake.CiTestCycleMapperV1;
 import datadog.trace.common.writer.RemoteMapper;
 import datadog.trace.common.writer.RemoteMapperDiscovery;
+import datadog.trace.llmobs.writer.ddintake.LLMObsSpanMapper;
 
 /**
  * Mapper discovery logic when a DDIntake is used. The mapper is discovered based on a backend
@@ -40,6 +41,8 @@ public void discover() {
       mapper = new CiTestCycleMapperV1(wellKnownTags, compressionEnabled);
     } else if (TrackType.CITESTCOV.equals(trackType)) {
       mapper = new CiTestCovMapperV2(compressionEnabled);
+    } else if (TrackType.LLMOBS.equals(trackType)) {
+      mapper = new LLMObsSpanMapper();
     } else {
       mapper = RemoteMapper.NO_OP;
     }
diff --git a/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java b/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java
new file mode 100644
index 00000000000..1d832bf3523
--- /dev/null
+++ b/dd-trace-core/src/main/java/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapper.java
@@ -0,0 +1,447 @@
+package datadog.trace.llmobs.writer.ddintake;
+
+import static datadog.communication.http.OkHttpUtils.gzippedMsgpackRequestBodyOf;
+
+import datadog.communication.serialization.Writable;
+import datadog.trace.api.DDTags;
+import datadog.trace.api.intake.TrackType;
+import datadog.trace.api.llmobs.LLMObs;
+import datadog.trace.api.llmobs.LLMObsTags;
+import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes;
+import datadog.trace.bootstrap.instrumentation.api.Tags;
+import datadog.trace.common.writer.Payload;
+import datadog.trace.common.writer.RemoteMapper;
+import datadog.trace.core.CoreSpan;
+import datadog.trace.core.Metadata;
+import datadog.trace.core.MetadataConsumer;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import okhttp3.RequestBody;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maps LLM Observability spans to the msgpack payload sent on the {@link TrackType#LLMOBS} track.
+ *
+ * <p>Every trace is written as a three-entry map ({@code event_type}, {@code _dd.stage}, {@code
+ * spans}); only spans whose type is {@link InternalSpanTypes#LLMOBS} are included. A mapper holds
+ * one reusable {@link MetaWriter}, so a single instance must not map traces concurrently.
+ */
+public class LLMObsSpanMapper implements RemoteMapper {
+
+  // Well known tags for LLM obs are prefixed with _ml_obs_(tag|metric).
+  // Prefix for tags
+  private static final String LLMOBS_TAG_PREFIX = "_ml_obs_tag.";
+  // Prefix for metrics
+  private static final String LLMOBS_METRIC_PREFIX = "_ml_obs_metric.";
+
+  // internal tags to be prefixed
+  private static final String INPUT = "input";
+  private static final String OUTPUT = "output";
+  private static final String SPAN_KIND_TAG_KEY = LLMOBS_TAG_PREFIX + Tags.SPAN_KIND;
+  // Span kind reported when the span carries no span kind tag
+  private static final String UNKNOWN_SPAN_KIND = "unknown";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LLMObsSpanMapper.class);
+
+  private static final byte[] STAGE = "_dd.stage".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] EVENT_TYPE = "event_type".getBytes(StandardCharsets.UTF_8);
+
+  private static final byte[] SPAN_ID = "span_id".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] TRACE_ID = "trace_id".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] PARENT_ID = "parent_id".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] NAME = "name".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] DURATION = "duration".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] START_NS = "start_ns".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] STATUS = "status".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] ERROR = "error".getBytes(StandardCharsets.UTF_8);
+
+  private static final byte[] META = "meta".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] METADATA = "metadata".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] SPAN_KIND = "span.kind".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] SPANS = "spans".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] METRICS = "metrics".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] TAGS = "tags".getBytes(StandardCharsets.UTF_8);
+
+  private static final byte[] LLM_MESSAGE_ROLE = "role".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] LLM_MESSAGE_CONTENT = "content".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] LLM_MESSAGE_TOOL_CALLS =
+      "tool_calls".getBytes(StandardCharsets.UTF_8);
+
+  private static final byte[] LLM_TOOL_CALL_NAME = "name".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] LLM_TOOL_CALL_TYPE = "type".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] LLM_TOOL_CALL_TOOL_ID = "tool_id".getBytes(StandardCharsets.UTF_8);
+  private static final byte[] LLM_TOOL_CALL_ARGUMENTS =
+      "arguments".getBytes(StandardCharsets.UTF_8);
+
+  private final LLMObsSpanMapper.MetaWriter metaWriter = new MetaWriter();
+  private final int size;
+
+  /** Creates a mapper whose message buffer is 5 MiB ({@code 5 << 20} bytes). */
+  public LLMObsSpanMapper() {
+    this(5 << 20);
+  }
+
+  private LLMObsSpanMapper(int size) {
+    this.size = size;
+  }
+
+  /**
+   * Writes one trace as an LLM Observability event.
+   *
+   * @param trace spans of the trace; spans that are not LLM Observability spans are skipped
+   * @param writable sink that receives the serialized event
+   */
+  @Override
+  public void map(List<? extends CoreSpan<?>> trace, Writable writable) {
+    List<CoreSpan<?>> llmobsSpans =
+        trace.stream().filter(LLMObsSpanMapper::isLLMObsSpan).collect(Collectors.toList());
+
+    writable.startMap(3);
+
+    writable.writeUTF8(EVENT_TYPE);
+    writable.writeString("span", null);
+
+    writable.writeUTF8(STAGE);
+    writable.writeString("raw", null);
+
+    writable.writeUTF8(SPANS);
+    writable.startArray(llmobsSpans.size());
+    for (CoreSpan<?> span : llmobsSpans) {
+      writable.startMap(11);
+      // 1
+      writable.writeUTF8(SPAN_ID);
+      writable.writeString(String.valueOf(span.getSpanId()), null);
+
+      // 2
+      writable.writeUTF8(TRACE_ID);
+      writable.writeString(span.getTraceId().toHexString(), null);
+
+      // 3
+      writable.writeUTF8(PARENT_ID);
+      // TODO fix after parent ID tracking is in place
+      writable.writeString("undefined", null);
+
+      // 4
+      writable.writeUTF8(NAME);
+      writable.writeString(span.getOperationName(), null);
+
+      // 5
+      writable.writeUTF8(START_NS);
+      writable.writeUnsignedLong(span.getStartTime());
+
+      // 6
+      // NOTE(review): the nanosecond duration is narrowed to a float, which cannot represent
+      // long durations exactly; confirm the numeric type the intake expects.
+      writable.writeUTF8(DURATION);
+      writable.writeFloat(span.getDurationNano());
+
+      // 7
+      int error = span.getError();
+      writable.writeUTF8(ERROR);
+      writable.writeInt(error);
+
+      // 8
+      writable.writeUTF8(STATUS);
+      writable.writeString(error == 1 ? "error" : "ok", null);
+
+      /* 9 (metrics), 10 (tags), 11 meta */
+      span.processTagsAndBaggage(metaWriter.withWritable(writable, getErrorsMap(span)));
+    }
+  }
+
+  private static boolean isLLMObsSpan(CoreSpan<?> span) {
+    CharSequence type = span.getType();
+    return type != null && type.toString().contentEquals(InternalSpanTypes.LLMOBS);
+  }
+
+  @Override
+  public Payload newPayload() {
+    return new PayloadV1();
+  }
+
+  @Override
+  public int messageBufferSize() {
+    return size;
+  }
+
+  @Override
+  public void reset() {}
+
+  @Override
+  public String endpoint() {
+    return TrackType.LLMOBS + "/v2";
+  }
+
+  /** Collects the non-empty error message, type and stack tags of {@code span}. */
+  private static Map<String, String> getErrorsMap(CoreSpan<?> span) {
+    Map<String, String> errors = new HashMap<>();
+    putIfNotEmpty(errors, DDTags.ERROR_MSG, span.getTag(DDTags.ERROR_MSG));
+    putIfNotEmpty(errors, DDTags.ERROR_TYPE, span.getTag(DDTags.ERROR_TYPE));
+    putIfNotEmpty(errors, DDTags.ERROR_STACK, span.getTag(DDTags.ERROR_STACK));
+    return errors;
+  }
+
+  private static void putIfNotEmpty(Map<String, String> target, String key, String value) {
+    if (value != null && !value.isEmpty()) {
+      target.put(key, value);
+    }
+  }
+
+  /**
+   * Writes the last three entries of a span map ({@code metrics}, {@code tags} and {@code meta})
+   * from the span's tags.
+   *
+   * <p>Every collection header is sized with the same predicates that later decide what is
+   * written, so the declared msgpack sizes always match the emitted entries.
+   */
+  private static final class MetaWriter implements MetadataConsumer {
+
+    private static final Set<String> TAGS_FOR_REMAPPING =
+        Collections.unmodifiableSet(
+            new HashSet<>(
+                Arrays.asList(
+                    LLMOBS_TAG_PREFIX + INPUT,
+                    LLMOBS_TAG_PREFIX + OUTPUT,
+                    LLMOBS_TAG_PREFIX + LLMObsTags.MODEL_NAME,
+                    LLMOBS_TAG_PREFIX + LLMObsTags.MODEL_PROVIDER,
+                    LLMOBS_TAG_PREFIX + LLMObsTags.MODEL_VERSION,
+                    LLMOBS_TAG_PREFIX + LLMObsTags.METADATA)));
+
+    private Writable writable;
+    private Map<String, String> errorInfo = Collections.emptyMap();
+
+    LLMObsSpanMapper.MetaWriter withWritable(Writable writable, Map<String, String> errorInfo) {
+      this.writable = writable;
+      // Normalise a missing map so that sizing and writing the meta map always agree.
+      this.errorInfo = null != errorInfo ? errorInfo : Collections.<String, String>emptyMap();
+      return this;
+    }
+
+    @Override
+    public void accept(Metadata metadata) {
+      Map<String, Object> tags = metadata.getTags();
+      String spanKind = UNKNOWN_SPAN_KIND;
+      Map<String, Object> metaTags = new HashMap<>();
+      int metricsSize = 0;
+      int tagsSize = 0;
+      for (Map.Entry<String, Object> tag : tags.entrySet()) {
+        String key = tag.getKey();
+        Object value = tag.getValue();
+        if (key.equals(SPAN_KIND_TAG_KEY)) {
+          spanKind = String.valueOf(value);
+        } else if (TAGS_FOR_REMAPPING.contains(key)) {
+          metaTags.put(key.substring(LLMOBS_TAG_PREFIX.length()), value);
+        } else if (isMetric(key, value)) {
+          ++metricsSize;
+        } else if (isPlainTag(key)) {
+          ++tagsSize;
+        }
+      }
+
+      if (spanKind.equals(UNKNOWN_SPAN_KIND)) {
+        LOGGER.warn("missing span kind");
+      }
+
+      boolean llmSpan = spanKind.equals(Tags.LLMOBS_LLM_SPAN_KIND);
+      if (llmSpan) {
+        // Drop malformed input/output up front: skipping them while writing would leave the
+        // meta map with fewer entries than its header declares.
+        metaTags.entrySet().removeIf(MetaWriter::isMalformedLlmIo);
+      }
+
+      writeMetrics(tags, metricsSize);
+      writeTags(tags, tagsSize);
+      writeMeta(spanKind, llmSpan, metaTags);
+    }
+
+    /** True for metric-prefixed tags holding a number. */
+    private static boolean isMetric(String key, Object value) {
+      return key.startsWith(LLMOBS_METRIC_PREFIX) && value instanceof Number;
+    }
+
+    /** True for prefixed tags that belong to the {@code tags} array rather than {@code meta}. */
+    private static boolean isPlainTag(String key) {
+      return key.startsWith(LLMOBS_TAG_PREFIX)
+          && !key.equals(SPAN_KIND_TAG_KEY)
+          && !TAGS_FOR_REMAPPING.contains(key);
+    }
+
+    /** True (after logging) for LLM span input/output whose value is not a message list. */
+    private static boolean isMalformedLlmIo(Map.Entry<String, Object> metaTag) {
+      String key = metaTag.getKey();
+      Object value = metaTag.getValue();
+      if (!(key.equals(INPUT) || key.equals(OUTPUT)) || value instanceof List) {
+        return false;
+      }
+      LOGGER.warn(
+          "unexpectedly found incorrect type for LLM span IO {}, expecting list",
+          null != value ? value.getClass().getName() : "null");
+      return true;
+    }
+
+    /** Writes entry 9, {@code metrics}: numeric metric tags with their prefix removed. */
+    private void writeMetrics(Map<String, Object> tags, int metricsSize) {
+      writable.writeUTF8(METRICS);
+      writable.startMap(metricsSize);
+      for (Map.Entry<String, Object> tag : tags.entrySet()) {
+        String key = tag.getKey();
+        if (isMetric(key, tag.getValue())) {
+          writable.writeString(key.substring(LLMOBS_METRIC_PREFIX.length()), null);
+          writable.writeObject(tag.getValue(), null);
+        }
+      }
+    }
+
+    /** Writes entry 10, {@code tags}: {@code language:jvm} plus every plain {@code key:value}. */
+    private void writeTags(Map<String, Object> tags, int tagsSize) {
+      writable.writeUTF8(TAGS);
+      writable.startArray(tagsSize + 1);
+      writable.writeString("language:jvm", null);
+      for (Map.Entry<String, Object> tag : tags.entrySet()) {
+        String key = tag.getKey();
+        if (isPlainTag(key)) {
+          String name = key.substring(LLMOBS_TAG_PREFIX.length());
+          writable.writeString(name + ":" + tag.getValue(), null);
+        }
+      }
+    }
+
+    /** Writes entry 11, {@code meta}: span kind, error info and the remapped tags. */
+    private void writeMeta(String spanKind, boolean llmSpan, Map<String, Object> metaTags) {
+      writable.writeUTF8(META);
+      writable.startMap(1 + errorInfo.size() + metaTags.size());
+      writable.writeUTF8(SPAN_KIND);
+      writable.writeString(spanKind, null);
+
+      for (Map.Entry<String, String> error : errorInfo.entrySet()) {
+        writable.writeUTF8(error.getKey().getBytes(StandardCharsets.UTF_8));
+        writable.writeString(error.getValue(), null);
+      }
+
+      for (Map.Entry<String, Object> tag : metaTags.entrySet()) {
+        String key = tag.getKey();
+        Object val = tag.getValue();
+        if (key.equals(INPUT) || key.equals(OUTPUT)) {
+          if (llmSpan) {
+            // llm span kind must have llm objects; non-list values were removed in accept()
+            @SuppressWarnings("unchecked")
+            List<LLMObs.LLMMessage> messages = (List<LLMObs.LLMMessage>) val;
+            writable.writeString(key + ".messages", null);
+            writeMessages(messages);
+          } else {
+            writable.writeString(key + ".value", null);
+            writable.writeObject(val, null);
+          }
+        } else if (key.equals(LLMObsTags.METADATA) && val instanceof Map) {
+          @SuppressWarnings("unchecked")
+          Map<String, Object> metadataMap = (Map<String, Object>) val;
+          writable.writeUTF8(METADATA);
+          writeStringKeyedMap(metadataMap);
+        } else {
+          writable.writeString(key, null);
+          writable.writeObject(val, null);
+        }
+      }
+    }
+
+    /** Writes LLM messages as an array of role/content(/tool_calls) maps. */
+    private void writeMessages(List<LLMObs.LLMMessage> messages) {
+      writable.startArray(messages.size());
+      for (LLMObs.LLMMessage message : messages) {
+        List<LLMObs.ToolCall> toolCalls = message.getToolCalls();
+        boolean hasToolCalls = null != toolCalls && !toolCalls.isEmpty();
+        writable.startMap(hasToolCalls ? 3 : 2);
+        writable.writeUTF8(LLM_MESSAGE_ROLE);
+        writable.writeString(message.getRole(), null);
+        writable.writeUTF8(LLM_MESSAGE_CONTENT);
+        writable.writeString(message.getContent(), null);
+        if (hasToolCalls) {
+          writable.writeUTF8(LLM_MESSAGE_TOOL_CALLS);
+          writable.startArray(toolCalls.size());
+          for (LLMObs.ToolCall toolCall : toolCalls) {
+            writeToolCall(toolCall);
+          }
+        }
+      }
+    }
+
+    /** Writes one tool call; {@code arguments} is omitted when null or empty. */
+    private void writeToolCall(LLMObs.ToolCall toolCall) {
+      Map<String, Object> arguments = toolCall.getArguments();
+      boolean hasArguments = null != arguments && !arguments.isEmpty();
+      writable.startMap(hasArguments ? 4 : 3);
+      writable.writeUTF8(LLM_TOOL_CALL_NAME);
+      writable.writeString(toolCall.getName(), null);
+      writable.writeUTF8(LLM_TOOL_CALL_TYPE);
+      writable.writeString(toolCall.getType(), null);
+      writable.writeUTF8(LLM_TOOL_CALL_TOOL_ID);
+      writable.writeString(toolCall.getToolId(), null);
+      if (hasArguments) {
+        writable.writeUTF8(LLM_TOOL_CALL_ARGUMENTS);
+        writeStringKeyedMap(arguments);
+      }
+    }
+
+    /** Writes {@code map} as a msgpack map, encoding each value with writeObject. */
+    private void writeStringKeyedMap(Map<String, Object> map) {
+      writable.startMap(map.size());
+      for (Map.Entry<String, Object> entry : map.entrySet()) {
+        writable.writeString(entry.getKey(), null);
+        writable.writeObject(entry.getValue(), null);
+      }
+    }
+  }
+
+  /** Payload whose body is the msgpack event; an empty payload is an empty msgpack map. */
+  private static class PayloadV1 extends Payload {
+
+    @Override
+    public int sizeInBytes() {
+      if (traceCount() == 0) {
+        return msgpackMapHeaderSize(0);
+      }
+
+      // Only the readable bytes are sent; the backing array can be larger than the content
+      // and is not accessible at all for direct or read-only buffers.
+      return body.remaining();
+    }
+
+    @Override
+    public void writeTo(WritableByteChannel channel) throws IOException {
+      // If traceCount is 0, we write a map with 0 elements in MsgPack format.
+      if (traceCount() == 0) {
+        ByteBuffer emptyDict = msgpackMapHeader(0);
+        while (emptyDict.hasRemaining()) {
+          channel.write(emptyDict);
+        }
+      } else {
+        while (body.hasRemaining()) {
+          channel.write(body);
+        }
+      }
+    }
+
+    @Override
+    public RequestBody toRequest() {
+      List<ByteBuffer> buffers;
+      if (traceCount() == 0) {
+        buffers = Collections.singletonList(msgpackMapHeader(0));
+      } else {
+        buffers = Collections.singletonList(body);
+      }
+
+      return gzippedMsgpackRequestBodyOf(buffers);
+    }
+  }
+}
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapperTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapperTest.groovy
new file mode 100644
index 00000000000..8a161ed3141
--- /dev/null
+++ b/dd-trace-core/src/test/groovy/datadog/trace/llmobs/writer/ddintake/LLMObsSpanMapperTest.groovy
@@ -0,0 +1,121 @@
+package datadog.trace.llmobs.writer.ddintake
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import datadog.communication.serialization.ByteBufferConsumer
+import datadog.communication.serialization.FlushingBuffer
+import datadog.communication.serialization.msgpack.MsgPackWriter
+import datadog.trace.api.llmobs.LLMObs
+import datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes
+import datadog.trace.bootstrap.instrumentation.api.Tags
+import datadog.trace.common.writer.ListWriter
+import datadog.trace.core.test.DDCoreSpecification
+import org.msgpack.jackson.dataformat.MessagePackFactory
+import spock.lang.Shared
+
+import java.nio.ByteBuffer
+import java.nio.channels.WritableByteChannel
+
+class LLMObsSpanMapperTest extends DDCoreSpecification {
+
+  @Shared
+  ObjectMapper objectMapper = new ObjectMapper(new MessagePackFactory())
+
+  def "test LLMObsSpanMapper serialization"() {
+    setup:
+    def mapper = new LLMObsSpanMapper()
+    def tracer = tracerBuilder().writer(new ListWriter()).build()
+
+
+    // Create a real LLMObs span using the tracer
+    def llmSpan = tracer.buildSpan("chat-completion")
+      .withTag("_ml_obs_tag.span.kind", Tags.LLMOBS_LLM_SPAN_KIND)
+      .withTag("_ml_obs_tag.model_name", "gpt-4")
+      .withTag("_ml_obs_tag.model_provider", "openai")
+      .withTag("_ml_obs_metric.input_tokens", 50)
+      .withTag("_ml_obs_metric.output_tokens", 25)
+      .withTag("_ml_obs_metric.total_tokens", 75)
+      .start()
+
+    llmSpan.setSpanType(InternalSpanTypes.LLMOBS)
+
+    def inputMessages = [LLMObs.LLMMessage.from("user", "Hello, what's the weather like?")]
+    def outputMessages = [LLMObs.LLMMessage.from("assistant", "I'll help you check the weather.")]
+    llmSpan.setTag("_ml_obs_tag.input", inputMessages)
+    llmSpan.setTag("_ml_obs_tag.output", outputMessages)
+    llmSpan.setTag("_ml_obs_tag.metadata", [temperature: 0.7, max_tokens: 100])
+
+    llmSpan.finish()
+
+    def trace = [llmSpan]
+    CapturingByteBufferConsumer sink = new CapturingByteBufferConsumer()
+    MsgPackWriter packer = new MsgPackWriter(new FlushingBuffer(1024, sink))
+
+    when:
+    packer.format(trace, mapper)
+    packer.flush()
+
+    then:
+    sink.captured != null
+    def payload = mapper.newPayload()
+    payload.withBody(1, sink.captured)
+    def channel = new ByteArrayOutputStream()
+    payload.writeTo(new WritableByteChannel() {
+        @Override
+        int write(ByteBuffer src) throws IOException {
+          def bytes = new byte[src.remaining()]
+          src.get(bytes)
+          channel.write(bytes)
+          return bytes.length
+        }
+
+        @Override
+        boolean isOpen() { return true }
+
+        @Override
+        void close() throws IOException { }
+      })
+    def result = objectMapper.readValue(channel.toByteArray(), Map)
+
+    then:
+    result.containsKey("event_type")
+    result["event_type"] == "span"
+    result.containsKey("_dd.stage")
+    result["_dd.stage"] == "raw"
+    result.containsKey("spans")
+    result["spans"] instanceof List
+    result["spans"].size() == 1
+
+    def spanData = result["spans"][0]
+    spanData["name"] == "chat-completion"
+    spanData.containsKey("span_id")
+    spanData.containsKey("trace_id")
+    spanData.containsKey("start_ns")
+    spanData.containsKey("duration")
+    spanData["error"] == 0
+
+    spanData.containsKey("meta")
+    spanData["meta"]["span.kind"] == "llm"
+    spanData["meta"].containsKey("input.messages")
+    spanData["meta"].containsKey("output.messages")
+    spanData["meta"].containsKey("metadata")
+
+    spanData.containsKey("metrics")
+    spanData["metrics"]["input_tokens"] == 50.0
+    spanData["metrics"]["output_tokens"] == 25.0
+    spanData["metrics"]["total_tokens"] == 75.0
+
+    spanData.containsKey("tags")
+    spanData["tags"].contains("language:jvm")
+  }
+
+  static class CapturingByteBufferConsumer implements ByteBufferConsumer {
+
+    ByteBuffer captured
+
+    @Override
+    void accept(int messageCount, ByteBuffer buffer) {
+      captured = buffer
+    }
+  }
+
+}
diff --git a/internal-api/src/main/java/datadog/trace/api/civisibility/telemetry/tag/Endpoint.java b/internal-api/src/main/java/datadog/trace/api/civisibility/telemetry/tag/Endpoint.java
index 6c2d9b6b25c..529644493b2 100644
--- a/internal-api/src/main/java/datadog/trace/api/civisibility/telemetry/tag/Endpoint.java
+++ b/internal-api/src/main/java/datadog/trace/api/civisibility/telemetry/tag/Endpoint.java
@@ -5,7 +5,8 @@
 /** The type of endpoint where a request is sent */
 public enum Endpoint implements TagValue {
   TEST_CYCLE,
-  CODE_COVERAGE;
+  CODE_COVERAGE,
+  LLMOBS; // TODO: not CI Visibility specific; consider moving this enum to a common package
 
   private final String s;
 
diff --git a/internal-api/src/main/java/datadog/trace/api/intake/TrackType.java b/internal-api/src/main/java/datadog/trace/api/intake/TrackType.java
index 4d99f0655a8..e50f46b6f13 100644
--- a/internal-api/src/main/java/datadog/trace/api/intake/TrackType.java
+++ b/internal-api/src/main/java/datadog/trace/api/intake/TrackType.java
@@ -6,6 +6,7 @@
 public enum TrackType {
   CITESTCYCLE(Endpoint.TEST_CYCLE),
   CITESTCOV(Endpoint.CODE_COVERAGE),
+  LLMOBS(Endpoint.LLMOBS),
   NOOP(null);
 
   @Nullable public final Endpoint endpoint;
diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InternalSpanTypes.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InternalSpanTypes.java
index c67cfe6b6e4..6682a98f8b5 100644
--- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InternalSpanTypes.java
+++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/InternalSpanTypes.java
@@ -46,6 +46,8 @@ public class InternalSpanTypes {
       UTF8BytesString.create(DDSpanTypes.VULNERABILITY);
   public static final UTF8BytesString PROTOBUF = UTF8BytesString.create(DDSpanTypes.PROTOBUF);
 
+  public static final UTF8BytesString LLMOBS = UTF8BytesString.create(DDSpanTypes.LLMOBS);
+
   public static final UTF8BytesString TIBCO_BW = UTF8BytesString.create("tibco_bw");
   public static final UTF8BytesString MULE = UTF8BytesString.create(DDSpanTypes.MULE);
   public static final CharSequence VALKEY = UTF8BytesString.create(DDSpanTypes.VALKEY);