Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1697,7 +1697,7 @@ project(':connect:runtime') {
compile libs.jettyServlet
compile libs.jettyServlets
compile libs.jettyClient
compile(libs.reflections)
compile(libs.classgraph)
compile(libs.mavenArtifact)

testCompile project(':clients').sourceSets.test.output
Expand Down
4 changes: 1 addition & 3 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -359,8 +359,7 @@

<subpackage name="runtime">
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.reflections"/>
<allow pkg="org.reflections.util"/>
<allow pkg="io.github.classgraph"/>
<allow pkg="javax.crypto"/>
<allow pkg="org.eclipse.jetty.util" />

Expand Down Expand Up @@ -403,7 +402,6 @@

<subpackage name="util">
<allow pkg="org.apache.kafka.connect" />
<allow pkg="org.reflections.vfs" />
<!-- for annotations to avoid code duplication -->
<allow pkg="com.fasterxml.jackson.annotation" />
<allow pkg="com.fasterxml.jackson.databind" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.kafka.connect.runtime.isolation;

import io.github.classgraph.ClassGraph;
import io.github.classgraph.ClassInfoList;
import io.github.classgraph.ScanResult;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.Connector;
Expand All @@ -25,15 +28,10 @@
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.reflections.Configuration;
import org.reflections.Reflections;
import org.reflections.ReflectionsException;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
Expand All @@ -52,6 +50,7 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
Expand Down Expand Up @@ -215,9 +214,7 @@ private void initPluginLoader(String path) {
if (CLASSPATH_NAME.equals(path)) {
scanUrlsAndAddPlugins(
getParent(),
ClasspathHelper.forJavaClassPath().toArray(new URL[0]),
null
);
forJavaClassPath().toArray(new URL[0]));
} else {
Path pluginPath = Paths.get(path).toAbsolutePath();
// Update for exception handling
Expand All @@ -241,6 +238,30 @@ private void initPluginLoader(String path) {
}
}

public static Collection<URL> forJavaClassPath() {
Collection<URL> urls = new ArrayList<>();
String javaClassPath = System.getProperty("java.class.path");
if (javaClassPath != null) {
for (String path : javaClassPath.split(File.pathSeparator)) {
try {
urls.add(new File(path).toURI().toURL());
} catch (Exception e) {
log.debug("Could not get URL", e);
}
}
}
return distinctUrls(urls);
}

//http://michaelscharf.blogspot.co.il/2006/11/javaneturlequals-and-hashcode-make.html
private static Collection<URL> distinctUrls(Collection<URL> urls) {
Map<String, URL> distinct = new LinkedHashMap<>(urls.size());
for (URL url : urls) {
distinct.put(url.toExternalForm(), url);
}
return distinct.values();
}

private void registerPlugin(Path pluginLocation)
throws InstantiationException, IllegalAccessException, IOException {
log.info("Loading plugin from: {}", pluginLocation);
Expand All @@ -257,14 +278,11 @@ private void registerPlugin(Path pluginLocation)
urls,
this
);
scanUrlsAndAddPlugins(loader, urls, pluginLocation);
scanUrlsAndAddPlugins(loader, urls);
}

