diff --git a/build.gradle b/build.gradle index 696e1c4311e3c..b1a737d8abc87 100644 --- a/build.gradle +++ b/build.gradle @@ -1697,7 +1697,7 @@ project(':connect:runtime') { compile libs.jettyServlet compile libs.jettyServlets compile libs.jettyClient - compile(libs.reflections) + compile(libs.classgraph) compile(libs.mavenArtifact) testCompile project(':clients').sourceSets.test.output diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 71192655f4f17..ce9204e86faf2 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -359,8 +359,7 @@ - - + @@ -403,7 +402,6 @@ - diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java index 86a53c75ced58..e46882c447eaf 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.connect.runtime.isolation; +import io.github.classgraph.ClassGraph; +import io.github.classgraph.ClassInfoList; +import io.github.classgraph.ScanResult; import org.apache.kafka.common.config.provider.ConfigProvider; import org.apache.kafka.connect.components.Versioned; import org.apache.kafka.connect.connector.Connector; @@ -25,15 +28,10 @@ import org.apache.kafka.connect.storage.HeaderConverter; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; -import org.reflections.Configuration; -import org.reflections.Reflections; -import org.reflections.ReflectionsException; -import org.reflections.scanners.SubTypesScanner; -import org.reflections.util.ClasspathHelper; -import org.reflections.util.ConfigurationBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; @@ -52,6 +50,7 @@ import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.ServiceLoader; @@ -215,9 +214,7 @@ private void initPluginLoader(String path) { if (CLASSPATH_NAME.equals(path)) { scanUrlsAndAddPlugins( getParent(), - ClasspathHelper.forJavaClassPath().toArray(new URL[0]), - null - ); + forJavaClassPath().toArray(new URL[0])); } else { Path pluginPath = Paths.get(path).toAbsolutePath(); // Update for exception handling @@ -241,6 +238,30 @@ private void initPluginLoader(String path) { } } + public static Collection forJavaClassPath() { + Collection urls = new ArrayList<>(); + String javaClassPath = System.getProperty("java.class.path"); + if (javaClassPath != null) { + for (String path : javaClassPath.split(File.pathSeparator)) { + try { + urls.add(new File(path).toURI().toURL()); + } catch (Exception e) { + log.debug("Could not get URL", e); + } + } + } + return distinctUrls(urls); + } + + //http://michaelscharf.blogspot.co.il/2006/11/javaneturlequals-and-hashcode-make.html + private static Collection distinctUrls(Collection urls) { + Map distinct = new LinkedHashMap<>(urls.size()); + for (URL url : urls) { + distinct.put(url.toExternalForm(), url); + } + return distinct.values(); + } + private void registerPlugin(Path pluginLocation) throws InstantiationException, IllegalAccessException, IOException { log.info("Loading plugin from: {}", pluginLocation); @@ -257,14 +278,11 @@ private void registerPlugin(Path pluginLocation) urls, this ); - scanUrlsAndAddPlugins(loader, urls, pluginLocation); + scanUrlsAndAddPlugins(loader, urls); } - private void scanUrlsAndAddPlugins( - ClassLoader loader, - URL[] urls, - Path pluginLocation - ) throws InstantiationException, IllegalAccessException { + private void scanUrlsAndAddPlugins(ClassLoader loader, URL[] urls) + throws InstantiationException, IllegalAccessException { PluginScanResult plugins = scanPluginPath(loader, urls); log.info("Registered loader: {}", loader); if (!plugins.isEmpty()) { @@ -326,46 +344,41 @@ private PluginScanResult scanPluginPath( ClassLoader loader, URL[] urls ) throws InstantiationException, IllegalAccessException { - ConfigurationBuilder builder = new ConfigurationBuilder(); - builder.setClassLoaders(new ClassLoader[]{loader}); - builder.addUrls(urls); - builder.setScanners(new SubTypesScanner()); - builder.useParallelExecutor(); - Reflections reflections = new InternalReflections(builder); - - return new PluginScanResult( - getPluginDesc(reflections, Connector.class, loader), - getPluginDesc(reflections, Converter.class, loader), - getPluginDesc(reflections, HeaderConverter.class, loader), - getPluginDesc(reflections, Transformation.class, loader), - getPluginDesc(reflections, Predicate.class, loader), - getServiceLoaderPluginDesc(ConfigProvider.class, loader), - getServiceLoaderPluginDesc(ConnectRestExtension.class, loader), - getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader) - ); + ClassGraph builder = new ClassGraph().enableClassInfo() + .overrideClassLoaders(loader) + .overrideClasspath(Arrays.asList(urls)) + .ignoreParentClassLoaders(); + try (ScanResult classGraph = builder.scan()) { + return new PluginScanResult( + getPluginDesc(classGraph, Connector.class, loader), + getPluginDesc(classGraph, Converter.class, loader), + getPluginDesc(classGraph, HeaderConverter.class, loader), + getPluginDesc(classGraph, Transformation.class, loader), + getPluginDesc(classGraph, Predicate.class, loader), + getServiceLoaderPluginDesc(ConfigProvider.class, loader), + getServiceLoaderPluginDesc(ConnectRestExtension.class, loader), + getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader) + ); + } } private Collection> getPluginDesc( - Reflections reflections, + ScanResult classGraph, Class klass, ClassLoader loader ) throws InstantiationException, IllegalAccessException { - Set> plugins; + ClassInfoList plugins; try { - plugins = reflections.getSubTypesOf(klass); - } catch (ReflectionsException e) { - log.debug("Reflections scanner could not find any classes for URLs: " + - reflections.getConfiguration().getUrls(), e); + plugins = classGraph.getSubclasses(klass.getName()); + } catch (Exception e) { + log.debug("Class scanner could not find any classes for URLs: " + + classGraph.getClasspathURLs(), e); return Collections.emptyList(); } Collection> result = new ArrayList<>(); - for (Class plugin : plugins) { - if (PluginUtils.isConcrete(plugin)) { - result.add(new PluginDesc<>(plugin, versionFor(plugin), loader)); - } else { - log.debug("Skipping {} as it is not concrete implementation", plugin); - } + for (Class plugin : plugins.getStandardClasses().loadClasses(klass)) { + result.add(new PluginDesc<>(plugin, versionFor(plugin), loader)); } return result; } @@ -438,27 +451,6 @@ private void addAliases(Collection> plugins) { } } - private static class InternalReflections extends Reflections { - - public InternalReflections(Configuration configuration) { - super(configuration); - } - - // When Reflections is used for parallel scans, it has a bug where it propagates ReflectionsException - // as RuntimeException. Override the scan behavior to emulate the singled-threaded logic. - @Override - protected void scan(URL url) { - try { - super.scan(url); - } catch (ReflectionsException e) { - Logger log = Reflections.log; - if (log != null && log.isWarnEnabled()) { - log.warn("could not create Vfs.Dir from url. ignoring the exception and continuing", e); - } - } - } - } - @Override public URL getResource(String name) { if (serviceLoaderManifestForPlugin(name)) { diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 57fc4fd325afd..db78e5eeb65fd 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -100,7 +100,7 @@ versions += [ mockito: "3.3.3", owaspDepCheckPlugin: "5.3.2.1", powermock: "2.0.7", - reflections: "0.9.12", + classgraph: "4.8.80", rocksDB: "5.18.4", scalaCollectionCompat: "2.1.6", scalafmt: "1.5.1", @@ -171,7 +171,7 @@ libs += [ mockitoCore: "org.mockito:mockito-core:$versions.mockito", powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock", powermockEasymock: "org.powermock:powermock-api-easymock:$versions.powermock", - reflections: "org.reflections:reflections:$versions.reflections", + classgraph: "io.github.classgraph:classgraph:$versions.classgraph", rocksDBJni: "org.rocksdb:rocksdbjni:$versions.rocksDB", scalaCollectionCompat: "org.scala-lang.modules:scala-collection-compat_$versions.baseScala:$versions.scalaCollectionCompat", scalaJava8Compat: "org.scala-lang.modules:scala-java8-compat_$versions.baseScala:$versions.scalaJava8Compat",