Skip to content

Commit f17a4ea

Browse files
authored
Polish up Pulsar plugins (#375)
1 parent 35a3f5a commit f17a4ea

18 files changed

Lines changed: 223 additions & 660 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Release Notes.
2626
* Add RocketMQ test scenarios for version 4.3 - 4.9. No 4.0 - 4.2 release images for testing.
2727
* Support mannual propagation of tracing context to next operators for webflux.
2828
* Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span.
29+
* Polish up Pulsar plugins to remove unnecessary dynamic value , set peer at consumer side
2930

3031
#### Documentation
3132

apm-sniffer/apm-sdk-plugin/pulsar-2.2-2.7-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptor.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,13 @@
1818

1919
package org.apache.skywalking.apm.plugin.pulsar;
2020

21-
import org.apache.pulsar.client.impl.PulsarClientImpl;
2221
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
2322
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
24-
import org.apache.skywalking.apm.plugin.pulsar.common.ProducerEnhanceRequiredInfo;
2523

26-
/**
27-
* Interceptor of pulsar producer constructor.
28-
* <p>
29-
* The interceptor create {@link ProducerEnhanceRequiredInfo} which is required by instance method interceptor, So use
30-
* it to update the skywalking dynamic field of pulsar producer enhanced instance. So that the instance methods can get
31-
* the {@link ProducerEnhanceRequiredInfo}
32-
*/
3324
public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {
3425

3526
@Override
3627
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
37-
PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0];
38-
String topic = (String) allArguments[1];
39-
ProducerEnhanceRequiredInfo producerEnhanceRequiredInfo = new ProducerEnhanceRequiredInfo();
40-
producerEnhanceRequiredInfo.setTopic(topic);
41-
/*
42-
* Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl()
43-
* can handle the service url provider which use a dynamic service url
44-
*/
45-
producerEnhanceRequiredInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
46-
producerEnhanceRequiredInfo.setPropertiesInjector(new MessagePropertiesInjectorV27());
47-
objInst.setSkyWalkingDynamicField(producerEnhanceRequiredInfo);
28+
objInst.setSkyWalkingDynamicField(new MessagePropertiesInjectorV27());
4829
}
4930
}

apm-sniffer/apm-sdk-plugin/pulsar-2.2-2.7-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/MockMessage.java

Lines changed: 0 additions & 82 deletions
This file was deleted.

apm-sniffer/apm-sdk-plugin/pulsar-2.2-2.7-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/ProducerConstructorInterceptorTest.java

Lines changed: 0 additions & 96 deletions
This file was deleted.

apm-sniffer/apm-sdk-plugin/pulsar-2.8.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/pulsar/v28x/ProducerConstructorInterceptor.java

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,13 @@
1818

1919
package org.apache.skywalking.apm.plugin.pulsar.v28x;
2020

21-
import org.apache.pulsar.client.impl.PulsarClientImpl;
2221
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
2322
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
24-
import org.apache.skywalking.apm.plugin.pulsar.common.ProducerEnhanceRequiredInfo;
2523

26-
/**
27-
* Interceptor of pulsar producer constructor.
28-
* <p>
29-
* The interceptor create {@link ProducerEnhanceRequiredInfo} which is required by instance method interceptor, So use
30-
* it to update the skywalking dynamic field of pulsar producer enhanced instance. So that the instance methods can get
31-
* the {@link ProducerEnhanceRequiredInfo}
32-
*/
3324
public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {
3425

3526
@Override
3627
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
37-
PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0];
38-
String topic = (String) allArguments[1];
39-
ProducerEnhanceRequiredInfo producerEnhanceRequiredInfo = new ProducerEnhanceRequiredInfo();
40-
producerEnhanceRequiredInfo.setTopic(topic);
41-
/*
42-
* Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl()
43-
* can handle the service url provider which use a dynamic service url
44-
*/
45-
producerEnhanceRequiredInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
46-
producerEnhanceRequiredInfo.setPropertiesInjector(new MessagePropertiesInjectorV28());
47-
objInst.setSkyWalkingDynamicField(producerEnhanceRequiredInfo);
28+
objInst.setSkyWalkingDynamicField(new MessagePropertiesInjectorV28());
4829
}
4930
}

apm-sniffer/apm-sdk-plugin/pulsar-2.8.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/pulsar/v28x/ProducerConstructorInterceptorTest.java

Lines changed: 0 additions & 98 deletions
This file was deleted.

apm-sniffer/apm-sdk-plugin/pulsar-common/src/main/java/org/apache/skywalking/apm/plugin/pulsar/common/ConsumerConstructorInterceptor.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,15 @@
1818

1919
package org.apache.skywalking.apm.plugin.pulsar.common;
2020

21-
import org.apache.pulsar.client.impl.PulsarClientImpl;
2221
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
2322
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
2423
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
2524

26-
/**
27-
* Interceptor of pulsar consumer constructor.
28-
* <p>
29-
* The interceptor create {@link ConsumerEnhanceRequiredInfo} which is required by instance method interceptor, So use
30-
* it to update the skywalking dynamic field of pulsar consumer enhanced instance. So that the instance methods can get
31-
* the {@link ConsumerEnhanceRequiredInfo}
32-
*/
3325
public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {
3426

3527
@Override
3628
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
37-
PulsarClientImpl pulsarClient = (PulsarClientImpl) allArguments[0];
38-
String topic = (String) allArguments[1];
3929
ConsumerConfigurationData consumerConfigurationData = (ConsumerConfigurationData) allArguments[2];
40-
ConsumerEnhanceRequiredInfo requireInfo = new ConsumerEnhanceRequiredInfo();
41-
/*
42-
* Pulsar url can specify with specific URL or a service url provider, use pulsarClient.getLookup().getServiceUrl()
43-
* can handle the service url provider which use a dynamic service url
44-
*/
45-
requireInfo.setServiceUrl(pulsarClient.getLookup().getServiceUrl());
46-
requireInfo.setTopic(topic);
47-
requireInfo.setSubscriptionName(consumerConfigurationData.getSubscriptionName());
48-
objInst.setSkyWalkingDynamicField(requireInfo);
30+
objInst.setSkyWalkingDynamicField(consumerConfigurationData.getSubscriptionName());
4931
}
5032
}

0 commit comments

Comments
 (0)