Skip to content

Commit ea1c204

Browse files
add Kafka Clients 3.x support (#429)
1 parent 37a880e commit ea1c204

7 files changed

Lines changed: 69 additions & 4 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Release Notes.
2020
* Add tags `mq.message.keys` and `mq.message.tags` for RocketMQ producer span
2121
* Clean the trace context which injected into Pulsar MessageImpl after the instance recycled
2222
* Fix In the higher version of mysql-connector-java 8x, there is an error in the value of db.instance.
23+
* Add support for KafkaClients 3.2.x
2324

2425
#### Documentation
2526

apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<artifactId>apm-kafka-plugin</artifactId>
2929

3030
<properties>
31-
<kafka-clients.version>2.0.1</kafka-clients.version>
31+
<kafka-clients.version>3.2.3</kafka-clients.version>
3232
</properties>
3333

3434
<dependencies>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.kafka;
20+
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.clients.consumer.internals.Fetch;
23+
import org.apache.kafka.common.TopicPartition;
24+
25+
import java.util.List;
26+
import java.util.Map;
27+
28+
public class Kafka3ConsumerInterceptor extends KafkaConsumerInterceptor {
29+
@SuppressWarnings({"unchecked"})
30+
@Override
31+
protected Map<TopicPartition, List<ConsumerRecord<?, ?>>> fetchRecords(Object retObj) {
32+
return ((Fetch) retObj).records();
33+
}
34+
}

apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
6161
if (ret == null) {
6262
return ret;
6363
}
64-
Map<TopicPartition, List<ConsumerRecord<?, ?>>> records = (Map<TopicPartition, List<ConsumerRecord<?, ?>>>) ret;
64+
Map<TopicPartition, List<ConsumerRecord<?, ?>>> records = fetchRecords(ret);
6565
//
6666
// The entry span will only be created when the consumer received at least one message.
6767
//
@@ -100,6 +100,11 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
100100
return ret;
101101
}
102102

103+
@SuppressWarnings({"unchecked"})
104+
protected Map<TopicPartition, List<ConsumerRecord<?, ?>>> fetchRecords(Object retObj) {
105+
return (Map<TopicPartition, List<ConsumerRecord<?, ?>>>) retObj;
106+
}
107+
103108
@Override
104109
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
105110
Class<?>[] argumentsTypes, Throwable t) {

apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@
2424
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
2525
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
2626

27+
import java.util.Map;
28+
2729
import static net.bytebuddy.matcher.ElementMatchers.named;
30+
import static net.bytebuddy.matcher.ElementMatchers.returns;
2831
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
2932
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
3033

@@ -45,6 +48,7 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
4548
public static final String CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithConsumerConfigInterceptPoint";
4649
public static final String MAP_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithMapInterceptPoint";
4750
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
51+
public static final String INTERCEPTOR_CLASS_KAFKA3_2 = "org.apache.skywalking.apm.plugin.kafka.Kafka3ConsumerInterceptor";
4852
public static final String ENHANCE_METHOD = "pollOnce";
4953
public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
5054
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
@@ -91,7 +95,8 @@ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
9195
new InstanceMethodsInterceptPoint() {
9296
@Override
9397
public ElementMatcher<MethodDescription> getMethodsMatcher() {
94-
return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD));
98+
// targeting Kafka Client < 3.2
99+
return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD).and(returns(Map.class)));
95100
}
96101

97102
@Override
@@ -104,6 +109,23 @@ public boolean isOverrideArgs() {
104109
return false;
105110
}
106111
},
112+
new InstanceMethodsInterceptPoint() {
113+
@Override
114+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
115+
// targeting Kafka Client >= 3.2
116+
return named(ENHANCE_COMPATIBLE_METHOD).and(returns(named("org.apache.kafka.clients.consumer.internals.Fetch")));
117+
}
118+
119+
@Override
120+
public String getMethodsInterceptor() {
121+
return INTERCEPTOR_CLASS_KAFKA3_2;
122+
}
123+
124+
@Override
125+
public boolean isOverrideArgs() {
126+
return false;
127+
}
128+
},
107129
new InstanceMethodsInterceptPoint() {
108130
@Override
109131
public ElementMatcher<MethodDescription> getMethodsMatcher() {

docs/en/setup/service-agent/java-agent/Supported-list.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ metrics based on the tracing data.
6666
* [JSONRPC4J](https://github.com/briandilley/jsonrpc4j) 1.2.0 -> 1.6
6767
* MQ
6868
* [RocketMQ](https://github.com/apache/rocketmq) 4.x
69-
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 2.8.0
69+
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 3.2.3
7070
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka))
7171
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
7272
* [RabbitMQ](https://www.rabbitmq.com/) 3.x-> 5.x

test/plugin/scenarios/kafka-scenario/support-version.list

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,6 @@
2525
2.6.1
2626
2.7.0
2727
2.8.0
28+
3.0.2
29+
3.1.2
30+
3.2.3

0 commit comments

Comments
 (0)