diff --git a/CHANGES.md b/CHANGES.md index 376dbd9ee2..ab70ca9621 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -16,6 +16,7 @@ Release Notes. * Report the agent version to OAP as an instance attribute * Polish jedis-4.x-plugin to change command to lowercase, which is consistent with jedis-2.x-3.x-plugin * Add micronauthttpclient,micronauthttpserver,memcached,ehcache,guavacache,jedis,redisson plugin config properties to agent.config +* Add tags `mq.message.keys` and `mq.message.tags` for RocketMQ producer span #### Documentation diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java index 2313bcf49d..42898d3577 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java @@ -59,6 +59,15 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER); Tags.MQ_BROKER.set(span, (String) allArguments[0]); Tags.MQ_TOPIC.set(span, message.getTopic()); + String keys = message.getKeys(); + if (StringUtil.isNotBlank(keys)) { + span.tag(Tags.ofKey("mq.message.keys"), keys); + } + String tags = message.getTags(); + if (StringUtil.isNotBlank(tags)) { + span.tag(Tags.ofKey("mq.message.tags"), tags); + } + contextCarrier.extensionInjector().injectSendingTimestamp(); SpanLayer.asMQ(span); diff --git a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java index b27e438f6b..acb175b747 100644 --- a/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java @@ -59,6 +59,15 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER); Tags.MQ_BROKER.set(span, (String) allArguments[0]); Tags.MQ_TOPIC.set(span, message.getTopic()); + String keys = message.getKeys(); + if (StringUtil.isNotBlank(keys)) { + span.tag(Tags.ofKey("mq.message.keys"), keys); + } + String tags = message.getTags(); + if (StringUtil.isNotBlank(tags)) { + span.tag(Tags.ofKey("mq.message.tags"), tags); + } + contextCarrier.extensionInjector().injectSendingTimestamp(); SpanLayer.asMQ(span); diff --git a/test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml b/test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml index 3d669b5f2d..9eeff18822 100644 --- a/test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml +++ b/test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml @@ -32,6 +32,8 @@ segmentItems: tags: - {key: mq.broker, value: not null} - {key: mq.topic, value: TopicTest} + - {key: mq.message.keys, value: KeyA} + - {key: mq.message.tags, value: TagA} skipAnalysis: 'false' - operationName: GET:/case/rocketmq-scenario parentSpanId: -1 diff --git a/test/plugin/scenarios/rocketmq-scenario/pom.xml b/test/plugin/scenarios/rocketmq-scenario/pom.xml index 71cec225a5..91358a0001 100644 --- a/test/plugin/scenarios/rocketmq-scenario/pom.xml +++ b/test/plugin/scenarios/rocketmq-scenario/pom.xml @@ -30,6 +30,7 @@ UTF-8 1.8 + 3.8.1 4.9.4 2.1.6.RELEASE 1.18.20 @@ -97,6 +98,7 @@ maven-compiler-plugin + ${maven-compiler-plugin.version} ${compiler.version} ${compiler.version} diff --git a/test/plugin/scenarios/rocketmq-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/controller/CaseController.java b/test/plugin/scenarios/rocketmq-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/controller/CaseController.java index 3ce019c59d..ab96fd6a7c 100644 --- a/test/plugin/scenarios/rocketmq-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/controller/CaseController.java +++ b/test/plugin/scenarios/rocketmq-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/controller/CaseController.java @@ -59,9 +59,10 @@ public String testcase() { // send msg Message msg = new Message("TopicTest", - "TagA", ("Hello RocketMQ sendMsg " + new Date()).getBytes(RemotingHelper.DEFAULT_CHARSET) ); + msg.setTags("TagA"); + msg.setKeys("KeyA"); SendResult sendResult = producer.send(msg); System.out.printf("%s send msg: %s%n", new Date(), sendResult); @@ -102,14 +103,6 @@ public String healthCheck() throws Exception { producer.setNamesrvAddr(namerServer); producer.start(); System.out.printf("HealthCheck Provider Started.%n"); - - // send msg - Message msg = new Message("HealthCheckTopicTest", - "TagA", - ("Hello RocketMQ sendMsg " + new Date()).getBytes(RemotingHelper.DEFAULT_CHARSET) - ); - SendResult sendResult = producer.send(msg); - System.out.printf("healthCheck %s send msg: %s%n", new Date(), sendResult); return SUCCESS; }