Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
Expand Down Expand Up @@ -83,7 +84,7 @@
*/
public class HadoopDruidIndexerConfig
{
private static final Injector injector;
static final Injector injector;

public static final String CONFIG_PROPERTY = "druid.indexer.config";
public static final Charset JAVA_NATIVE_CHARSET = Charset.forName("Unicode");
Expand Down Expand Up @@ -115,9 +116,9 @@ public void configure(Binder binder)
JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HadoopKerberosConfig.class);
}
},
new IndexingHadoopModule()
)
);
new IndexingHadoopModule(),
new LookupModule()
), true);
JSON_MAPPER = injector.getInstance(ObjectMapper.class);
INDEX_IO = injector.getInstance(IndexIO.class);
INDEX_MERGER_V9 = injector.getInstance(IndexMergerV9.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
import org.apache.druid.indexer.hadoop.SegmentInputRow;
import org.apache.druid.indexer.path.DatasourcePathSpec;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.query.aggregation.AggregatorFactory;
Expand Down Expand Up @@ -310,6 +312,7 @@ public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesW
private static final HashFunction hashFunction = Hashing.murmur3_128();

private AggregatorFactory[] aggregators;
private Lifecycle lifecycle;

private AggregatorFactory[] aggsForSerializingSegmentInputRow;
private Map<String, InputRowSerde.IndexSerdeTypeHelper> typeHelperMap;
Expand All @@ -319,6 +322,13 @@ protected void setup(Context context)
throws IOException, InterruptedException
{
super.setup(context);
lifecycle = config.injector.getInstance(Lifecycle.class);
try {
lifecycle.start();
}
catch (Throwable t) {
throw new IOE(t, "Error when setup lifecycle.");
}
aggregators = config.getSchema().getDataSchema().getAggregators();

if (DatasourcePathSpec.checkIfReindexingAndIsUseAggEnabled(config.getSchema().getIOConfig().getPathSpec())) {
Expand All @@ -338,6 +348,15 @@ protected void setup(Context context)
.getDimensionsSpec());
}

/**
 * Mapper teardown: stops the {@code lifecycle} that was started in {@code setup(Context)} so that
 * resources held by injected, lifecycle-managed components are released when this mapper task ends.
 *
 * @param context the Hadoop task context, passed through to {@code super.cleanup}
 * @throws IOException if the superclass cleanup fails
 * @throws InterruptedException if the superclass cleanup is interrupted
 */
@Override
protected void cleanup(Context context) throws IOException, InterruptedException
{
super.cleanup(context);
// Null guard: setup() may have failed before the lifecycle was obtained/started.
if (lifecycle != null) {
lifecycle.stop();
}
}

@Override
protected void innerMap(
InputRow inputRow,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ public static void authenticate(HadoopDruidIndexerConfig config)
}

/**
* Uploads jar files to hdfs and configures the classpath.
* Uploads resource files to hdfs and configures the classpath.
* Snapshot jar files are uploaded to intermediateClasspath and not shared across multiple jobs.
* Non-Snapshot jar files are uploaded to a distributedClasspath and shared across multiple jobs.
* Non-Snapshot resource files are uploaded to a distributedClasspath and shared across multiple jobs.
*
* @param distributedClassPath classpath shared across multiple jobs
* @param intermediateClassPath classpath exclusive for this job. used to upload SNAPSHOT jar files.
Expand All @@ -143,7 +143,7 @@ public static void setupClasspath(
classpathProperty = System.getProperty("java.class.path");
}

String[] jarFiles = classpathProperty.split(File.pathSeparator);
String[] resourceFiles = classpathProperty.split(File.pathSeparator);

final Configuration conf = job.getConfiguration();
final FileSystem fs = distributedClassPath.getFileSystem(conf);
Expand All @@ -152,17 +152,18 @@ public static void setupClasspath(
return;
}

for (String jarFilePath : jarFiles) {
for (String resourceFilePath : resourceFiles) {

final File jarFile = new File(jarFilePath);
if (jarFile.getName().endsWith(".jar")) {
final File resourceFile = new File(resourceFilePath);
if (resourceFile.getName().endsWith(".jar") ||
resourceFile.getName().endsWith(".properties")) {
try {
RetryUtils.retry(
() -> {
if (isSnapshot(jarFile)) {
addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job);
if (isSnapshot(resourceFile)) {
addSnapshotJarToClassPath(resourceFile, intermediateClassPath, fs, job);
} else {
addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job);
addResourceToClassPath(resourceFile, distributedClassPath, intermediateClassPath, fs, job);
}
return true;
},
Expand All @@ -173,6 +174,22 @@ public static void setupClasspath(
catch (Exception e) {
throw new RuntimeException(e);
}
} else if (resourceFile.isDirectory()) {
for (File propFile : resourceFile.listFiles((dir, name) -> name.endsWith(".properties"))) {
try {
RetryUtils.retry(
() -> {
addResourceToClassPath(propFile, distributedClassPath, intermediateClassPath, fs, job);
return true;
},
shouldRetryPredicate(),
NUM_RETRIES
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
}
Expand All @@ -195,8 +212,8 @@ public boolean apply(Throwable input)
};
}

static void addJarToClassPath(
File jarFile,
static void addResourceToClassPath(
File resourceFile,
Path distributedClassPath,
Path intermediateClassPath,
FileSystem fs,
Expand All @@ -209,15 +226,15 @@ static void addJarToClassPath(
fs.mkdirs(distributedClassPath);

// Non-snapshot jar files are uploaded to the shared classpath.
final Path hdfsPath = new Path(distributedClassPath, jarFile.getName());
if (shouldUploadOrReplace(jarFile, hdfsPath, fs)) {
final Path hdfsPath = new Path(distributedClassPath, resourceFile.getName());
if (shouldUploadOrReplace(resourceFile, hdfsPath, fs)) {
// Multiple jobs can try to upload the jar here;
// to avoid them overwriting each other's files, first upload to intermediateClassPath and then rename to the distributedClasspath.
final Path intermediateHdfsPath = new Path(intermediateClassPath, jarFile.getName());
uploadJar(jarFile, intermediateHdfsPath, fs);
final Path intermediateHdfsPath = new Path(intermediateClassPath, resourceFile.getName());
uploadResource(resourceFile, intermediateHdfsPath, fs);
IOException exception = null;
try {
log.info("Renaming jar to path[%s]", hdfsPath);
log.info("Renaming resource to path[%s]", hdfsPath);
fs.rename(intermediateHdfsPath, hdfsPath);
if (!fs.exists(hdfsPath)) {
throw new IOE("File does not exist even after moving from[%s] to [%s]", intermediateHdfsPath, hdfsPath);
Expand All @@ -227,7 +244,7 @@ static void addJarToClassPath(
// rename failed, possibly due to race condition. check if some other job has uploaded the jar file.
try {
if (!fs.exists(hdfsPath)) {
log.error(e, "IOException while Renaming jar file");
log.error(e, "IOException while Renaming resource file");
exception = e;
}
}
Expand Down Expand Up @@ -284,16 +301,16 @@ static void addSnapshotJarToClassPath(
final Path hdfsPath = new Path(intermediateClassPath, jarFile.getName());
// Prevent uploading same file multiple times in same run.
if (!fs.exists(hdfsPath)) {
uploadJar(jarFile, hdfsPath, fs);
uploadResource(jarFile, hdfsPath, fs);
}
job.addFileToClassPath(hdfsPath);
}

static void uploadJar(File jarFile, final Path path, final FileSystem fs) throws IOException
static void uploadResource(File resourceFile, final Path path, final FileSystem fs) throws IOException
{
log.info("Uploading jar to path[%s]", path);
log.info("Uploading resource to path[%s]", path);
try (OutputStream os = fs.create(path)) {
Files.asByteSource(jarFile).copyTo(os);
Files.asByteSource(resourceFile).copyTo(os);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void testAddNonSnapshotJarToClasspath() throws IOException
{
Job job = Job.getInstance(conf, "test-job");
DistributedFileSystem fs = miniCluster.getFileSystem();
JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePath, fs, job);
JobHelper.addResourceToClassPath(dummyJarFile, finalClasspath, intermediatePath, fs, job);
Path expectedJarPath = new Path(finalClasspath, dummyJarFile.getName());
// check file gets uploaded to final HDFS path
Assert.assertTrue(fs.exists(expectedJarPath));
Expand Down Expand Up @@ -180,7 +180,7 @@ public Boolean call() throws Exception
int id = barrier.await();
Job job = Job.getInstance(conf, "test-job-" + id);
Path intermediatePathForJob = new Path(intermediatePath, "job-" + id);
JobHelper.addJarToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, fs, job);
JobHelper.addResourceToClassPath(dummyJarFile, finalClasspath, intermediatePathForJob, fs, job);
// check file gets uploaded to final HDFS path
Assert.assertTrue(fs.exists(expectedJarPath));
// check that the intermediate file is not present
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,29 @@ public static <T> Collection<T> getFromExtensions(ExtensionsConfig config, Class
return (Collection<T>) modules;
}

/**
 * Look for implementations of the given class using {@link ServiceLoader}. When
 * {@code onlyLoadFromClassLoader} is true, only the current classpath is searched; otherwise both
 * the classpath and the extensions directory are searched. A user should never put the same
 * extension on the classpath and in the extensions directory; if that happens, the one on the
 * classpath will be loaded and the other will be ignored.
 *
 * @param config Extensions configuration
 * @param serviceClass The class to look up implementations of (e.g., DruidModule)
 * @param onlyLoadFromClassLoader Whether to load extensions only from the current classloader
 *
 * @return A collection that contains implementations (of distinct concrete classes) of the given class. The order of
 * elements in the returned collection is not specified and not guaranteed to be the same for different calls to
 * getFromExtensions().
 */
public static synchronized <T> Collection<T> getFromExtensions(
ExtensionsConfig config,
Class<T> serviceClass,
boolean onlyLoadFromClassLoader)
{
Collection<T> modulesToLoad = new ServiceLoadingFromExtensions<>(config, serviceClass, onlyLoadFromClassLoader).implsToLoad;
// NOTE(review): the result is recorded in extensionsMap — presumably a cache consulted by other
// overloads/callers; confirm the intended interaction with the existing two-arg getFromExtensions.
extensionsMap.put(serviceClass, modulesToLoad);
return modulesToLoad;
}

private static class ServiceLoadingFromExtensions<T>
{
private final ExtensionsConfig extensionsConfig;
Expand All @@ -154,13 +177,23 @@ private static class ServiceLoadingFromExtensions<T>
private final Set<String> implClassNamesToLoad = new HashSet<>();

// Backwards-compatible constructor: searches both the classpath and the extensions
// directory (equivalent to passing onlyLoadFromClassLoader = false).
private ServiceLoadingFromExtensions(ExtensionsConfig extensionsConfig, Class<T> serviceClass)
{
this(extensionsConfig, serviceClass, false);
}

// Loads service implementations eagerly at construction time into implsToLoad.
// When onlyLoadFromClassLoader is true, the filesystem (extensions directory) scan is skipped.
private ServiceLoadingFromExtensions(
ExtensionsConfig extensionsConfig,
Class<T> serviceClass,
boolean onlyLoadFromClassLoader)
{
this.extensionsConfig = extensionsConfig;
this.serviceClass = serviceClass;
// Classpath scanning is optional and controlled by configuration.
if (extensionsConfig.searchCurrentClassloader()) {
addAllFromCurrentClassLoader();
}
// NOTE(review): the unconditional call below duplicates the guarded call inside the
// if-block that follows — it looks like a stale pre-change line left over in this copy.
// Confirm that only the conditional call should remain.
addAllFromFileSystem();
if (!onlyLoadFromClassLoader) {
addAllFromFileSystem();
}
}

private void addAllFromCurrentClassLoader()
Expand Down Expand Up @@ -361,7 +394,10 @@ public boolean accept(File dir, String name)
}
}

public static Injector makeInjectorWithModules(final Injector baseInjector, Iterable<? extends Module> modules)
public static Injector makeInjectorWithModules(
final Injector baseInjector,
Iterable<? extends Module> modules,
boolean onlyLoadExtensionFromClassLoader)
{
final ModuleList defaultModules = new ModuleList(baseInjector);
defaultModules.addModules(
Expand Down Expand Up @@ -412,13 +448,18 @@ public static Injector makeInjectorWithModules(final Injector baseInjector, Iter

ModuleList extensionModules = new ModuleList(baseInjector);
final ExtensionsConfig config = baseInjector.getInstance(ExtensionsConfig.class);
for (DruidModule module : Initialization.getFromExtensions(config, DruidModule.class)) {
for (DruidModule module : Initialization.getFromExtensions(config, DruidModule.class, onlyLoadExtensionFromClassLoader)) {
extensionModules.addModule(module);
}

return Guice.createInjector(Modules.override(intermediateModules).with(extensionModules.getModules()));
}

/**
 * Convenience overload that preserves the pre-existing behavior: extensions are loaded from both
 * the classpath and the extensions directory (onlyLoadExtensionFromClassLoader = false).
 */
public static Injector makeInjectorWithModules(final Injector baseInjector, Iterable<? extends Module> modules)
{
return makeInjectorWithModules(baseInjector, modules, false);
}

private static class ModuleList
{
private final Injector baseInjector;
Expand Down