Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Release Notes.
* Add tags `mq.message.keys` and `mq.message.tags` for RocketMQ producer span
* Clean the trace context which injected into Pulsar MessageImpl after the instance recycled
* Fix In the higher version of mysql-connector-java 8x, there is an error in the value of db.instance.
* Add support for KafkaClients 3.2.x

#### Documentation

Expand Down
2 changes: 1 addition & 1 deletion apm-sniffer/apm-sdk-plugin/kafka-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<artifactId>apm-kafka-plugin</artifactId>

<properties>
<kafka-clients.version>2.0.1</kafka-clients.version>
<kafka-clients.version>3.2.3</kafka-clients.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.apm.plugin.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.Map;

public class Kafka3ConsumerInterceptor extends KafkaConsumerInterceptor {
@SuppressWarnings({"unchecked"})
@Override
protected Map<TopicPartition, List<ConsumerRecord<?, ?>>> fetchRecords(Object retObj) {
return ((Fetch) retObj).records();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
if (ret == null) {
return ret;
}
Map<TopicPartition, List<ConsumerRecord<?, ?>>> records = (Map<TopicPartition, List<ConsumerRecord<?, ?>>>) ret;
Map<TopicPartition, List<ConsumerRecord<?, ?>>> records = fetchRecords(ret);
//
// The entry span will only be created when the consumer received at least one message.
//
Expand Down Expand Up @@ -100,6 +100,11 @@ public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allA
return ret;
}

@SuppressWarnings({"unchecked"})
protected Map<TopicPartition, List<ConsumerRecord<?, ?>>> fetchRecords(Object retObj) {
return (Map<TopicPartition, List<ConsumerRecord<?, ?>>>) retObj;
}

@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;

import java.util.Map;

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;

Expand All @@ -45,6 +48,7 @@ public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {
public static final String CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithConsumerConfigInterceptPoint";
public static final String MAP_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConstructorWithMapInterceptPoint";
public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
public static final String INTERCEPTOR_CLASS_KAFKA3_2 = "org.apache.skywalking.apm.plugin.kafka.Kafka3ConsumerInterceptor";
public static final String ENHANCE_METHOD = "pollOnce";
public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
Expand Down Expand Up @@ -91,7 +95,8 @@ public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD));
// targeting Kafka Client < 3.2
return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD).and(returns(Map.class)));
}

@Override
Expand All @@ -104,6 +109,23 @@ public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
// targeting Kafka Client >= 3.2
return named(ENHANCE_COMPATIBLE_METHOD).and(returns(named("org.apache.kafka.clients.consumer.internals.Fetch")));
}

@Override
public String getMethodsInterceptor() {
return INTERCEPTOR_CLASS_KAFKA3_2;
}

@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
Expand Down
2 changes: 1 addition & 1 deletion docs/en/setup/service-agent/java-agent/Supported-list.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ metrics based on the tracing data.
* [JSONRPC4J](https://github.com/briandilley/jsonrpc4j) 1.2.0 -> 1.6
* MQ
* [RocketMQ](https://github.com/apache/rocketmq) 4.x
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 2.8.0
* [Kafka](http://kafka.apache.org) 0.11.0.0 -> 3.2.3
* [Spring-Kafka](https://github.com/spring-projects/spring-kafka) Spring Kafka Consumer 1.3.x -> 2.3.x (2.0.x and 2.1.x not tested and not recommended by [the official document](https://spring.io/projects/spring-kafka))
* [ActiveMQ](https://github.com/apache/activemq) 5.10.0 -> 5.15.4
* [RabbitMQ](https://www.rabbitmq.com/) 3.x-> 5.x
Expand Down
3 changes: 3 additions & 0 deletions test/plugin/scenarios/kafka-scenario/support-version.list
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@
2.6.1
2.7.0
2.8.0
3.0.2
3.1.2
3.2.3
Comment on lines +28 to +30
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like you are adding supports to 3.x, not just 3.2.x. Please update the PR title.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed