Skip to content

Commit 64a33b2

Browse files
authored
Support mannual propagation of tracing context to next stream for webflux. (#371)
1 parent 6648e45 commit 64a33b2

19 files changed

Lines changed: 812 additions & 2 deletions

File tree

CHANGES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Release Notes.
2424
* Polish up activemq plugin to fix missing broker tag on consumer side
2525
* Enhance MQ plugin relative tests to check key tags not blank.
2626
* Add RocketMQ test scenarios for version 4.3 - 4.9. No 4.0 - 4.2 release images for testing.
27+
* Support mannual propagation of tracing context to next operators for webflux.
2728
* Add MQ_TOPIC and MQ_BROKER tags for RocketMQ consumer's span.
2829

2930
#### Documentation
@@ -32,6 +33,8 @@ Release Notes.
3233
* Update plugin dev tags for cache relative tags.
3334
* Add plugin dev docs for virtual database tags.
3435
* Add plugin dev docs for virtual MQ tags.
36+
* Add doc about kafka plugin Manual APIs.
37+
3538

3639
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/150?closed=1)
3740

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
~
18+
-->
19+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
20+
<parent>
21+
<artifactId>apm-application-toolkit</artifactId>
22+
<groupId>org.apache.skywalking</groupId>
23+
<version>8.13.0-SNAPSHOT</version>
24+
</parent>
25+
<modelVersion>4.0.0</modelVersion>
26+
27+
<artifactId>apm-toolkit-webflux</artifactId>
28+
29+
<properties>
30+
<spring-webflux.version>5.1.0.RELEASE</spring-webflux.version>
31+
</properties>
32+
33+
<dependencies>
34+
<dependency>
35+
<groupId>org.springframework</groupId>
36+
<artifactId>spring-webflux</artifactId>
37+
<version>${spring-webflux.version}</version>
38+
<scope>provided</scope>
39+
</dependency>
40+
</dependencies>
41+
42+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.toolkit.webflux;
20+
21+
import org.springframework.web.server.ServerWebExchange;
22+
import reactor.core.publisher.Signal;
23+
import reactor.core.publisher.SignalType;
24+
import reactor.util.context.Context;
25+
26+
import java.util.concurrent.Callable;
27+
import java.util.function.Consumer;
28+
29+
/**
30+
* WebFlux operators that are capable to reuse tracing context from Reactor's Context.
31+
*/
32+
public final class WebFluxSkyWalkingOperators {
33+
34+
private WebFluxSkyWalkingOperators() {
35+
throw new IllegalStateException("You can't instantiate a utility class");
36+
}
37+
38+
/**
39+
* Wraps a runnable with a local span and continue tracing context.
40+
*
41+
* @param signalType - Reactor's signal type
42+
* @param runnable - lambda to execute within the tracing context
43+
* @return consumer of a signal
44+
*/
45+
public static Consumer<Signal<?>> continueTracing(SignalType signalType, Runnable runnable) {
46+
return signal -> {
47+
if (signalType != signal.getType()) {
48+
return;
49+
}
50+
continueTracing(runnable).accept(signal);
51+
};
52+
}
53+
54+
/**
55+
* Wraps a consumer with a local span and continue tracing context.
56+
*
57+
* @param signalType - Reactor's signal type
58+
* @param consumer - lambda to execute within the tracing context
59+
* @return consumer of a signal
60+
*/
61+
public static Consumer<Signal> continueTracing(SignalType signalType, Consumer<Signal> consumer) {
62+
return signal -> {
63+
if (signalType != signal.getType()) {
64+
return;
65+
}
66+
continueTracing(signal.getContext(), () -> consumer.accept(signal));
67+
};
68+
}
69+
70+
/**
71+
* Wraps a runnable with a local span and continue tracing context.
72+
*
73+
* @param runnable - lambda to execute within the tracing context
74+
* @return consumer of a signal
75+
*/
76+
public static Consumer<Signal> continueTracing(Runnable runnable) {
77+
return signal -> {
78+
Context context = signal.getContext();
79+
continueTracing(context, runnable);
80+
};
81+
}
82+
83+
/**
84+
* Wraps a runnable with a local span and continue tracing context.
85+
*
86+
* @param context - Reactor context that contains the tracing context
87+
* @param runnable - lambda to execute within the tracing context
88+
*/
89+
public static void continueTracing(Context context, Runnable runnable) {
90+
runnable.run();
91+
}
92+
93+
/**
94+
* Wraps a callable with a local span and continue tracing context.
95+
*
96+
* @param context - Reactor context that contains the tracing context
97+
* @param callable - lambda to execute within the tracing context
98+
* @param <T> callable's return type
99+
* @return value from the callable
100+
*/
101+
public static <T> T continueTracing(Context context, Callable<T> callable) {
102+
try {
103+
return callable.call();
104+
} catch (Exception e) {
105+
return sneakyThrow(e);
106+
}
107+
}
108+
109+
/**
110+
* Wraps a callable with a local span and continue tracing context.
111+
*
112+
* @param serverWebExchange - EnhancedInstance that contains the tracing context
113+
* @param callable - lambda to execute within the tracing context
114+
* @param <T> callable's return type
115+
* @return value from the callable
116+
*/
117+
public static <T> T continueTracing(ServerWebExchange serverWebExchange, Callable<T> callable) {
118+
try {
119+
return callable.call();
120+
} catch (Exception e) {
121+
return sneakyThrow(e);
122+
}
123+
}
124+
125+
/**
126+
* Wraps a runnable with a local span and continue tracing context.
127+
*
128+
* @param serverWebExchange - EnhancedInstance that contains the tracing context
129+
* @param runnable - lambda to execute within the tracing context
130+
*/
131+
public static void continueTracing(ServerWebExchange serverWebExchange, Runnable runnable) {
132+
runnable.run();
133+
}
134+
135+
private static <T extends Throwable, R> R sneakyThrow(Throwable t) throws T {
136+
throw (T) t;
137+
}
138+
}

