Skip to content

Commit 2711638

Browse files
Fix SOFA RPC instrumentation: protocol context, server error detection, scope/span ordering
1 parent 8289a28 commit 2711638

9 files changed

Lines changed: 184 additions & 13 deletions

dd-java-agent/instrumentation/sofarpc/sofarpc-5.0/src/main/java/datadog/trace/instrumentation/sofarpc/AbstractClusterInstrumentation.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
1212
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
1313

14+
import com.alipay.sofa.rpc.client.AbstractCluster;
15+
import com.alipay.sofa.rpc.config.ConsumerConfig;
1416
import com.alipay.sofa.rpc.core.request.SofaRequest;
1517
import com.alipay.sofa.rpc.core.response.SofaResponse;
1618
import datadog.trace.agent.tooling.Instrumenter;
@@ -38,10 +40,16 @@ public void methodAdvice(MethodTransformer transformer) {
3840

3941
public static class InvokeAdvice {
4042
@Advice.OnMethodEnter(suppress = Throwable.class)
41-
public static AgentScope enter(@Advice.Argument(0) SofaRequest request) {
43+
public static AgentScope enter(
44+
@Advice.This AbstractCluster self, @Advice.Argument(0) SofaRequest request) {
45+
ConsumerConfig config = self.getConsumerConfig();
46+
String protocol = config != null ? config.getProtocol() : null;
4247
AgentSpan span = startSpan(SOFA_RPC_CLIENT);
4348
DECORATE.afterStart(span);
4449
DECORATE.onRequest(span, request);
50+
if (protocol != null) {
51+
span.setTag("sofarpc.protocol", protocol);
52+
}
4553
AgentScope scope = activateSpan(span);
4654
defaultPropagator().inject(span, request, SETTER);
4755
return scope;
@@ -59,8 +67,8 @@ public static void exit(
5967
DECORATE.onResponse(span, response);
6068
DECORATE.onError(span, throwable);
6169
DECORATE.beforeFinish(span);
62-
span.finish();
6370
scope.close();
71+
span.finish();
6472
}
6573
}
6674
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package datadog.trace.instrumentation.sofarpc;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
5+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
6+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
7+
8+
import datadog.trace.agent.tooling.Instrumenter;
9+
import net.bytebuddy.asm.Advice;
10+
11+
public class BoltServerProcessorInstrumentation
12+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
13+
14+
@Override
15+
public String instrumentedType() {
16+
return "com.alipay.sofa.rpc.server.bolt.BoltServerProcessor";
17+
}
18+
19+
@Override
20+
public void methodAdvice(MethodTransformer transformer) {
21+
transformer.applyAdvice(
22+
isMethod()
23+
.and(named("handleRequest"))
24+
.and(takesArguments(3))
25+
.and(takesArgument(2, named("com.alipay.sofa.rpc.core.request.SofaRequest"))),
26+
getClass().getName() + "$HandleRequestAdvice");
27+
}
28+
29+
public static class HandleRequestAdvice {
30+
@Advice.OnMethodEnter(suppress = Throwable.class)
31+
public static void enter() {
32+
SofaRpcProtocolContext.set("bolt");
33+
}
34+
35+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
36+
public static void exit() {
37+
SofaRpcProtocolContext.clear();
38+
}
39+
}
40+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package datadog.trace.instrumentation.sofarpc;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
5+
import static net.bytebuddy.matcher.ElementMatchers.takesNoArguments;
6+
7+
import datadog.trace.agent.tooling.Instrumenter;
8+
import net.bytebuddy.asm.Advice;
9+
10+
public class H2cServerTaskInstrumentation
11+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
12+
13+
@Override
14+
public String instrumentedType() {
15+
return "com.alipay.sofa.rpc.server.http.AbstractHttpServerTask";
16+
}
17+
18+
@Override
19+
public void methodAdvice(MethodTransformer transformer) {
20+
transformer.applyAdvice(
21+
isMethod().and(named("run")).and(takesNoArguments()),
22+
getClass().getName() + "$RunAdvice");
23+
}
24+
25+
public static class RunAdvice {
26+
@Advice.OnMethodEnter(suppress = Throwable.class)
27+
public static void enter() {
28+
SofaRpcProtocolContext.set("h2c");
29+
}
30+
31+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
32+
public static void exit() {
33+
SofaRpcProtocolContext.clear();
34+
}
35+
}
36+
}

dd-java-agent/instrumentation/sofarpc/sofarpc-5.0/src/main/java/datadog/trace/instrumentation/sofarpc/ProviderProxyInvokerInstrumentation.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
44
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
55
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
6-
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
76
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
87
import static datadog.trace.instrumentation.sofarpc.SofaRpcExtractAdapter.GETTER;
98
import static datadog.trace.instrumentation.sofarpc.SofaRpcServerDecorator.DECORATE;
@@ -41,17 +40,26 @@ public void methodAdvice(MethodTransformer transformer) {
4140
public static class InvokeAdvice {
4241
@Advice.OnMethodEnter(suppress = Throwable.class)
4342
public static AgentScope enter(@Advice.Argument(0) SofaRequest request) {
44-
// Triple protocol uses gRPC transport: the gRPC instrumentation already handles
45-
// context propagation and creates a grpc.server span. Detect this by checking
46-
// whether an active span is already present on the current thread — if so, skip
47-
// creating a duplicate sofarpc.request span that would start an orphan trace.
48-
if (activeSpan() != null) {
43+
// Protocol is set in thread-local by transport-specific instrumentation before this call.
44+
// If null, the transport is not instrumented — skip.
45+
String protocol = SofaRpcProtocolContext.get();
46+
if (protocol == null) {
4947
return null;
5048
}
51-
AgentSpanContext parentContext = extractContextAndGetSpanContext(request, GETTER);
52-
AgentSpan span = startSpan(SOFA_RPC_SERVER, parentContext);
49+
AgentSpan span;
50+
if ("bolt".equals(protocol) || "h2c".equals(protocol)) {
51+
// Bolt propagates Datadog trace headers via SofaRequest.requestProps — extract from there.
52+
AgentSpanContext parentContext = extractContextAndGetSpanContext(request, GETTER);
53+
span = startSpan(SOFA_RPC_SERVER, parentContext);
54+
} else {
55+
// For Triple and other protocols, trace context is propagated at the transport layer
56+
// (e.g. gRPC Metadata). The transport instrumentation already activated the parent span,
57+
// so startSpan without an explicit parent inherits it automatically.
58+
span = startSpan(SOFA_RPC_SERVER);
59+
}
5360
DECORATE.afterStart(span);
5461
DECORATE.onRequest(span, request);
62+
span.setTag("sofarpc.protocol", protocol);
5563
return activateSpan(span);
5664
}
5765

@@ -67,8 +75,8 @@ public static void exit(
6775
DECORATE.onResponse(span, response);
6876
DECORATE.onError(span, throwable);
6977
DECORATE.beforeFinish(span);
70-
span.finish();
7178
scope.close();
79+
span.finish();
7280
}
7381
}
7482
}

dd-java-agent/instrumentation/sofarpc/sofarpc-5.0/src/main/java/datadog/trace/instrumentation/sofarpc/SofaRpcClientDecorator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ protected String service() {
4040
}
4141

4242
public AgentSpan onRequest(AgentSpan span, SofaRequest request) {
43+
span.setTag("rpc.system", "sofarpc");
4344
if (request == null) {
4445
return span;
4546
}
4647
String serviceName = request.getTargetServiceUniqueName();
4748
String methodName = request.getMethodName();
4849
span.setTag(Tags.RPC_SERVICE, serviceName);
50+
// peer.service is derived automatically by PeerServiceCalculator from rpc.service.
4951
if (serviceName != null && methodName != null) {
5052
span.setResourceName(serviceName + "/" + methodName);
5153
} else if (methodName != null) {

dd-java-agent/instrumentation/sofarpc/sofarpc-5.0/src/main/java/datadog/trace/instrumentation/sofarpc/SofaRpcModule.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,17 @@ public String[] helperClassNames() {
2121
packageName + ".SofaRpcServerDecorator",
2222
packageName + ".SofaRpcInjectAdapter",
2323
packageName + ".SofaRpcExtractAdapter",
24+
packageName + ".SofaRpcProtocolContext",
2425
};
2526
}
2627

2728
@Override
2829
public List<Instrumenter> typeInstrumentations() {
29-
return asList(new AbstractClusterInstrumentation(), new ProviderProxyInvokerInstrumentation());
30+
return asList(
31+
new AbstractClusterInstrumentation(),
32+
new BoltServerProcessorInstrumentation(),
33+
new H2cServerTaskInstrumentation(),
34+
new TripleServerInstrumentation(),
35+
new ProviderProxyInvokerInstrumentation());
3036
}
3137
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package datadog.trace.instrumentation.sofarpc;
2+
3+
/** Thread-local carrier for the SOFA RPC transport protocol name. */
4+
public final class SofaRpcProtocolContext {
5+
6+
private static final ThreadLocal<String> PROTOCOL = new ThreadLocal<>();
7+
8+
private SofaRpcProtocolContext() {}
9+
10+
public static void set(String protocol) {
11+
PROTOCOL.set(protocol);
12+
}
13+
14+
public static String get() {
15+
return PROTOCOL.get();
16+
}
17+
18+
public static void clear() {
19+
PROTOCOL.remove();
20+
}
21+
}

dd-java-agent/instrumentation/sofarpc/sofarpc-5.0/src/main/java/datadog/trace/instrumentation/sofarpc/SofaRpcServerDecorator.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public AgentSpan afterStart(AgentSpan span) {
4141
}
4242

4343
public AgentSpan onRequest(AgentSpan span, SofaRequest request) {
44+
span.setTag("rpc.system", "sofarpc");
4445
if (request == null) {
4546
return span;
4647
}
@@ -56,9 +57,18 @@ public AgentSpan onRequest(AgentSpan span, SofaRequest request) {
5657
}
5758

5859
public AgentSpan onResponse(AgentSpan span, SofaResponse response) {
59-
if (response != null && response.isError()) {
60+
if (response == null) {
61+
return span;
62+
}
63+
if (response.isError()) {
64+
// RPC-layer error (timeout, serialization failure, etc.)
6065
span.setError(true);
6166
span.setTag("error.message", response.getErrorMsg());
67+
} else if (response.getAppResponse() instanceof Throwable) {
68+
// Application exception: ProviderProxyInvoker catches it and stores in appResponse
69+
Throwable t = (Throwable) response.getAppResponse();
70+
span.setError(true);
71+
span.setTag("error.message", t.getMessage());
6272
}
6373
return span;
6474
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package datadog.trace.instrumentation.sofarpc;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
5+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
6+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
7+
8+
import datadog.trace.agent.tooling.Instrumenter;
9+
import net.bytebuddy.asm.Advice;
10+
11+
public class TripleServerInstrumentation
12+
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
13+
14+
@Override
15+
public String instrumentedType() {
16+
return "com.alipay.sofa.rpc.server.triple.UniqueIdInvoker";
17+
}
18+
19+
@Override
20+
public void methodAdvice(MethodTransformer transformer) {
21+
transformer.applyAdvice(
22+
isMethod()
23+
.and(named("invoke"))
24+
.and(takesArguments(1))
25+
.and(takesArgument(0, named("com.alipay.sofa.rpc.core.request.SofaRequest"))),
26+
getClass().getName() + "$InvokeAdvice");
27+
}
28+
29+
public static class InvokeAdvice {
30+
@Advice.OnMethodEnter(suppress = Throwable.class)
31+
public static void enter() {
32+
SofaRpcProtocolContext.set("tri");
33+
}
34+
35+
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
36+
public static void exit() {
37+
SofaRpcProtocolContext.clear();
38+
}
39+
}
40+
}

0 commit comments

Comments
 (0)