Skip to content

Commit d4d21ca

Browse files
authored
Polish up kafka,nats,activemq,rabbitmq plugins to set peer at comsumer side (#377)
1 parent f17a4ea commit d4d21ca

15 files changed

Lines changed: 29 additions & 17 deletions

File tree

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ Release Notes.
2727
* Support mannual propagation of tracing context to next operators for webflux.
2828
* Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span.
2929
* Polish up Pulsar plugins to remove unnecessary dynamic value , set peer at consumer side
30+
* Polish Kafka plugin to set peer at the consumer side.
31+
* Polish NATS plugin to set peer at the consumer side.
32+
* Polish ActiveMQ plugin to set peer at the consumer side.
33+
* Polish RabbitMQ plugin to set peer at the consumer side.
3034

3135
#### Documentation
3236

apm-sniffer/apm-sdk-plugin/activemq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/activemq/MessageConsumerDequeueInterceptor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public Object afterMethod(final EnhancedInstance objInst,
8383
if (activeSpan == null) {
8484
return ret;
8585
}
86+
activeSpan.setPeer(url);
8687
activeSpan.setComponent(ComponentsDefine.ACTIVEMQ_CONSUMER);
8788
SpanLayer.asMQ(activeSpan);
8889
CarrierItem next = contextCarrier.items();

apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
7979
SpanLayer.asMQ(activeSpan);
8080
Tags.MQ_BROKER.set(activeSpan, requiredInfo.getBrokerServers());
8181
Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopics());
82-
82+
activeSpan.setPeer(requiredInfo.getBrokerServers());
8383
for (List<ConsumerRecord<?, ?>> consumerRecords : records.values()) {
8484
for (ConsumerRecord<?, ?> record : consumerRecords) {
8585
ContextCarrier contextCarrier = new ContextCarrier();

apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/DeliverReplyInterceptor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ public class DeliverReplyInterceptor implements InstanceMethodsAroundInterceptor
3636
@Override
3737
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
3838
final AbstractSpan entrySpan = createEntrySpan((Message) allArguments[0]);
39-
Tags.MQ_BROKER.set(entrySpan, buildServers((Connection) objInst));
39+
final String servers = buildServers((Connection) objInst);
40+
Tags.MQ_BROKER.set(entrySpan, servers);
41+
entrySpan.setPeer(servers);
4042
}
4143

4244
@Override

apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/NatsCommons.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ static MessageHandler buildTraceMsgHandler(String servers, MessageHandler msgHan
108108
}
109109
AbstractSpan span = NatsCommons.createEntrySpan(msg);
110110
Tags.MQ_BROKER.set(span, servers);
111+
span.setPeer(servers);
111112
try {
112113
msgHandler.onMessage(msg);
113114
} catch (Exception e) {

apm-sniffer/apm-sdk-plugin/nats-2.14.x-2.15.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/nats/client/SubscriptionNextMsgInterceptor.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
4545
Message msg = (Message) ret;
4646
AbstractSpan span = createEntrySpan(msg);
4747
Tags.MQ_BROKER.set(span , servers);
48+
span.setPeer(servers);
4849
// Close the span immediately , as no chance to trace what user want to do
4950
ContextManager.stopSpan(span);
5051
return ret;

apm-sniffer/apm-sdk-plugin/rabbitmq-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/rabbitmq/TracerConsumer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public void handleDelivery(final String consumerTag,
8282
Tags.MQ_TOPIC.set(activeSpan, envelope.getExchange());
8383
Tags.MQ_QUEUE.set(activeSpan, envelope.getRoutingKey());
8484
activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
85+
activeSpan.setPeer(serverUrl);
8586
SpanLayer.asMQ(activeSpan);
8687
CarrierItem next = contextCarrier.items();
8788
while (next.hasNext()) {

docs/en/setup/service-agent/java-agent/Java-Plugin-Development-Guide.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,8 @@ SkyWalking analysis MQ performance related metrics through the following tags.
230230
* `mq.topic` indicates MQ topic name , It's optional as some MQ don't hava concept of `topic`
231231
* `transmission.latency` The transmission latency from consumer to producer. Usually you needn't to record this tag manually, instead to call `contextCarrier.extensionInjector().injectSendingTimestamp();` to record tag `sendingTimestamp` on producer side , and SkyWalking would record this tag on consumer side if `sw8-x` context carrier(from producer side) contains `sendingTimestamp`
232232

233+
Notice , you should set `peer` at both sides(producer and consumer). And the value of peer should represent the MQ server cluster.
234+
233235

234236
### Advanced APIs
235237

test/plugin/scenarios/activemq-scenario/config/expectedData.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ segmentItems:
2828
componentId: 45
2929
isError: false
3030
spanType: Exit
31-
peer: not null
31+
peer: not blank
3232
tags:
3333
- {key: mq.broker, value: not blank}
3434
- {key: mq.queue, value: test}
@@ -59,7 +59,7 @@ segmentItems:
5959
componentId: 46
6060
isError: false
6161
spanType: Entry
62-
peer: ''
62+
peer: not blank
6363
tags:
6464
- {key: mq.broker, value: not blank}
6565
- {key: mq.queue, value: test}

test/plugin/scenarios/kafka-scenario/config/expectedData.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ segmentItems:
158158
componentId: 41
159159
isError: false
160160
spanType: Entry
161-
peer: ''
161+
peer: kafka-server:9092
162162
tags:
163163
- {key: mq.broker, value: 'kafka-server:9092'}
164164
- {key: mq.topic, value: test}
@@ -194,7 +194,7 @@ segmentItems:
194194
componentId: 41
195195
isError: false
196196
spanType: Entry
197-
peer: ''
197+
peer: kafka-server:9092
198198
tags:
199199
- {key: mq.broker, value: 'kafka-server:9092'}
200200
- {key: mq.topic, value: test.}
@@ -241,7 +241,7 @@ segmentItems:
241241
componentId: 41
242242
isError: false
243243
spanType: Entry
244-
peer: ''
244+
peer: kafka-server:9092
245245
tags:
246246
- { key: mq.broker, value: 'kafka-server:9092' }
247247
- { key: mq.topic, value: assign }

0 commit comments

Comments
 (0)