From 4e5efa4c2ed918ab16a1bfa21be9f646a386fe2d Mon Sep 17 00:00:00 2001 From: Megrez Lu Date: Fri, 23 Dec 2022 11:24:15 +0800 Subject: [PATCH] add Kafka Clients 3.2.x support --- CHANGES.md | 1 + .../apm-sdk-plugin/kafka-plugin/pom.xml | 2 +- .../kafka/Kafka3ConsumerInterceptor.java | 34 +++++++++++++++++++ .../kafka/KafkaConsumerInterceptor.java | 7 +++- .../define/KafkaConsumerInstrumentation.java | 24 ++++++++++++- .../java-agent/Supported-list.md | 2 +- .../kafka-scenario/support-version.list | 3 ++ 7 files changed, 69 insertions(+), 4 deletions(-) create mode 100644 apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/Kafka3ConsumerInterceptor.java diff --git a/CHANGES.md b/CHANGES.md index 9c4cdb3b94..5742992bcd 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -19,6 +19,7 @@ Release Notes. * Add [Micrometer Observation](https://github.com/micrometer-metrics/micrometer/) support * Add tags `mq.message.keys` and `mq.message.tags` for RocketMQ producer span * Clean the trace context which injected into Pulsar MessageImpl after the instance recycled +* Add support for KafkaClients 3.2.x #### Documentation diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml b/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml index 1693585679..742fbe4edf 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml @@ -28,7 +28,7 @@ apm-kafka-plugin - 2.0.1 + 3.2.3 diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/Kafka3ConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/Kafka3ConsumerInterceptor.java new file mode 100644 index 0000000000..b99813d7a4 --- /dev/null +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/Kafka3ConsumerInterceptor.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.apm.plugin.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.internals.Fetch; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; +import java.util.Map; + +public class Kafka3ConsumerInterceptor extends KafkaConsumerInterceptor { + @SuppressWarnings({"unchecked"}) + @Override + protected Map>> fetchRecords(Object retObj) { + return ((Fetch) retObj).records(); + } +} diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java index 01f7febf1b..14a7728cef 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java @@ -61,7 +61,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA if (ret == null) { return ret; } - Map>> records = (Map>>) ret; + Map>> records = fetchRecords(ret); // // The entry span will only be created when the consumer received at least one message. // @@ -100,6 +100,11 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA return ret; } + @SuppressWarnings({"unchecked"}) + protected Map>> fetchRecords(Object retObj) { + return (Map>>) retObj; + } + @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class[] argumentsTypes, Throwable t) { diff --git a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java index 507e367317..e29996586c 100644 --- a/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java +++ b/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java @@ -24,7 +24,10 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint; import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch; +import java.util.Map; + import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType; import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName; @@ -45,6 +48,7 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation { public static final String CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithConsumerConfigInterceptPoint"; public static final String MAP_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithMapInterceptPoint"; public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor"; + public static final String INTERCEPTOR_CLASS_KAFKA3_2 = "org.apache.skywalking.apm.plugin.kafka.Kafka3ConsumerInterceptor"; public static final String ENHANCE_METHOD = "pollOnce"; public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches"; public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer"; @@ -91,7 +95,8 @@ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() { new InstanceMethodsInterceptPoint() { @Override public ElementMatcher getMethodsMatcher() { - return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD)); + // targeting Kafka Client < 3.2 + return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD).and(returns(Map.class))); } @Override @@ -104,6 +109,23 @@ public boolean isOverrideArgs() { return false; } }, + new InstanceMethodsInterceptPoint() { + @Override + public ElementMatcher getMethodsMatcher() { + // targeting Kafka Client >= 3.2 + return named(ENHANCE_COMPATIBLE_METHOD).and(returns(named("org.apache.kafka.clients.consumer.internals.Fetch"))); + } + + @Override + public String getMethodsInterceptor() { + return INTERCEPTOR_CLASS_KAFKA3_2; + } + + @Override + public boolean isOverrideArgs() { + return false; + } + }, new InstanceMethodsInterceptPoint() { @Override public ElementMatcher getMethodsMatcher() { diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md index 1d616863ae..9796544607 100644 --- a/docs/en/setup/service-agent/java-agent/Supported-list.md +++ b/docs/en/setup/service-agent/java-agent/Supported-list.md @@ -66,7 +66,7 @@ metrics based on the tracing data. * [JSONRPC4J](https://github.com/briandilley/jsonrpc4j) 1.2.0 -> 1.6 * MQ * [RocketMQ](https://github.com/apache/rocketmq) 4.x - * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 2.8.0 + * [Kafka](http://kafka.apache.org) 0.11.0.0 -> 3.2.3 * [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka)) * [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4 * [RabbitMQ](https://www.rabbitmq.com/) 3.x-> 5.x diff --git a/test/plugin/scenarios/kafka-scenario/support-version.list b/test/plugin/scenarios/kafka-scenario/support-version.list index 51b8ffdc4e..1ffc63cb11 100644 --- a/test/plugin/scenarios/kafka-scenario/support-version.list +++ b/test/plugin/scenarios/kafka-scenario/support-version.list @@ -25,3 +25,6 @@ 2.6.1 2.7.0 2.8.0 +3.0.2 +3.1.2 +3.2.3