diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java index 7e02f82b0703..d7e52fa40e1f 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java @@ -48,6 +48,7 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.common.jackson.JacksonUtils; +import org.apache.druid.query.lookup.LookupModule; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.IndexMergerV9; @@ -83,7 +84,7 @@ */ public class HadoopDruidIndexerConfig { - private static final Injector injector; + static final Injector injector; public static final String CONFIG_PROPERTY = "druid.indexer.config"; public static final Charset JAVA_NATIVE_CHARSET = Charset.forName("Unicode"); @@ -115,9 +116,9 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HadoopKerberosConfig.class); } }, - new IndexingHadoopModule() - ) - ); + new IndexingHadoopModule(), + new LookupModule() + ), true); JSON_MAPPER = injector.getInstance(ObjectMapper.class); INDEX_IO = injector.getInstance(IndexIO.class); INDEX_MERGER_V9 = injector.getInstance(IndexMergerV9.class); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index 0fb017e967d4..06d984e20284 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -38,9 +38,11 @@ import org.apache.druid.indexer.hadoop.SegmentInputRow; import 
org.apache.druid.indexer.path.DatasourcePathSpec; import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.ParseException; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -310,6 +312,7 @@ public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper typeHelperMap; @@ -319,6 +322,13 @@ protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); + lifecycle = config.injector.getInstance(Lifecycle.class); + try { + lifecycle.start(); + } + catch (Throwable t) { + throw new IOE(t, "Error when setup lifecycle."); + } aggregators = config.getSchema().getDataSchema().getAggregators(); if (DatasourcePathSpec.checkIfReindexingAndIsUseAggEnabled(config.getSchema().getIOConfig().getPathSpec())) { @@ -338,6 +348,15 @@ protected void setup(Context context) .getDimensionsSpec()); } + @Override + protected void cleanup(Context context) throws IOException, InterruptedException + { + super.cleanup(context); + if (lifecycle != null) { + lifecycle.stop(); + } + } + @Override protected void innerMap( InputRow inputRow, diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java index 44d68b1e395b..4429075eafe6 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/JobHelper.java @@ -121,9 +121,9 @@ public static void authenticate(HadoopDruidIndexerConfig config) } /** - * Uploads jar files to hdfs and configures the classpath. 
+ * Uploads resource files to hdfs and configures the classpath. * Snapshot jar files are uploaded to intermediateClasspath and not shared across multiple jobs. - * Non-Snapshot jar files are uploaded to a distributedClasspath and shared across multiple jobs. + * Non-Snapshot resource files are uploaded to a distributedClasspath and shared across multiple jobs. * * @param distributedClassPath classpath shared across multiple jobs * @param intermediateClassPath classpath exclusive for this job. used to upload SNAPSHOT jar files. @@ -143,7 +143,7 @@ public static void setupClasspath( classpathProperty = System.getProperty("java.class.path"); } - String[] jarFiles = classpathProperty.split(File.pathSeparator); + String[] resourceFiles = classpathProperty.split(File.pathSeparator); final Configuration conf = job.getConfiguration(); final FileSystem fs = distributedClassPath.getFileSystem(conf); @@ -152,17 +152,18 @@ public static void setupClasspath( return; } - for (String jarFilePath : jarFiles) { + for (String resourceFilePath : resourceFiles) { - final File jarFile = new File(jarFilePath); - if (jarFile.getName().endsWith(".jar")) { + final File resourceFile = new File(resourceFilePath); + if (resourceFile.getName().endsWith(".jar") || + resourceFile.getName().endsWith(".properties")) { try { RetryUtils.retry( () -> { - if (isSnapshot(jarFile)) { - addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job); + if (isSnapshot(resourceFile)) { + addSnapshotJarToClassPath(resourceFile, intermediateClassPath, fs, job); } else { - addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job); + addResourceToClassPath(resourceFile, distributedClassPath, intermediateClassPath, fs, job); } return true; }, @@ -173,6 +174,22 @@ public static void setupClasspath( catch (Exception e) { throw new RuntimeException(e); } + } else if (resourceFile.isDirectory()) { + for (File propFile : resourceFile.listFiles((dir, name) -> name.endsWith(".properties"))) 
{ + try { + RetryUtils.retry( + () -> { + addResourceToClassPath(propFile, distributedClassPath, intermediateClassPath, fs, job); + return true; + }, + shouldRetryPredicate(), + NUM_RETRIES + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } } } } @@ -195,8 +212,8 @@ public boolean apply(Throwable input) }; } - static void addJarToClassPath( - File jarFile, + static void addResourceToClassPath( + File resourceFile, Path distributedClassPath, Path intermediateClassPath, FileSystem fs, @@ -209,15 +226,15 @@ static void addJarToClassPath( fs.mkdirs(distributedClassPath); // Non-snapshot jar files are uploaded to the shared classpath. - final Path hdfsPath = new Path(distributedClassPath, jarFile.getName()); - if (shouldUploadOrReplace(jarFile, hdfsPath, fs)) { + final Path hdfsPath = new Path(distributedClassPath, resourceFile.getName()); + if (shouldUploadOrReplace(resourceFile, hdfsPath, fs)) { // Muliple jobs can try to upload the jar here, // to avoid them from overwriting files, first upload to intermediateClassPath and then rename to the distributedClasspath. - final Path intermediateHdfsPath = new Path(intermediateClassPath, jarFile.getName()); - uploadJar(jarFile, intermediateHdfsPath, fs); + final Path intermediateHdfsPath = new Path(intermediateClassPath, resourceFile.getName()); + uploadResource(resourceFile, intermediateHdfsPath, fs); IOException exception = null; try { - log.info("Renaming jar to path[%s]", hdfsPath); + log.info("Renaming resource to path[%s]", hdfsPath); fs.rename(intermediateHdfsPath, hdfsPath); if (!fs.exists(hdfsPath)) { throw new IOE("File does not exist even after moving from[%s] to [%s]", intermediateHdfsPath, hdfsPath); @@ -227,7 +244,7 @@ static void addJarToClassPath( // rename failed, possibly due to race condition. check if some other job has uploaded the jar file. 
try { if (!fs.exists(hdfsPath)) { - log.error(e, "IOException while Renaming jar file"); + log.error(e, "IOException while Renaming resource file"); exception = e; } } @@ -284,16 +301,16 @@ static void addSnapshotJarToClassPath( final Path hdfsPath = new Path(intermediateClassPath, jarFile.getName()); // Prevent uploading same file multiple times in same run. if (!fs.exists(hdfsPath)) { - uploadJar(jarFile, hdfsPath, fs); + uploadResource(jarFile, hdfsPath, fs); } job.addFileToClassPath(hdfsPath); } - static void uploadJar(File jarFile, final Path path, final FileSystem fs) throws IOException + static void uploadResource(File resourceFile, final Path path, final FileSystem fs) throws IOException { - log.info("Uploading jar to path[%s]", path); + log.info("Uploading resource to path[%s]", path); try (OutputStream os = fs.create(path)) { - Files.asByteSource(jarFile).copyTo(os); + Files.asByteSource(resourceFile).copyTo(os); } } diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java index d304f342c25d..aff70340d60e 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HdfsClasspathSetupTest.java @@ -135,7 +135,7 @@ public void testAddNonSnapshotJarToClasspath() throws IOException { Job job = Job.getInstance(conf, "test-job"); DistributedFileSystem fs = miniCluster.getFileSystem(); - JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePath, fs, job); + JobHelper.addResourceToClassPath(dummyJarFile, finalClasspath, intermediatePath, fs, job); Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName()); // check file gets uploaded to final HDFS path Assert.assertTrue(fs.exists(expectedJarPath)); @@ -180,7 +180,7 @@ public Boolean call() throws Exception int id = barrier.await(); Job job = Job.getInstance(conf, 
"test-job-" + id); Path intermediatePathForJob = new Path(intermediatePath, "job-" + id); - JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, fs, job); + JobHelper.addResourceToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, fs, job); // check file gets uploaded to final HDFS path Assert.assertTrue(fs.exists(expectedJarPath)); // check that the intermediate file is not present diff --git a/server/src/main/java/org/apache/druid/initialization/Initialization.java b/server/src/main/java/org/apache/druid/initialization/Initialization.java index 1bbaee84d166..2e8a978fffa3 100644 --- a/server/src/main/java/org/apache/druid/initialization/Initialization.java +++ b/server/src/main/java/org/apache/druid/initialization/Initialization.java @@ -146,6 +146,29 @@ public static Collection getFromExtensions(ExtensionsConfig config, Class return (Collection) modules; } + /** + * Look for implementations for the given class from both classpath and extensions directory, using {@link + * ServiceLoader}. A user should never put the same extension in both the classpath and the extensions directory; if + * that happens, the one that is in the classpath will be loaded and the other will be ignored. + * + * @param config Extensions configuration + * @param serviceClass The class to look for implementations of (e.g., DruidModule) + * @param onlyLoadFromClassLoader Whether to load extensions only from the classloader + * + * @return A collection that contains implementations (of distinct concrete classes) of the given class. The order of + * elements in the returned collection is not specified and not guaranteed to be the same for different calls to + * getFromExtensions(). 
+ */ + public static synchronized Collection getFromExtensions( + ExtensionsConfig config, + Class serviceClass, + boolean onlyLoadFromClassLoader) + { + Collection modulesToLoad = new ServiceLoadingFromExtensions<>(config, serviceClass, onlyLoadFromClassLoader).implsToLoad; + extensionsMap.put(serviceClass, modulesToLoad); + return modulesToLoad; + } + private static class ServiceLoadingFromExtensions { private final ExtensionsConfig extensionsConfig; @@ -154,13 +177,23 @@ private static class ServiceLoadingFromExtensions private final Set implClassNamesToLoad = new HashSet<>(); private ServiceLoadingFromExtensions(ExtensionsConfig extensionsConfig, Class serviceClass) + { + this(extensionsConfig, serviceClass, false); + } + + private ServiceLoadingFromExtensions( + ExtensionsConfig extensionsConfig, + Class serviceClass, + boolean onlyLoadFromClassLoader) { this.extensionsConfig = extensionsConfig; this.serviceClass = serviceClass; if (extensionsConfig.searchCurrentClassloader()) { addAllFromCurrentClassLoader(); } - addAllFromFileSystem(); + if (!onlyLoadFromClassLoader) { + addAllFromFileSystem(); + } } private void addAllFromCurrentClassLoader() @@ -361,7 +394,10 @@ public boolean accept(File dir, String name) } } - public static Injector makeInjectorWithModules(final Injector baseInjector, Iterable modules) + public static Injector makeInjectorWithModules( + final Injector baseInjector, + Iterable modules, + boolean onlyLoadExtensionFromClassLoader) { final ModuleList defaultModules = new ModuleList(baseInjector); defaultModules.addModules( @@ -412,13 +448,18 @@ public static Injector makeInjectorWithModules(final Injector baseInjector, Iter ModuleList extensionModules = new ModuleList(baseInjector); final ExtensionsConfig config = baseInjector.getInstance(ExtensionsConfig.class); - for (DruidModule module : Initialization.getFromExtensions(config, DruidModule.class)) { + for (DruidModule module : Initialization.getFromExtensions(config, DruidModule.class, 
onlyLoadExtensionFromClassLoader)) { extensionModules.addModule(module); } return Guice.createInjector(Modules.override(intermediateModules).with(extensionModules.getModules())); } + public static Injector makeInjectorWithModules(final Injector baseInjector, Iterable modules) + { + return makeInjectorWithModules(baseInjector, modules, false); + } + private static class ModuleList { private final Injector baseInjector;