diff --git a/CHANGES.md b/CHANGES.md
index 717dae3830..0066b0a57b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -20,6 +20,7 @@ Release Notes.
* Add tags `mq.message.keys` and `mq.message.tags` for RocketMQ producer span
* Clean the trace context which injected into Pulsar MessageImpl after the instance recycled
* Fix In the higher version of mysql-connector-java 8x, there is an error in the value of db.instance.
+* Add support for KafkaClients 3.2.x
#### Documentation
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml
index 1693585679..742fbe4edf 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml
@@ -28,7 +28,7 @@
apm-kafka-plugin
- 2.0.1
+ 3.2.3
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/Kafka3ConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/Kafka3ConsumerInterceptor.java
new file mode 100644
index 0000000000..b99813d7a4
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/Kafka3ConsumerInterceptor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.internals.Fetch;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+import java.util.Map;
+
+public class Kafka3ConsumerInterceptor extends KafkaConsumerInterceptor {
+ @SuppressWarnings({"unchecked"})
+ @Override
+ protected Map>> fetchRecords(Object retObj) {
+ return ((Fetch) retObj).records();
+ }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java
index 01f7febf1b..14a7728cef 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java
@@ -61,7 +61,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
if (ret == null) {
return ret;
}
- Map>> records = (Map>>) ret;
+ Map>> records = fetchRecords(ret);
//
// The entry span will only be created when the consumer received at least one message.
//
@@ -100,6 +100,11 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
return ret;
}
+ @SuppressWarnings({"unchecked"})
+ protected Map>> fetchRecords(Object retObj) {
+ return (Map>>) retObj;
+ }
+
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class>[] argumentsTypes, Throwable t) {
diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
index 507e367317..e29996586c 100644
--- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java
@@ -24,7 +24,10 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import java.util.Map;
+
import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.returns;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
@@ -45,6 +48,7 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
public static final String CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithConsumerConfigInterceptPoint";
public static final String MAP_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithMapInterceptPoint";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
+ public static final String INTERCEPTOR_CLASS_KAFKA3_2 = "org.apache.skywalking.apm.plugin.kafka.Kafka3ConsumerInterceptor";
public static final String ENHANCE_METHOD = "pollOnce";
public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
@@ -91,7 +95,8 @@ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher getMethodsMatcher() {
- return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD));
+ // targeting Kafka Client < 3.2
+ return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD).and(returns(Map.class)));
}
@Override
@@ -104,6 +109,23 @@ public boolean isOverrideArgs() {
return false;
}
},
+ new InstanceMethodsInterceptPoint() {
+ @Override
+ public ElementMatcher getMethodsMatcher() {
+ // targeting Kafka Client >= 3.2
+ return named(ENHANCE_COMPATIBLE_METHOD).and(returns(named("org.apache.kafka.clients.consumer.internals.Fetch")));
+ }
+
+ @Override
+ public String getMethodsInterceptor() {
+ return INTERCEPTOR_CLASS_KAFKA3_2;
+ }
+
+ @Override
+ public boolean isOverrideArgs() {
+ return false;
+ }
+ },
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher getMethodsMatcher() {
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md
index 1d616863ae..9796544607 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -66,7 +66,7 @@ metrics based on the tracing data.
* [JSONRPC4J](https://github.com/briandilley/jsonrpc4j) 1.2.0 -> 1.6
* MQ
* [RocketMQ](https://github.com/apache/rocketmq) 4.x
- * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 2.8.0
+ * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 3.2.3
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka))
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
* [RabbitMQ](https://www.rabbitmq.com/) 3.x-> 5.x
diff --git a/test/plugin/scenarios/kafka-scenario/support-version.list b/test/plugin/scenarios/kafka-scenario/support-version.list
index 51b8ffdc4e..1ffc63cb11 100644
--- a/test/plugin/scenarios/kafka-scenario/support-version.list
+++ b/test/plugin/scenarios/kafka-scenario/support-version.list
@@ -25,3 +25,6 @@
2.6.1
2.7.0
2.8.0
+3.0.2
+3.1.2
+3.2.3