apm-application-toolkit/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,5 +35,6 @@
3535
<module>apm-toolkit-meter</module>
3636
<module>apm-toolkit-micrometer-registry</module>
3737
<module>apm-toolkit-kafka</module>
38+
<module>apm-toolkit-webflux</module>
3839
</modules>
3940
</project>
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one or more
4+
~ contributor license agreements. See the NOTICE file distributed with
5+
~ this work for additional information regarding copyright ownership.
6+
~ The ASF licenses this file to You under the Apache License, Version 2.0
7+
~ (the "License"); you may not use this file except in compliance with
8+
~ the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
~
18+
-->
19+
20+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
21+
<parent>
22+
<artifactId>apm-toolkit-activation</artifactId>
23+
<groupId>org.apache.skywalking</groupId>
24+
<version>8.13.0-SNAPSHOT</version>
25+
</parent>
26+
<modelVersion>4.0.0</modelVersion>
27+
28+
<artifactId>apm-toolkit-webflux-activation</artifactId>
29+
30+
<properties>
31+
<spring-webflux.version>5.1.0.RELEASE</spring-webflux.version>
32+
</properties>
33+
34+
<dependencies>
35+
<dependency>
36+
<groupId>org.springframework</groupId>
37+
<artifactId>spring-webflux</artifactId>
38+
<version>${spring-webflux.version}</version>
39+
<scope>provided</scope>
40+
</dependency>
41+
</dependencies>
42+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.toolkit.activation.webflux;
20+
21+
import net.bytebuddy.description.method.MethodDescription;
22+
import net.bytebuddy.matcher.ElementMatcher;
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassStaticMethodsEnhancePluginDefine;
26+
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
27+
28+
import static net.bytebuddy.matcher.ElementMatchers.named;
29+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
30+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
31+
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
32+
33+
/**
34+
*/
35+
public class WebFluxSkyWalkingOperatorsActivation extends ClassStaticMethodsEnhancePluginDefine {
36+
37+
public static final String INTERCEPT_CLASS =
38+
"org.apache.skywalking.apm.toolkit.activation.webflux.WebFluxSkyWalkingOperatorsInterceptor";
39+
public static final String ENHANCE_CLASS =
40+
"org.apache.skywalking.apm.toolkit.webflux.WebFluxSkyWalkingOperators";
41+
public static final String ENHANCE_METHOD = "continueTracing";
42+
43+
@Override
44+
protected ClassMatch enhanceClass() {
45+
return byName(ENHANCE_CLASS);
46+
}
47+
48+
@Override
49+
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
50+
return null;
51+
}
52+
53+
@Override
54+
public StaticMethodsInterceptPoint[] getStaticMethodsInterceptPoints() {
55+
return new StaticMethodsInterceptPoint[] {
56+
new StaticMethodsInterceptPoint() {
57+
@Override
58+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
59+
return named(ENHANCE_METHOD).and(takesArguments(2))
60+
.and(takesArgument(0, named("reactor.util.context.Context"))
61+
.or(takesArgument(0, named("org.springframework.web.server.ServerWebExchange"))));
62+
}
63+
64+
@Override
65+
public String getMethodsInterceptor() {
66+
return INTERCEPT_CLASS;
67+
}
68+
69+
@Override
70+
public boolean isOverrideArgs() {
71+
return false;
72+
}
73+
}
74+
};
75+
}
76+
}

0 commit comments

Comments
 (0)