Skip to content

Commit 6648e45

Browse files
zhyyuyuzhongyuwu-sheng
authored
Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span (#369)
* add rocketmq consumer tag * update CHANGES.md * Update CHANGES.md * change consumer tag not null to not blank * set rocketmq consumer span's peer to namesrv Co-authored-by: yuzhongyu <yuzhongyu@cestc.cn> Co-authored-by: 吴晟 Wu Sheng <wu.sheng@foxmail.com>
1 parent 5c23b38 commit 6648e45

7 files changed

Lines changed: 171 additions & 2 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Release Notes.
2424
* Polish up activemq plugin to fix missing broker tag on consumer side
2525
* Enhance MQ plugin relative tests to check key tags not blank.
2626
* Add RocketMQ test scenarios for version 4.3 - 4.9. No 4.0 - 4.2 release images for testing.
27+
* Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span.
2728

2829
#### Documentation
2930

apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rocketMQ/v4/AbstractMessageConsumeInterceptor.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,19 @@
2020

2121
import java.lang.reflect.Method;
2222
import java.util.List;
23+
import org.apache.commons.lang3.StringUtils;
2324
import org.apache.rocketmq.common.message.MessageExt;
2425
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
2526
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
2627
import org.apache.skywalking.apm.agent.core.context.ContextManager;
28+
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
2729
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
2830
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
2931
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
3032
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
3133
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
3234
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
35+
import org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumerEnhanceInfos;
3336

3437
/**
3538
* {@link AbstractMessageConsumeInterceptor} create entry span when the <code>consumeMessage</code> in the {@link
@@ -48,13 +51,23 @@ public final void beforeMethod(EnhancedInstance objInst, Method method, Object[]
4851
ContextCarrier contextCarrier = getContextCarrierFromMessage(msgs.get(0));
4952
AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + msgs.get(0)
5053
.getTopic() + "/Consumer", contextCarrier);
51-
54+
Tags.MQ_TOPIC.set(span, msgs.get(0).getTopic());
55+
if (msgs.get(0).getStoreHost() != null) {
56+
String brokerAddress = msgs.get(0).getStoreHost().toString();
57+
brokerAddress = StringUtils.removeStart(brokerAddress, "/");
58+
Tags.MQ_BROKER.set(span, brokerAddress);
59+
}
5260
span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);
5361
SpanLayer.asMQ(span);
5462
for (int i = 1; i < msgs.size(); i++) {
5563
ContextManager.extract(getContextCarrierFromMessage(msgs.get(i)));
5664
}
5765

66+
Object skyWalkingDynamicField = objInst.getSkyWalkingDynamicField();
67+
if (skyWalkingDynamicField != null) {
68+
ConsumerEnhanceInfos consumerEnhanceInfos = (ConsumerEnhanceInfos) skyWalkingDynamicField;
69+
span.setPeer(consumerEnhanceInfos.getNamesrvAddr());
70+
}
5871
}
5972

6073
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.rocketMQ.v4;
20+
21+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
22+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
25+
import org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumerEnhanceInfos;
26+
27+
import java.lang.reflect.Method;
28+
29+
public class RegisterMessageListenerInterceptor implements InstanceMethodsAroundInterceptor {
30+
@Override
31+
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
32+
DefaultMQPushConsumer defaultMQPushConsumer = (DefaultMQPushConsumer) objInst;
33+
String namesrvAddr = defaultMQPushConsumer.getNamesrvAddr();
34+
ConsumerEnhanceInfos consumerEnhanceInfos = new ConsumerEnhanceInfos(namesrvAddr);
35+
36+
if (allArguments[0] instanceof EnhancedInstance) {
37+
EnhancedInstance enhancedMessageListener = (EnhancedInstance) allArguments[0];
38+
enhancedMessageListener.setSkyWalkingDynamicField(consumerEnhanceInfos);
39+
}
40+
}
41+
42+
@Override
43+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
44+
return ret;
45+
}
46+
47+
@Override
48+
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
49+
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
20+
21+
public class ConsumerEnhanceInfos {
22+
23+
private String namesrvAddr;
24+
25+
public ConsumerEnhanceInfos(String namesrvAddr) {
26+
this.namesrvAddr = namesrvAddr;
27+
}
28+
29+
public String getNamesrvAddr() {
30+
return namesrvAddr;
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.rocketMQ.v4.define;
20+
21+
import net.bytebuddy.description.method.MethodDescription;
22+
import net.bytebuddy.matcher.ElementMatcher;
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
26+
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
27+
28+
import static net.bytebuddy.matcher.ElementMatchers.named;
29+
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
30+
31+
public class DefaultMQPushConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
32+
33+
private static final String ENHANCE_CLASS = "org.apache.rocketmq.client.consumer.DefaultMQPushConsumer";
34+
private static final String REGISTER_MESSAGE_LISTENER_METHOD_NAME = "registerMessageListener";
35+
public static final String REGISTER_MESSAGE_LISTENER_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.rocketMQ.v4.RegisterMessageListenerInterceptor";
36+
37+
@Override
38+
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
39+
return new ConstructorInterceptPoint[0];
40+
}
41+
42+
@Override
43+
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
44+
return new InstanceMethodsInterceptPoint[] {
45+
new InstanceMethodsInterceptPoint() {
46+
@Override
47+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
48+
return named(REGISTER_MESSAGE_LISTENER_METHOD_NAME);
49+
}
50+
51+
@Override
52+
public String getMethodsInterceptor() {
53+
return REGISTER_MESSAGE_LISTENER_INTERCEPT_CLASS;
54+
}
55+
56+
@Override
57+
public boolean isOverrideArgs() {
58+
return false;
59+
}
60+
}
61+
};
62+
}
63+
64+
@Override
65+
protected ClassMatch enhanceClass() {
66+
return byName(ENHANCE_CLASS);
67+
}
68+
69+
}

apm-sniffer/apm-sdk-plugin/rocketMQ-4.x-plugin/src/main/resources/skywalking-plugin.def

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@ rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageC
1818
rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.ConsumeMessageOrderlyInstrumentation
1919
rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.MQClientAPIImplInstrumentation
2020
rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.SendCallbackInstrumentation
21+
rocketMQ-4.x=org.apache.skywalking.apm.plugin.rocketMQ.v4.define.DefaultMQPushConsumerInstrumentation

test/plugin/scenarios/rocketmq-scenario/config/expectedData.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,11 @@ segmentItems:
5959
componentId: 39
6060
isError: false
6161
spanType: Entry
62-
peer: ''
62+
peer: not blank
6363
tags:
6464
- {key: transmission.latency, value: not null}
65+
- {key: mq.topic, value: TopicTest }
66+
- {key: mq.broker, value: not blank }
6567
refs:
6668
- {parentEndpoint: GET:/case/rocketmq-scenario, networkAddress: not null,
6769
refType: CrossProcess, parentSpanId: 1, parentTraceSegmentId: not null,

0 commit comments

Comments
 (0)