From 151305b7a17977cbd22a2e6f4f8666620e176b77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Fri, 28 Feb 2025 15:32:40 +0100 Subject: [PATCH 1/5] fix error happening when sqs message attributes are readonly --- .../instrumentation/aws/v1/sqs/SqsInterceptor.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java index 4b2e431a6a6..0a38a21a7dc 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java @@ -12,6 +12,7 @@ import com.amazonaws.AmazonWebServiceRequest; import com.amazonaws.handlers.RequestHandler2; +import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.SendMessageBatchRequest; import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; @@ -22,7 +23,9 @@ import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.Map; public class SqsInterceptor extends RequestHandler2 { @@ -42,9 +45,14 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); Context context = newContext(request, queueUrl); + // making a copy of the MessageAttributes before modifying them because they can be stored in + // a kind of ImmutableMap + Map messageAttributes = + new HashMap<>(smRequest.getMessageAttributes()); + dsmPropagator.inject(context, messageAttributes, SETTER); // note: modifying message attributes has to be done before marshalling, otherwise the changes // are not reflected in the actual request (and the MD5 check on send will fail). - dsmPropagator.inject(context, smRequest.getMessageAttributes(), SETTER); + smRequest.setMessageAttributes(messageAttributes); } else if (request instanceof SendMessageBatchRequest) { SendMessageBatchRequest smbRequest = (SendMessageBatchRequest) request; From 63627c20c1afe5150c4723ba677295fa4868dd3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Mon, 3 Mar 2025 15:10:24 +0100 Subject: [PATCH 2/5] add test --- .../src/test/groovy/SqsClientTest.groovy | 96 ++++++++++++++----- 1 file changed, 74 insertions(+), 22 deletions(-) diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy index e2d3961f79e..2c3f370a839 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy @@ -7,7 +7,9 @@ import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.services.sqs.AmazonSQSClientBuilder import com.amazonaws.services.sqs.model.Message import com.amazonaws.services.sqs.model.MessageAttributeValue +import com.amazonaws.services.sqs.model.ReceiveMessageRequest import com.amazonaws.services.sqs.model.SendMessageRequest +import com.google.common.collect.ImmutableMap import datadog.trace.agent.test.naming.VersionedNamingTestBase import datadog.trace.agent.test.utils.TraceUtils import datadog.trace.api.Config @@ -87,9 +89,9 @@ abstract class SqsClientTest extends VersionedNamingTestBase { def "trace details propagated via SQS system message attributes"() { setup: def client = AmazonSQSClientBuilder.standard() - .withEndpointConfiguration(endpoint) - .withCredentials(credentialsProvider) - .build() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() def queueUrl = client.createQueue('somequeue').queueUrl TEST_WRITER.clear() @@ -188,6 +190,56 @@ abstract class SqsClientTest extends VersionedNamingTestBase { client.shutdown() } + @IgnoreIf({ !instance.isDataStreamsEnabled() }) + def "propagation even when message attributes are readonly"() { + setup: + def client = AmazonSQSClientBuilder.standard() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() + def queueUrl = client.createQueue('somequeue').queueUrl + TEST_WRITER.clear() + + when: + TraceUtils.runUnderTrace('parent', { + def my_attribute = new MessageAttributeValue() + my_attribute.setStringValue("hello world") + my_attribute.setDataType("String") + def readonlyAttributes = ImmutableMap.of("my_key", my_attribute) + def req = new SendMessageRequest(queueUrl, 'sometext') + req.setMessageAttributes(readonlyAttributes) + client.sendMessage(req) + }) + + TEST_DATA_STREAMS_WRITER.waitForGroups(1) + + then: + assertTraces(1) { + trace(2) { + basicSpan(it, "parent") + span { + serviceName expectedService("SQS", "ReceiveMessage") + operationName expectedOperation("SQS", "ReceiveMessage") + resourceName "SQS.SendMessage" + spanType DDSpanTypes.HTTP_CLIENT + errored false + childOf(span(0)) + } + } + } + + and: + def recv = new ReceiveMessageRequest(queueUrl) + recv.withMessageAttributeNames("my_key") + def messages = client.receiveMessage(recv).messages + + assert messages[0].messageAttributes.containsKey("my_key") // what we set initially + assert messages[0].messageAttributes.containsKey("_datadog") // what was injected + + cleanup: + client.shutdown() + } + @IgnoreIf({ instance.isDataStreamsEnabled() }) def "trace details propagated via embedded SQS message attribute (string)"() { setup: @@ -196,8 +248,8 @@ abstract class SqsClientTest extends VersionedNamingTestBase { when: def message = new Message() message.addMessageAttributesEntry('_datadog', new MessageAttributeValue().withDataType('String').withStringValue( - "{\"x-datadog-trace-id\": \"4948377316357291421\", \"x-datadog-parent-id\": \"6746998015037429512\", \"x-datadog-sampling-priority\": \"1\"}" - )) + "{\"x-datadog-trace-id\": \"4948377316357291421\", \"x-datadog-parent-id\": \"6746998015037429512\", \"x-datadog-sampling-priority\": \"1\"}" + )) def messages = new TracingList([message], "http://localhost:${address.port}/000000000000/somequeue") messages.forEach {/* consume to create message spans */ } @@ -237,8 +289,8 @@ abstract class SqsClientTest extends VersionedNamingTestBase { when: def message = new Message() message.addMessageAttributesEntry('_datadog', new MessageAttributeValue().withDataType('Binary').withBinaryValue( - headerValue - )) + headerValue + )) def messages = new TracingList([message], "http://localhost:${address.port}/000000000000/somequeue") messages.forEach {/* consume to create message spans */ } @@ -281,9 +333,9 @@ abstract class SqsClientTest extends VersionedNamingTestBase { def "trace details propagated from SQS to JMS"() { setup: def client = AmazonSQSClientBuilder.standard() - .withEndpointConfiguration(endpoint) - .withCredentials(credentialsProvider) - .build() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() def connectionFactory = new SQSConnectionFactory(new ProviderConfiguration(), client) def connection = connectionFactory.createConnection() @@ -295,12 +347,12 @@ abstract class SqsClientTest extends VersionedNamingTestBase { when: def ddMsgAttribute = new MessageAttributeValue() - .withBinaryValue(ByteBuffer.wrap("hello world".getBytes(Charset.defaultCharset()))) - .withDataType("Binary") + .withBinaryValue(ByteBuffer.wrap("hello world".getBytes(Charset.defaultCharset()))) + .withDataType("Binary") connection.start() TraceUtils.runUnderTrace('parent') { client.sendMessage(new SendMessageRequest(queue.queueUrl, 'sometext') - .withMessageAttributes([_datadog: ddMsgAttribute])) + .withMessageAttributes([_datadog: ddMsgAttribute])) } def message = consumer.receive() consumer.receiveNoWait() @@ -558,9 +610,9 @@ class SqsClientV1DataStreamsForkedTest extends SqsClientTest { def "Data streams context extracted from message body"() { setup: def client = AmazonSQSClientBuilder.standard() - .withEndpointConfiguration(endpoint) - .withCredentials(credentialsProvider) - .build() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() def queueUrl = client.createQueue('somequeue').queueUrl TEST_WRITER.clear() @@ -588,9 +640,9 @@ class SqsClientV1DataStreamsForkedTest extends SqsClientTest { def "Data streams context not extracted from message body when message attributes are not present"() { setup: def client = AmazonSQSClientBuilder.standard() - .withEndpointConfiguration(endpoint) - .withCredentials(credentialsProvider) - .build() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() def queueUrl = client.createQueue('somequeue').queueUrl TEST_WRITER.clear() @@ -619,9 +671,9 @@ class SqsClientV1DataStreamsForkedTest extends SqsClientTest { def "Data streams context not extracted from message body when message is not a Json"() { setup: def client = AmazonSQSClientBuilder.standard() - .withEndpointConfiguration(endpoint) - .withCredentials(credentialsProvider) - .build() + .withEndpointConfiguration(endpoint) + .withCredentials(credentialsProvider) + .build() def queueUrl = client.createQueue('somequeue').queueUrl TEST_WRITER.clear() From 3f9489ae059d1933535ee9f028d03f33a710c383 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Mon, 3 Mar 2025 15:35:48 +0100 Subject: [PATCH 3/5] same for batch requests --- .../trace/instrumentation/aws/v1/sqs/SqsInterceptor.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java index 0a38a21a7dc..76c356506cb 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java @@ -62,7 +62,10 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request Propagator dsmPropagator = Propagators.forConcern(DSM_CONCERN); Context context = newContext(request, queueUrl); for (SendMessageBatchRequestEntry entry : smbRequest.getEntries()) { - dsmPropagator.inject(context, entry.getMessageAttributes(), SETTER); + Map messageAttributes = + new HashMap<>(entry.getMessageAttributes()); + dsmPropagator.inject(context, messageAttributes, SETTER); + entry.setMessageAttributes(messageAttributes); } } else if (request instanceof ReceiveMessageRequest) { ReceiveMessageRequest rmRequest = (ReceiveMessageRequest) request; From 24a09e008d53a1b27ed7d98796398e446077e760 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Mon, 3 Mar 2025 15:38:30 +0100 Subject: [PATCH 4/5] same for receive message requests --- .../trace/instrumentation/aws/v1/sqs/SqsInterceptor.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java index 76c356506cb..4b353f12591 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sqs/SqsInterceptor.java @@ -23,8 +23,10 @@ import datadog.trace.api.datastreams.DataStreamsContext; import datadog.trace.bootstrap.ContextStore; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; public class SqsInterceptor extends RequestHandler2 { @@ -71,7 +73,9 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request ReceiveMessageRequest rmRequest = (ReceiveMessageRequest) request; if (rmRequest.getMessageAttributeNames().size() < 10 && !rmRequest.getMessageAttributeNames().contains(DATADOG_KEY)) { - rmRequest.getMessageAttributeNames().add(DATADOG_KEY); + List attributeNames = new ArrayList<>(rmRequest.getMessageAttributeNames()); + attributeNames.add(DATADOG_KEY); + rmRequest.setMessageAttributeNames(attributeNames); } } return request; From 8b7286667a399afe708185b5ccc8d1a933e24bd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Vandon?= Date: Mon, 3 Mar 2025 16:50:46 +0100 Subject: [PATCH 5/5] fix test assertion --- .../aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy index 2c3f370a839..9ae956e5068 100644 --- a/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java-sqs-1.0/src/test/groovy/SqsClientTest.groovy @@ -218,8 +218,8 @@ abstract class SqsClientTest extends VersionedNamingTestBase { trace(2) { basicSpan(it, "parent") span { - serviceName expectedService("SQS", "ReceiveMessage") - operationName expectedOperation("SQS", "ReceiveMessage") + serviceName expectedService("SQS", "SendMessage") + operationName expectedOperation("SQS", "SendMessage") resourceName "SQS.SendMessage" spanType DDSpanTypes.HTTP_CLIENT errored false