private void scanUrlsAndAddPlugins(
ClassLoader loader,
URL[] urls,
Path pluginLocation
) throws InstantiationException, IllegalAccessException {
private void scanUrlsAndAddPlugins(ClassLoader loader, URL[] urls)
throws InstantiationException, IllegalAccessException {
PluginScanResult plugins = scanPluginPath(loader, urls);
log.info("Registered loader: {}", loader);
if (!plugins.isEmpty()) {
Expand Down Expand Up @@ -326,46 +344,41 @@ private PluginScanResult scanPluginPath(
ClassLoader loader,
URL[] urls
) throws InstantiationException, IllegalAccessException {
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setClassLoaders(new ClassLoader[]{loader});
builder.addUrls(urls);
builder.setScanners(new SubTypesScanner());
builder.useParallelExecutor();
Reflections reflections = new InternalReflections(builder);

return new PluginScanResult(
getPluginDesc(reflections, Connector.class, loader),
getPluginDesc(reflections, Converter.class, loader),
getPluginDesc(reflections, HeaderConverter.class, loader),
getPluginDesc(reflections, Transformation.class, loader),
getPluginDesc(reflections, Predicate.class, loader),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
);
ClassGraph builder = new ClassGraph().enableClassInfo()
.overrideClassLoaders(loader)
.overrideClasspath(Arrays.asList(urls))
.ignoreParentClassLoaders();
try (ScanResult classGraph = builder.scan()) {
return new PluginScanResult(
getPluginDesc(classGraph, Connector.class, loader),
getPluginDesc(classGraph, Converter.class, loader),
getPluginDesc(classGraph, HeaderConverter.class, loader),
getPluginDesc(classGraph, Transformation.class, loader),
getPluginDesc(classGraph, Predicate.class, loader),
getServiceLoaderPluginDesc(ConfigProvider.class, loader),
getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
);
}
}

private <T> Collection<PluginDesc<T>> getPluginDesc(
Reflections reflections,
ScanResult classGraph,
Class<T> klass,
ClassLoader loader
) throws InstantiationException, IllegalAccessException {
Set<Class<? extends T>> plugins;
ClassInfoList plugins;
try {
plugins = reflections.getSubTypesOf(klass);
} catch (ReflectionsException e) {
log.debug("Reflections scanner could not find any classes for URLs: " +
reflections.getConfiguration().getUrls(), e);
plugins = classGraph.getSubclasses(klass.getName());
} catch (Exception e) {
log.debug("Class scanner could not find any classes for URLs: " +
classGraph.getClasspathURLs(), e);
return Collections.emptyList();
}

Collection<PluginDesc<T>> result = new ArrayList<>();
for (Class<? extends T> plugin : plugins) {
if (PluginUtils.isConcrete(plugin)) {
result.add(new PluginDesc<>(plugin, versionFor(plugin), loader));
} else {
log.debug("Skipping {} as it is not concrete implementation", plugin);
}
for (Class<? extends T> plugin : plugins.getStandardClasses().loadClasses(klass)) {
result.add(new PluginDesc<>(plugin, versionFor(plugin), loader));
}
return result;
}
Expand Down Expand Up @@ -438,27 +451,6 @@ private <S> void addAliases(Collection<PluginDesc<S>> plugins) {
}
}

private static class InternalReflections extends Reflections {

public InternalReflections(Configuration configuration) {
super(configuration);
}

// When Reflections is used for parallel scans, it has a bug where it propagates ReflectionsException
// as RuntimeException. Override the scan behavior to emulate the singled-threaded logic.
@Override
protected void scan(URL url) {
try {
super.scan(url);
} catch (ReflectionsException e) {
Logger log = Reflections.log;
if (log != null && log.isWarnEnabled()) {
log.warn("could not create Vfs.Dir from url. ignoring the exception and continuing", e);
}
}
}
}

@Override
public URL getResource(String name) {
if (serviceLoaderManifestForPlugin(name)) {
Expand Down
4 changes: 2 additions & 2 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ versions += [
mockito: "3.3.3",
owaspDepCheckPlugin: "5.3.2.1",
powermock: "2.0.7",
reflections: "0.9.12",
classgraph: "4.8.80",
rocksDB: "5.18.4",
scalaCollectionCompat: "2.1.6",
scalafmt: "1.5.1",
Expand Down Expand Up @@ -171,7 +171,7 @@ libs += [
mockitoCore: "org.mockito:mockito-core:$versions.mockito",
powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock",
powermockEasymock: "org.powermock:powermock-api-easymock:$versions.powermock",
reflections: "org.reflections:reflections:$versions.reflections",
classgraph: "io.github.classgraph:classgraph:$versions.classgraph",
rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB",
scalaCollectionCompat: "org.scala-lang.modules:scala-collection-compat_$versions.baseScala:$versions.scalaCollectionCompat",
scalaJava8Compat: "org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat",
Expand Down