Skip to content

Commit 7af3f06

Browse files
authored
Fix the thread safety bug of finishing operation for the span named "SpringCloudGateway/sendRequest" (#555)
1 parent 35388ac commit 7af3f06

6 files changed

Lines changed: 181 additions & 29 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ Release Notes.
99
* Support Jetty 11.x plugin
1010
* Fix the scenario of using the HBase plugin with spring-data-hadoop.
1111
* Add RocketMQ 5.x plugin
12+
* Fix the thread safety bug of finishing operation for the span named "SpringCloudGateway/sendRequest"
1213

1314
#### Documentation
1415

apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/HttpClientRequestInterceptor.java

Lines changed: 43 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,16 @@
2121
import java.net.URL;
2222
import java.util.function.BiConsumer;
2323
import java.util.function.Function;
24+
2425
import org.apache.skywalking.apm.agent.core.context.CarrierItem;
2526
import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
2627
import org.apache.skywalking.apm.agent.core.context.ContextManager;
2728
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
2829
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
2930
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
3031
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
31-
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
32-
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
32+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
33+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
3334
import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define.EnhanceCacheObject;
3435
import org.reactivestreams.Publisher;
3536
import reactor.core.publisher.Mono;
@@ -38,13 +39,14 @@
3839

3940
import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.SPRING_CLOUD_GATEWAY;
4041

41-
public class HttpClientRequestInterceptor implements InstanceMethodsAroundInterceptor {
42+
public class HttpClientRequestInterceptor implements InstanceMethodsAroundInterceptorV2 {
43+
4244
@Override
4345
public void beforeMethod(final EnhancedInstance objInst,
4446
final Method method,
4547
final Object[] allArguments,
4648
final Class<?>[] argumentsTypes,
47-
final MethodInterceptResult result) throws Throwable {
49+
final MethodInvocationContext context) throws Throwable {
4850

4951
/*
5052
In this plug-in, the HttpClientFinalizerSendInterceptor depends on the NettyRoutingFilterInterceptor
@@ -54,13 +56,13 @@ public void beforeMethod(final EnhancedInstance objInst,
5456
if (!ContextManager.isActive()) {
5557
return;
5658
}
57-
59+
5860
AbstractSpan span = ContextManager.activeSpan();
5961

6062
URL url = new URL((String) allArguments[1]);
6163
ContextCarrier contextCarrier = new ContextCarrier();
6264
AbstractSpan abstractSpan = ContextManager.createExitSpan(
63-
"SpringCloudGateway/sendRequest", contextCarrier, getPeer(url));
65+
"SpringCloudGateway/sendRequest", contextCarrier, getPeer(url));
6466
abstractSpan.prepareForAsync();
6567
Tags.URL.set(abstractSpan, String.valueOf(allArguments[1]));
6668
abstractSpan.setLayer(SpanLayer.HTTP);
@@ -80,36 +82,53 @@ public Publisher<Void> apply(final HttpClientRequest httpClientRequest) {
8082
}
8183
};
8284

83-
objInst.setSkyWalkingDynamicField(new EnhanceCacheObject(span, abstractSpan));
85+
context.setContext(new EnhanceCacheObject(span, abstractSpan));
8486
}
8587

8688
@Override
8789
public Object afterMethod(final EnhancedInstance objInst,
8890
final Method method,
8991
final Object[] allArguments,
9092
final Class<?>[] argumentsTypes,
91-
final Object ret) {
92-
EnhanceCacheObject enhanceCacheObject = (EnhanceCacheObject) objInst.getSkyWalkingDynamicField();
93+
final Object ret,
94+
MethodInvocationContext context) {
95+
EnhanceCacheObject enhanceCacheObject = (EnhanceCacheObject) context.getContext();
9396
Mono<HttpClientResponse> responseMono = (Mono<HttpClientResponse>) ret;
9497
return responseMono.doAfterSuccessOrError(new BiConsumer<HttpClientResponse, Throwable>() {
9598
@Override
9699
public void accept(final HttpClientResponse httpClientResponse, final Throwable throwable) {
100+
doAfterSuccessOrError(httpClientResponse, throwable, enhanceCacheObject);
101+
}
102+
});
103+
}
97104

98-
AbstractSpan abstractSpan = enhanceCacheObject.getSendSpan();
99-
if (abstractSpan != null) {
100-
if (throwable != null) {
101-
abstractSpan.log(throwable);
102-
} else if (httpClientResponse.status().code() > 400) {
103-
abstractSpan.errorOccurred();
104-
}
105-
Tags.HTTP_RESPONSE_STATUS_CODE.set(abstractSpan, httpClientResponse.status().code());
106-
abstractSpan.asyncFinish();
107-
}
105+
void doAfterSuccessOrError(HttpClientResponse httpClientResponse, Throwable throwable, EnhanceCacheObject enhanceCacheObject) {
106+
try {
107+
//When executing the beforeMethod method, if the ContextManager is inactive, the enhanceCacheObject will be null.
108+
if (enhanceCacheObject == null) {
109+
return;
110+
}
108111

109-
objInst.setSkyWalkingDynamicField(null);
110-
enhanceCacheObject.getFilterSpan().asyncFinish();
112+
//The doAfterSuccessOrError method may be executed multiple times.
113+
if (enhanceCacheObject.isSpanFinish()) {
114+
return;
111115
}
112-
});
116+
117+
AbstractSpan abstractSpan = enhanceCacheObject.getSendSpan();
118+
if (throwable != null) {
119+
abstractSpan.log(throwable);
120+
} else if (httpClientResponse.status().code() > 400) {
121+
abstractSpan.errorOccurred();
122+
}
123+
Tags.HTTP_RESPONSE_STATUS_CODE.set(abstractSpan, httpClientResponse.status().code());
124+
125+
abstractSpan.asyncFinish();
126+
enhanceCacheObject.getFilterSpan().asyncFinish();
127+
128+
enhanceCacheObject.setSpanFinish(true);
129+
} catch (Throwable e) {
130+
//Catch unknown exceptions to avoid interrupting business processes.
131+
}
113132
}
114133

115134
private String getPeer(URL url) {
@@ -121,7 +140,8 @@ public void handleMethodException(final EnhancedInstance objInst,
121140
final Method method,
122141
final Object[] allArguments,
123142
final Class<?>[] argumentsTypes,
124-
final Throwable t) {
143+
final Throwable t,
144+
MethodInvocationContext context) {
125145

126146
}
127147
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define;
19+
20+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;
21+
22+
public abstract class AbstractGateway200EnhancePluginDefineV2 extends ClassInstanceMethodsEnhancePluginDefineV2 {
23+
24+
@Override
25+
protected String[] witnessClasses() {
26+
return new String[] {
27+
"org.springframework.cloud.gateway.config.GatewayAutoConfiguration$1"
28+
};
29+
}
30+
}

apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/EnhanceCacheObject.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
public class EnhanceCacheObject {
2323
private final AbstractSpan filterSpan;
2424
private final AbstractSpan sendSpan;
25+
private volatile boolean spanFinish = false;
2526

2627
public EnhanceCacheObject(final AbstractSpan filterSpan, final AbstractSpan sendSpan) {
2728
this.filterSpan = filterSpan;
@@ -35,4 +36,12 @@ public AbstractSpan getFilterSpan() {
3536
public AbstractSpan getSendSpan() {
3637
return sendSpan;
3738
}
39+
40+
public boolean isSpanFinish() {
41+
return spanFinish;
42+
}
43+
44+
public void setSpanFinish(boolean spanFinish) {
45+
this.spanFinish = spanFinish;
46+
}
3847
}

apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.0.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v20x/define/HttpClientInstrumentation.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020
import net.bytebuddy.description.method.MethodDescription;
2121
import net.bytebuddy.matcher.ElementMatcher;
2222
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
23-
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
2424
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
2525

2626
import static net.bytebuddy.matcher.ElementMatchers.named;
2727
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
2828

29-
public class HttpClientInstrumentation extends AbstractGateway200EnhancePluginDefine {
29+
public class HttpClientInstrumentation extends AbstractGateway200EnhancePluginDefineV2 {
3030

3131
@Override
3232
protected ClassMatch enhanceClass() {
@@ -39,16 +39,16 @@ public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
3939
}
4040

4141
@Override
42-
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
43-
return new InstanceMethodsInterceptPoint[] {
44-
new InstanceMethodsInterceptPoint() {
42+
public InstanceMethodsInterceptV2Point[] getInstanceMethodsInterceptV2Points() {
43+
return new InstanceMethodsInterceptV2Point[] {
44+
new InstanceMethodsInterceptV2Point() {
4545
@Override
4646
public ElementMatcher<MethodDescription> getMethodsMatcher() {
4747
return named("request");
4848
}
4949

5050
@Override
51-
public String getMethodsInterceptor() {
51+
public String getMethodsInterceptorV2() {
5252
return Constants.REQUEST_INTERCEPTOR;
5353
}
5454

@@ -59,4 +59,5 @@ public boolean isOverrideArgs() {
5959
}
6060
};
6161
}
62+
6263
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x;
19+
20+
import io.netty.handler.codec.http.HttpResponseStatus;
21+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
22+
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
23+
import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
24+
import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
25+
import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
26+
import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
27+
import org.apache.skywalking.apm.plugin.spring.cloud.gateway.v20x.define.EnhanceCacheObject;
28+
import org.junit.Before;
29+
import org.junit.Rule;
30+
import org.junit.Test;
31+
import org.junit.runner.RunWith;
32+
import org.mockito.Mockito;
33+
import org.mockito.junit.MockitoJUnit;
34+
import org.mockito.junit.MockitoRule;
35+
import reactor.ipc.netty.http.client.HttpClientResponse;
36+
37+
import static org.mockito.Mockito.spy;
38+
import static org.mockito.Mockito.verify;
39+
40+
@RunWith(TracingSegmentRunner.class)
41+
public class HttpClientRequestInterceptorTest {
42+
43+
private HttpClientRequestInterceptor httpClientRequestInterceptor = new HttpClientRequestInterceptor();
44+
45+
@Rule
46+
public AgentServiceRule serviceRule = new AgentServiceRule();
47+
@Rule
48+
public MockitoRule rule = MockitoJUnit.rule();
49+
50+
@SegmentStoragePoint
51+
private SegmentStorage segmentStorage;
52+
53+
private HttpClientResponse httpClientResponse;
54+
55+
@Before
56+
public void setUp() throws Exception {
57+
58+
httpClientResponse = Mockito.mock(HttpClientResponse.class);
59+
HttpResponseStatus httpResponseStatus = Mockito.mock(HttpResponseStatus.class);
60+
61+
Mockito.when(httpResponseStatus.code()).thenReturn(200);
62+
Mockito.when(httpClientResponse.status()).thenReturn(httpResponseStatus);
63+
}
64+
65+
@Test
66+
public void testDoAfterSuccessOrError() {
67+
AbstractSpan filterSpan = ContextManager.createLocalSpan("mockFilterSpan");
68+
filterSpan.prepareForAsync();
69+
ContextManager.stopSpan(filterSpan);
70+
71+
AbstractSpan sendSpan = ContextManager.createExitSpan("SpringCloudGateway/sendRequest", "http://127.0.0.1:80");
72+
sendSpan.prepareForAsync();
73+
ContextManager.stopSpan(sendSpan);
74+
75+
EnhanceCacheObject enhanceCacheObject = new EnhanceCacheObject(filterSpan, sendSpan);
76+
enhanceCacheObject = spy(enhanceCacheObject);
77+
78+
//Test the ContextManager is inactive.
79+
httpClientRequestInterceptor.doAfterSuccessOrError(httpClientResponse, null, null);
80+
verify(enhanceCacheObject, Mockito.times(0)).setSpanFinish(true);
81+
82+
//Test normal scenario.
83+
httpClientRequestInterceptor.doAfterSuccessOrError(httpClientResponse, null, enhanceCacheObject);
84+
verify(enhanceCacheObject, Mockito.times(1)).setSpanFinish(true);
85+
86+
//Test the doAfterSuccessOrError method is executed multiple times.
87+
httpClientRequestInterceptor.doAfterSuccessOrError(httpClientResponse, null, enhanceCacheObject);
88+
verify(enhanceCacheObject, Mockito.times(1)).setSpanFinish(true);
89+
}
90+
91+
}

0 commit comments

Comments
 (0)