Skip to content

Commit 1a7d2f7

Browse files
authored
[fix][plugin][pulsar] Clean message context after message is recycled to prevent memory leak. (#405)
1 parent 4f9c2d8 commit 1a7d2f7

2 files changed

Lines changed: 66 additions & 2 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.pulsar.common;
20+
21+
import java.lang.reflect.Method;
22+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
25+
26+
public class MessageRecycleMethodInterceptor implements InstanceMethodsAroundInterceptor {
27+
@Override
28+
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
29+
objInst.setSkyWalkingDynamicField(null);
30+
}
31+
32+
@Override
33+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
34+
return ret;
35+
}
36+
37+
@Override
38+
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
39+
// NOOP
40+
}
41+
}

apm-sniffer/apm-sdk-plugin/pulsar-common/src/main/java/org/apache/skywalking/apm/plugin/pulsar/common/define/BaseMessageInstrumentation.java

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818

1919
package org.apache.skywalking.apm.plugin.pulsar.common.define;
2020

21+
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
22+
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
23+
import static net.bytebuddy.matcher.ElementMatchers.named;
24+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
2125
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
2226

2327
import net.bytebuddy.description.method.MethodDescription;
@@ -42,10 +46,12 @@ public class BaseMessageInstrumentation extends ClassInstanceMethodsEnhancePlugi
4246

4347
public static final String ENHANCE_CLASS = "org.apache.pulsar.client.impl.MessageImpl";
4448
public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.common.MessageConstructorInterceptor";
49+
public static final String ENHANCE_METHOD = "recycle";
50+
public static final String METHOD_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.pulsar.common.MessageRecycleMethodInterceptor";
4551

4652
@Override
4753
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
48-
return new ConstructorInterceptPoint[] {
54+
return new ConstructorInterceptPoint[]{
4955
new ConstructorInterceptPoint() {
5056
@Override
5157
public ElementMatcher<MethodDescription> getConstructorMatcher() {
@@ -62,7 +68,24 @@ public String getConstructorInterceptor() {
6268

6369
@Override
6470
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
65-
return new InstanceMethodsInterceptPoint[0];
71+
return new InstanceMethodsInterceptPoint[]{
72+
new InstanceMethodsInterceptPoint() {
73+
@Override
74+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
75+
return isMethod().and(named(ENHANCE_METHOD)).and(isPublic()).and(takesArguments(0));
76+
}
77+
78+
@Override
79+
public String getMethodsInterceptor() {
80+
return METHOD_INTERCEPTOR_CLASS;
81+
}
82+
83+
@Override
84+
public boolean isOverrideArgs() {
85+
return false;
86+
}
87+
}
88+
};
6689
}
6790

6891
@Override

0 commit comments

Comments
 (0)