Skip to content

Commit a4c3c85

Browse files
authored
Add tags mq.message.keys and mq.message.tags for RocketMQ producer span (#404)
1 parent 364a4af commit a4c3c85

6 files changed

Lines changed: 25 additions & 9 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Release Notes.
1616
* Report the agent version to OAP as an instance attribute
1717
* Polish jedis-4.x-plugin to change command to lowercase, which is consistent with jedis-2.x-3.x-plugin
1818
* Add micronauthttpclient,micronauthttpserver,memcached,ehcache,guavacache,jedis,redisson plugin config properties to agent.config
19+
* Add tags `mq.message.keys` and `mq.message.tags` for RocketMQ producer span
1920

2021
#### Documentation
2122

apm-sniffer/apm-sdk-plugin/rocketMQ-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v3/MessageSendInterceptor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
5959
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
6060
Tags.MQ_BROKER.set(span, (String) allArguments[0]);
6161
Tags.MQ_TOPIC.set(span, message.getTopic());
62+
String keys = message.getKeys();
63+
if (StringUtil.isNotBlank(keys)) {
64+
span.tag(Tags.ofKey("mq.message.keys"), keys);
65+
}
66+
String tags = message.getTags();
67+
if (StringUtil.isNotBlank(tags)) {
68+
span.tag(Tags.ofKey("mq.message.tags"), tags);
69+
}
70+
6271
contextCarrier.extensionInjector().injectSendingTimestamp();
6372
SpanLayer.asMQ(span);
6473

apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/MessageSendInterceptor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
5959
span.setComponent(ComponentsDefine.ROCKET_MQ_PRODUCER);
6060
Tags.MQ_BROKER.set(span, (String) allArguments[0]);
6161
Tags.MQ_TOPIC.set(span, message.getTopic());
62+
String keys = message.getKeys();
63+
if (StringUtil.isNotBlank(keys)) {
64+
span.tag(Tags.ofKey("mq.message.keys"), keys);
65+
}
66+
String tags = message.getTags();
67+
if (StringUtil.isNotBlank(tags)) {
68+
span.tag(Tags.ofKey("mq.message.tags"), tags);
69+
}
70+
6271
contextCarrier.extensionInjector().injectSendingTimestamp();
6372
SpanLayer.asMQ(span);
6473

test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ segmentItems:
3232
tags:
3333
- {key: mq.broker, value: not null}
3434
- {key: mq.topic, value: TopicTest}
35+
- {key: mq.message.keys, value: KeyA}
36+
- {key: mq.message.tags, value: TagA}
3537
skipAnalysis: 'false'
3638
- operationName: GET:/case/rocketmq-scenario
3739
parentSpanId: -1

test/plugin/scenarios/rocketmq-scenario/pom.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
<properties>
3131
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
3232
<compiler.version>1.8</compiler.version>
33+
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
3334
<test.framework.version>4.9.4</test.framework.version>
3435
<spring.boot.version>2.1.6.RELEASE</spring.boot.version>
3536
<lombok.version>1.18.20</lombok.version>
@@ -97,6 +98,7 @@
9798
</plugin>
9899
<plugin>
99100
<artifactId>maven-compiler-plugin</artifactId>
101+
<version>${maven-compiler-plugin.version}</version>
100102
<configuration>
101103
<source>${compiler.version}</source>
102104
<target>${compiler.version}</target>

test/plugin/scenarios/rocketmq-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/controller/CaseController.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,10 @@ public String testcase() {
5959

6060
// send msg
6161
Message msg = new Message("TopicTest",
62-
"TagA",
6362
("Hello RocketMQ sendMsg " + new Date()).getBytes(RemotingHelper.DEFAULT_CHARSET)
6463
);
64+
msg.setTags("TagA");
65+
msg.setKeys("KeyA");
6566
SendResult sendResult = producer.send(msg);
6667
System.out.printf("%s send msg: %s%n", new Date(), sendResult);
6768

@@ -102,14 +103,6 @@ public String healthCheck() throws Exception {
102103
producer.setNamesrvAddr(namerServer);
103104
producer.start();
104105
System.out.printf("HealthCheck Provider Started.%n");
105-
106-
// send msg
107-
Message msg = new Message("HealthCheckTopicTest",
108-
"TagA",
109-
("Hello RocketMQ sendMsg " + new Date()).getBytes(RemotingHelper.DEFAULT_CHARSET)
110-
);
111-
SendResult sendResult = producer.send(msg);
112-
System.out.printf("healthCheck %s send msg: %s%n", new Date(), sendResult);
113106
return SUCCESS;
114107
}
115108

0 commit comments

Comments
 (0)