From 6823646a72761b68d504652bdd7cacc18d589234 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 14 Jul 2025 01:38:32 -0700 Subject: [PATCH 1/8] indexing-service no longer depends on indexing-hadoop, instead indexing-hadoop depends on indexing-service --- .../tutorial/hadoop/docker/Dockerfile | 6 +- .../MaterializedViewSupervisor.java | 2 +- .../MaterializedViewSupervisorSpec.java | 6 +- .../MaterializedViewSupervisorTest.java | 9 +- indexing-hadoop/pom.xml | 33 +++- .../druid/indexer}/HadoopIndexTask.java | 60 ++++--- .../druid/indexer/HadoopIndexTaskModule.java | 50 ++++++ .../org/apache/druid/indexer}/HadoopTask.java | 18 ++- .../druid/indexer/HadoopTaskConfig.java | 85 ++++++++++ ...rlordActionBasedUsedSegmentsRetriever.java | 2 +- .../resources/hadoop.indexer.libs.version | 1 + .../druid/indexer}/HadoopIndexTaskTest.java | 7 +- .../druid/indexer/HadoopTaskSerdeTest.java | 153 ++++++++++++++++++ .../apache/druid/indexer}/HadoopTaskTest.java | 9 +- indexing-service/pom.xml | 52 ------ .../indexing/common/config/TaskConfig.java | 21 --- .../druid/indexing/common/task/Task.java | 1 - .../indexing/common/TaskToolboxTest.java | 1 - .../common/config/TaskConfigBuilder.java | 8 - .../indexing/common/task/TaskSerdeTest.java | 96 ----------- .../SingleTaskBackgroundRunnerTest.java | 1 - .../indexing/overlord/TaskLifecycleTest.java | 1 - .../SeekableStreamIndexTaskTestBase.java | 1 - .../worker/WorkerTaskManagerTest.java | 1 - .../worker/WorkerTaskMonitorTest.java | 1 - .../hadoop/batch_hadoop_queries.json | 3 +- pom.xml | 6 +- .../java/org/apache/druid/cli/CliIndexer.java | 2 + .../apache/druid/cli/CliMiddleManager.java | 2 + .../org/apache/druid/cli/CliOverlord.java | 2 + .../java/org/apache/druid/cli/CliPeon.java | 2 + .../org/apache/druid/cli/ResetCluster.java | 5 +- 32 files changed, 398 insertions(+), 249 deletions(-) rename {indexing-service/src/main/java/org/apache/druid/indexing/common/task => indexing-hadoop/src/main/java/org/apache/druid/indexer}/HadoopIndexTask.java (94%) create mode 100644 indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTaskModule.java rename {indexing-service/src/main/java/org/apache/druid/indexing/common/task => indexing-hadoop/src/main/java/org/apache/druid/indexer}/HadoopTask.java (95%) create mode 100644 indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTaskConfig.java rename {indexing-service/src/main/java/org/apache/druid/indexing/hadoop => indexing-hadoop/src/main/java/org/apache/druid/indexer}/OverlordActionBasedUsedSegmentsRetriever.java (97%) create mode 100644 indexing-hadoop/src/main/resources/hadoop.indexer.libs.version rename {indexing-service/src/test/java/org/apache/druid/indexing/common/task => indexing-hadoop/src/test/java/org/apache/druid/indexer}/HadoopIndexTaskTest.java (94%) create mode 100644 indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskSerdeTest.java rename {indexing-service/src/test/java/org/apache/druid/indexing/common/task => indexing-hadoop/src/test/java/org/apache/druid/indexer}/HadoopTaskTest.java (94%) diff --git a/examples/quickstart/tutorial/hadoop/docker/Dockerfile b/examples/quickstart/tutorial/hadoop/docker/Dockerfile index 6ac148d412a6..157ccf8e98e8 100644 --- a/examples/quickstart/tutorial/hadoop/docker/Dockerfile +++ b/examples/quickstart/tutorial/hadoop/docker/Dockerfile @@ -52,12 +52,12 @@ RUN rpm --import http://repos.azulsystems.com/RPM-GPG-KEY-azulsystems && \ rpm -ivh zulu-repo-${ZULU_REPO_VER}.noarch.rpm && \ yum -q -y update && \ yum -q -y upgrade && \ - yum -q -y 
install zulu17-jdk && \ + yum -q -y install zulu11-jdk && \ yum -q -y install nano net-tools telnet less unzip wget && \ yum clean all && \ rm -rf /var/cache/yum zulu-repo_${ZULU_REPO_VER}.noarch.rpm -ENV JAVA_HOME=/usr/lib/jvm/zulu17 +ENV JAVA_HOME=/usr/lib/jvm/zulu11 ENV PATH=$PATH:$JAVA_HOME/bin # hadoop @@ -76,7 +76,7 @@ ENV YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop # in hadoop 3 the example file is nearly empty so we can just append stuff RUN cat << EOT >> $HADOOP_HOME/etc/hadoop/hadoop-env.sh -RUN sed -i '$ a export JAVA_HOME=/usr/lib/jvm/zulu17' $HADOOP_HOME/etc/hadoop/hadoop-env.sh +RUN sed -i '$ a export JAVA_HOME=/usr/lib/jvm/zulu11' $HADOOP_HOME/etc/hadoop/hadoop-env.sh RUN sed -i '$ a export HADOOP_HOME=/usr/local/hadoop' $HADOOP_HOME/etc/hadoop/hadoop-env.sh RUN sed -i '$ a export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop/' $HADOOP_HOME/etc/hadoop/hadoop-env.sh RUN sed -i '$ a export HDFS_NAMENODE_USER=root' $HADOOP_HOME/etc/hadoop/hadoop-env.sh diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 5622556b389d..30b3484c99d7 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -30,7 +30,7 @@ import org.apache.druid.error.DruidException; import org.apache.druid.error.EntryAlreadyExists; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.task.HadoopIndexTask; +import org.apache.druid.indexer.HadoopIndexTask; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.Segments; diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index a757a065832f..427cede55a33 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -30,10 +30,11 @@ import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.HadoopIOConfig; import org.apache.druid.indexer.HadoopIngestionSpec; +import org.apache.druid.indexer.HadoopTaskConfig; import org.apache.druid.indexer.HadoopTuningConfig; import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; import org.apache.druid.indexer.hadoop.DatasourceIngestionSpec; -import org.apache.druid.indexing.common.task.HadoopIndexTask; +import org.apache.druid.indexer.HadoopIndexTask; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; @@ -245,7 +246,8 @@ public HadoopIndexTask createTask(Interval interval, String version, List + + org.apache.druid + druid-indexing-service + 
${project.parent.version} + commons-io @@ -130,6 +135,13 @@ test-jar test + + org.apache.druid + druid-indexing-service + ${project.parent.version} + test-jar + test + org.apache.derby derbyclient @@ -152,7 +164,6 @@ test - hadoop3 @@ -197,6 +208,11 @@ test + + + org.apache.hadoop:hadoop-client-api:${hadoop.compile.version},org.apache.hadoop:hadoop-client-runtime:${hadoop.compile.version} + + @@ -224,6 +240,21 @@ + + + maven-resources-plugin + org.apache.maven.plugins + + ${project.build.outputDirectory} + + + src/main/resources + hadoop.indexer.libs.version + true + + + + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java similarity index 94% rename from indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java rename to indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java index 01696af04d0b..bed0851c8f4e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.indexing.common.task; +package org.apache.druid.indexer; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; @@ -32,16 +32,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import org.apache.commons.lang3.BooleanUtils; -import org.apache.druid.indexer.DataSegmentAndIndexZipFilePath; -import org.apache.druid.indexer.HadoopDruidDetermineConfigurationJob; -import org.apache.druid.indexer.HadoopDruidIndexerConfig; -import org.apache.druid.indexer.HadoopDruidIndexerJob; -import org.apache.druid.indexer.HadoopIngestionSpec; -import org.apache.druid.indexer.IngestionState; -import org.apache.druid.indexer.JobHelper; -import org.apache.druid.indexer.TaskMetricsGetter; -import org.apache.druid.indexer.TaskMetricsUtils; -import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; import org.apache.druid.indexer.granularity.GranularitySpec; import org.apache.druid.indexer.path.SegmentMetadataPublisher; @@ -53,7 +43,8 @@ import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; import org.apache.druid.indexing.common.config.TaskConfig; -import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever; +import org.apache.druid.indexing.common.task.AbstractTask; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.StringUtils; @@ -115,6 +106,9 @@ private static String getTheDataSource(HadoopIngestionSpec spec) @JsonIgnore private final AuthorizerMapper authorizerMapper; + @JsonIgnore + private final HadoopTaskConfig hadoopTaskConfig; + @JsonIgnore private final Optional chatHandlerProvider; @@ -136,6 +130,7 @@ private static String getTheDataSource(HadoopIngestionSpec spec) @JsonIgnore private String errorMsg; + /** * @param spec is used by the HadoopDruidIndexerJob to set up the appropriate parameters * for creating Druid index segments. It may be modified. 
@@ -156,16 +151,18 @@ public HadoopIndexTask( @JacksonInject ObjectMapper jsonMapper, @JsonProperty("context") Map context, @JacksonInject AuthorizerMapper authorizerMapper, - @JacksonInject ChatHandlerProvider chatHandlerProvider + @JacksonInject ChatHandlerProvider chatHandlerProvider, + @JacksonInject HadoopTaskConfig hadoopTaskConfig ) { super( - getOrMakeId(id, TYPE, getTheDataSource(spec)), + AbstractTask.getOrMakeId(id, TYPE, getTheDataSource(spec)), getTheDataSource(spec), hadoopDependencyCoordinates == null ? (hadoopCoordinates == null ? null : ImmutableList.of(hadoopCoordinates)) : hadoopDependencyCoordinates, - context + context, + hadoopTaskConfig ); this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); @@ -181,6 +178,7 @@ public HadoopIndexTask( this.spec.getIOConfig().getMetadataUpdateSpec() == null, "metadataUpdateSpec must be absent" ); + this.hadoopTaskConfig = hadoopTaskConfig; this.classpathPrefix = classpathPrefix; this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMappper"); @@ -338,8 +336,8 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception try { registerResourceCloserOnAbnormalExit(config -> killHadoopJob()); String hadoopJobIdFile = getHadoopJobIdFileName(); - logExtensionsConfig(); - final ClassLoader loader = buildClassLoader(toolbox); + HadoopTask.logExtensionsConfig(); + final ClassLoader loader = buildClassLoader(); boolean determineIntervals = spec.getDataSchema().getGranularitySpec().inputIntervals().isEmpty(); HadoopIngestionSpec.updateSegmentListIfDatasourcePathSpecIsUsed( @@ -348,15 +346,15 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception new OverlordActionBasedUsedSegmentsRetriever(toolbox) ); - Object determinePartitionsInnerProcessingRunner = getForeignClassloaderObject( - "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner", + Object determinePartitionsInnerProcessingRunner = HadoopTask.getForeignClassloaderObject( + "org.apache.druid.indexer.HadoopIndexTask$HadoopDetermineConfigInnerProcessingRunner", loader ); determinePartitionsStatsGetter = new InnerProcessingStatsGetter(determinePartitionsInnerProcessingRunner); String[] determinePartitionsInput = new String[]{ toolbox.getJsonMapper().writeValueAsString(spec), - toolbox.getConfig().getHadoopWorkingPath(), + hadoopTaskConfig.getHadoopWorkingPath(), toolbox.getSegmentPusher().getPathForHadoop(), hadoopJobIdFile }; @@ -418,7 +416,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception lock.assertNotRevoked(); version = lock.getVersion(); } else { - Iterable locks = getTaskLocks(toolbox.getTaskActionClient()); + Iterable locks = AbstractTask.getTaskLocks(toolbox.getTaskActionClient()); final TaskLock myLock = Iterables.getOnlyElement(locks); version = myLock.getVersion(); } @@ -442,8 +440,8 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception log.info("Setting version to: %s", version); - Object innerProcessingRunner = getForeignClassloaderObject( - "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner", + Object innerProcessingRunner = HadoopTask.getForeignClassloaderObject( + "org.apache.druid.indexer.HadoopIndexTask$HadoopIndexGeneratorInnerProcessingRunner", loader ); buildSegmentsStatsGetter = new InnerProcessingStatsGetter(innerProcessingRunner); @@ -535,11 +533,11 @@ private void killHadoopJob() try { ClassLoader loader = HadoopTask.buildClassLoader( 
getHadoopDependencyCoordinates(), - taskConfig.getDefaultHadoopCoordinates() + hadoopTaskConfig.getDefaultHadoopCoordinates() ); - Object killMRJobInnerProcessingRunner = getForeignClassloaderObject( - "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner", + Object killMRJobInnerProcessingRunner = HadoopTask.getForeignClassloaderObject( + "org.apache.druid.indexer.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner", loader ); @@ -576,7 +574,7 @@ private void renameSegmentIndexFilesJob( final ClassLoader loader = Thread.currentThread().getContextClassLoader(); try { final Class clazz = loader.loadClass( - "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner" + "org.apache.druid.indexer.HadoopIndexTask$HadoopRenameSegmentIndexFilesRunner" ); Object renameSegmentIndexFilesRunner = clazz.newInstance(); @@ -616,11 +614,11 @@ private void indexerGeneratorCleanupJob( try { ClassLoader loader = HadoopTask.buildClassLoader( getHadoopDependencyCoordinates(), - taskConfig.getDefaultHadoopCoordinates() + hadoopTaskConfig.getDefaultHadoopCoordinates() ); - Object indexerGeneratorCleanupRunner = getForeignClassloaderObject( - "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopIndexerGeneratorCleanupRunner", + Object indexerGeneratorCleanupRunner = HadoopTask.getForeignClassloaderObject( + "org.apache.druid.indexer.HadoopIndexTask$HadoopIndexerGeneratorCleanupRunner", loader ); @@ -833,7 +831,7 @@ public String runTask(String[] args) throws Exception // can be injected based on the configuration given in config.getSchema().getIOConfig().getMetadataUpdateSpec() final SegmentMetadataPublisher maybeHandler; if (config.isUpdaterJobSpecSet()) { - maybeHandler = new SegmentMetadataPublisher(INJECTOR.getInstance(IndexerMetadataStorageCoordinator.class)); + maybeHandler = new SegmentMetadataPublisher(HadoopTask.INJECTOR.getInstance(IndexerMetadataStorageCoordinator.class)); } else { maybeHandler = null; } diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTaskModule.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTaskModule.java new file mode 100644 index 000000000000..3f48ddf3ab75 --- /dev/null +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTaskModule.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexer; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.inject.Binder; +import org.apache.druid.guice.JsonConfigProvider; +import org.apache.druid.initialization.DruidModule; + +import java.util.Collections; +import java.util.List; + +public class HadoopIndexTaskModule implements DruidModule +{ + @Override + public void configure(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.indexer.task", HadoopTaskConfig.class); + } + + @Override + public List getJacksonModules() + { + return Collections.singletonList( + new SimpleModule(getClass().getSimpleName()) + .registerSubtypes( + new NamedType(HadoopIndexTask.class, HadoopIndexTask.TYPE) + ) + ); + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopTask.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTask.java similarity index 95% rename from indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopTask.java rename to indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTask.java index a239025ee05d..2a77379779c4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopTask.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTask.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.druid.indexing.common.task; +package org.apache.druid.indexer; import com.google.common.base.Joiner; import com.google.common.base.Predicate; @@ -28,13 +28,13 @@ import org.apache.druid.guice.ExtensionsConfig; import org.apache.druid.guice.ExtensionsLoader; import org.apache.druid.guice.StartupInjectorBuilder; -import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.AbstractBatchIndexTask; +import org.apache.druid.indexing.common.task.Initialization; import org.apache.druid.initialization.ServerInjectorBuilder; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; - import java.io.File; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -59,16 +59,19 @@ public abstract class HadoopTask extends AbstractBatchIndexTask private static final ExtensionsLoader EXTENSIONS_LOADER = ExtensionsLoader.instance(INJECTOR); private final List hadoopDependencyCoordinates; + private final HadoopTaskConfig hadoopTaskConfig; protected HadoopTask( String id, String dataSource, List hadoopDependencyCoordinates, - Map context + Map context, + HadoopTaskConfig hadoopTaskConfig ) { super(id, dataSource, context, IngestionMode.HADOOP); this.hadoopDependencyCoordinates = hadoopDependencyCoordinates; + this.hadoopTaskConfig = hadoopTaskConfig; } public List getHadoopDependencyCoordinates() @@ -79,7 +82,7 @@ public List getHadoopDependencyCoordinates() // This could stand to have a more robust detection methodology. // Right now it just looks for `druid.*\.jar` // This is only used for classpath isolation in the runTask isolation stuff, so it shooouuullldddd be ok. 
- /** {@link #buildClassLoader(TaskToolbox)} has outdated javadocs referencing this field, TODO update */ + /** {@link #buildClassLoader()} has outdated javadocs referencing this field, TODO update */ @SuppressWarnings("unused") protected static final Predicate IS_DRUID_URL = new Predicate<>() { @@ -126,12 +129,11 @@ public boolean apply(@Nullable URL input) * sanity in a ClassLoader where all jars (which are isolated by extension ClassLoaders in the Druid framework) are * jumbled together into one ClassLoader for Hadoop and Hadoop-like tasks (Spark for example). * - * @param toolbox The toolbox to pull the default coordinates from if not present in the task * @return An isolated URLClassLoader not tied by parent chain to the ApplicationClassLoader */ - protected ClassLoader buildClassLoader(final TaskToolbox toolbox) + protected ClassLoader buildClassLoader() { - return buildClassLoader(hadoopDependencyCoordinates, toolbox.getConfig().getDefaultHadoopCoordinates()); + return buildClassLoader(hadoopDependencyCoordinates, hadoopTaskConfig.getDefaultHadoopCoordinates()); } public static ClassLoader buildClassLoader(final List hadoopDependencyCoordinates, diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTaskConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTaskConfig.java new file mode 100644 index 000000000000..2181b2c3a26a --- /dev/null +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTaskConfig.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexer; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.commons.io.IOUtils; +import org.apache.druid.common.config.Configs; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.java.util.common.ISE; + +import javax.annotation.Nullable; +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class HadoopTaskConfig +{ + private static final String HADOOP_LIB_VERSIONS = "hadoop.indexer.libs.version"; + public static final List DEFAULT_DEFAULT_HADOOP_COORDINATES; + + static { + try { + DEFAULT_DEFAULT_HADOOP_COORDINATES = + ImmutableList.copyOf(Lists.newArrayList(IOUtils.toString( + TaskConfig.class.getResourceAsStream("/" + HADOOP_LIB_VERSIONS), + StandardCharsets.UTF_8 + ).split(","))); + + } + catch (Exception e) { + throw new ISE(e, "Unable to read file %s from classpath ", HADOOP_LIB_VERSIONS); + } + } + + @JsonProperty + private final String hadoopWorkingPath; + + @JsonProperty + private final List defaultHadoopCoordinates; + + @JsonCreator + public HadoopTaskConfig( + @JsonProperty("hadoopWorkingPath") @Nullable String hadoopWorkingPath, + @JsonProperty("defaultHadoopCoordinates") @Nullable List defaultHadoopCoordinates + ) + { + this.hadoopWorkingPath = Configs.valueOrDefault(hadoopWorkingPath, "/tmp/druid-indexing"); + this.defaultHadoopCoordinates = Configs.valueOrDefault( + defaultHadoopCoordinates, + DEFAULT_DEFAULT_HADOOP_COORDINATES + ); + } + + + @JsonProperty + public String getHadoopWorkingPath() + { + return hadoopWorkingPath; + } + + @JsonProperty + public List getDefaultHadoopCoordinates() + { + return defaultHadoopCoordinates; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/OverlordActionBasedUsedSegmentsRetriever.java similarity index 97% rename from indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java rename to indexing-hadoop/src/main/java/org/apache/druid/indexer/OverlordActionBasedUsedSegmentsRetriever.java index e6e9ad7170b4..23eb74e19558 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/hadoop/OverlordActionBasedUsedSegmentsRetriever.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/OverlordActionBasedUsedSegmentsRetriever.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.druid.indexing.hadoop; +package org.apache.druid.indexer; import com.google.common.base.Preconditions; import com.google.inject.Inject; diff --git a/indexing-hadoop/src/main/resources/hadoop.indexer.libs.version b/indexing-hadoop/src/main/resources/hadoop.indexer.libs.version new file mode 100644 index 000000000000..61e072272ac0 --- /dev/null +++ b/indexing-hadoop/src/main/resources/hadoop.indexer.libs.version @@ -0,0 +1 @@ +${hadoop-task-libs} \ No newline at end of file diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIndexTaskTest.java similarity index 94% rename from indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java rename to indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIndexTaskTest.java index 91b94b2f3cd5..d3ecea88de87 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopIndexTaskTest.java @@ -17,13 +17,11 @@ * under the License. */ -package org.apache.druid.indexing.common.task; +package org.apache.druid.indexer; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.druid.indexer.HadoopIOConfig; -import org.apache.druid.indexer.HadoopIngestionSpec; import org.apache.druid.indexer.granularity.UniformGranularitySpec; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; @@ -69,7 +67,8 @@ public void testCorrectInputSourceResources() jsonMapper, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null + null, + new HadoopTaskConfig(null, null) ); Assert.assertEquals( diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskSerdeTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskSerdeTest.java new file mode 100644 index 000000000000..76abc4d8b67c --- /dev/null +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskSerdeTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.indexer.granularity.UniformGranularitySpec; +import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.indexing.common.task.IndexTask; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.server.security.AuthTestUtils; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class HadoopTaskSerdeTest +{ + private final ObjectMapper jsonMapper; + private final IndexSpec indexSpec = IndexSpec.DEFAULT; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + public HadoopTaskSerdeTest() + { + TestUtils testUtils = new TestUtils(); + jsonMapper = testUtils.getTestObjectMapper(); + jsonMapper.registerSubtypes( + new NamedType(ParallelIndexTuningConfig.class, "index_parallel"), + new NamedType(IndexTask.IndexTuningConfig.class, "index") + ); + } + + @Test + public void testHadoopIndexTaskSerde() throws Exception + { + final HadoopIndexTask task = new HadoopIndexTask( + null, + new HadoopIngestionSpec( + DataSchema.builder() + .withDataSource("foo") + .withGranularity( + new UniformGranularitySpec( + Granularities.DAY, + null, + ImmutableList.of(Intervals.of("2010-01-01/P1D")) + ) + ) + .withObjectMapper(jsonMapper) + .build(), + new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null), + null + ), + null, + null, + "blah", + jsonMapper, + null, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null, + new HadoopTaskConfig(null, null) + ); + + final String json = jsonMapper.writeValueAsString(task); + + final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals( + task.getSpec().getTuningConfig().getJobProperties(), + task2.getSpec().getTuningConfig().getJobProperties() + ); + Assert.assertEquals("blah", task.getClasspathPrefix()); + Assert.assertEquals("blah", task2.getClasspathPrefix()); + } + + @Test + public void testHadoopIndexTaskWithContextSerde() throws Exception + { + final HadoopIndexTask task = new HadoopIndexTask( + null, + new HadoopIngestionSpec( + DataSchema.builder() + .withDataSource("foo") + .withGranularity( + new UniformGranularitySpec( + Granularities.DAY, + null, ImmutableList.of(Intervals.of("2010-01-01/P1D")) + ) + ) + .withObjectMapper(jsonMapper) + .build(), + new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null), + null + ), + null, + null, + "blah", + jsonMapper, + ImmutableMap.of("userid", 12345, "username", "bob"), + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null, + new HadoopTaskConfig(null, null) + ); + + final String json = jsonMapper.writeValueAsString(task); + + final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class); + + 
Assert.assertEquals("foo", task.getDataSource()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals( + task.getSpec().getTuningConfig().getJobProperties(), + task2.getSpec().getTuningConfig().getJobProperties() + ); + Assert.assertEquals("blah", task.getClasspathPrefix()); + Assert.assertEquals("blah", task2.getClasspathPrefix()); + Assert.assertEquals(ImmutableMap.of("userid", 12345, "username", "bob"), task2.getContext()); + Assert.assertEquals(ImmutableMap.of("userid", 12345, "username", "bob"), task2.getSpec().getContext()); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskTest.java similarity index 94% rename from indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java rename to indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskTest.java index 741208dd0db4..7bf38ff3267c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskTest.java @@ -17,12 +17,10 @@ * under the License. */ -package org.apache.druid.indexing.common.task; +package org.apache.druid.indexer; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import org.apache.druid.indexer.HadoopDruidIndexerConfig; -import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; @@ -54,7 +52,8 @@ public void testBuildClassLoader() throws Exception "taskId", "dataSource", ImmutableList.of(), - ImmutableMap.of() + ImmutableMap.of(), + new HadoopTaskConfig(null, null) ) { @Override @@ -114,7 +113,7 @@ public TaskStatus runTask(TaskToolbox toolbox) ).once(); EasyMock.replay(toolbox); - final ClassLoader classLoader = task.buildClassLoader(toolbox); + final ClassLoader classLoader = task.buildClassLoader(); assertClassLoaderIsSingular(classLoader); final Class hadoopClazz = Class.forName("org.apache.hadoop.fs.FSDataInputStream", false, classLoader); diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index 7667188a6378..f0da679d4a45 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -43,11 +43,6 @@ druid-server ${project.parent.version} - - org.apache.druid - druid-indexing-hadoop - ${project.parent.version} - io.dropwizard.metrics metrics-core @@ -274,28 +269,6 @@ - - - hadoop3 - - true - - - - org.apache.hadoop - hadoop-client-api - ${hadoop.compile.version} - provided - - - - - org.apache.hadoop:hadoop-client-api:${hadoop.compile.version},org.apache.hadoop:hadoop-client-runtime:${hadoop.compile.version} - - - - - @@ -309,31 +282,6 @@ - - maven-resources-plugin - org.apache.maven.plugins - - ${project.build.outputDirectory} - - - src/main/resources - hadoop.indexer.libs.version - true - - - - - - org.jacoco - jacoco-maven-plugin - - - - org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientSyncImpl.class - - - diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index 31778f9b380d..4bffb43cdec7 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -79,9 +79,6 @@ public class TaskConfig implements TaskDirectory @JsonProperty private final String hadoopWorkingPath; - @JsonProperty - private final int defaultRowFlushBoundary; - @JsonProperty private final List defaultHadoopCoordinates; @@ -114,7 +111,6 @@ public TaskConfig( @JsonProperty("baseDir") String baseDir, @JsonProperty("baseTaskDir") String baseTaskDir, @JsonProperty("hadoopWorkingPath") String hadoopWorkingPath, - @JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary, @JsonProperty("defaultHadoopCoordinates") List defaultHadoopCoordinates, @JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart, @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout, @@ -130,7 +126,6 @@ public TaskConfig( this.baseTaskDir = new File(defaultDir(baseTaskDir, "persistent/task")); // This is usually on HDFS or similar, so we can't use java.io.tmpdir this.hadoopWorkingPath = Configs.valueOrDefault(hadoopWorkingPath, "/tmp/druid-indexing"); - this.defaultRowFlushBoundary = Configs.valueOrDefault(defaultRowFlushBoundary, 75000); this.defaultHadoopCoordinates = Configs.valueOrDefault( defaultHadoopCoordinates, DEFAULT_DEFAULT_HADOOP_COORDINATES @@ -162,7 +157,6 @@ private TaskConfig( String baseDir, File baseTaskDir, String hadoopWorkingPath, - int defaultRowFlushBoundary, List defaultHadoopCoordinates, boolean restoreTasksOnRestart, Period gracefulShutdownTimeout, @@ -177,7 +171,6 @@ private TaskConfig( this.baseDir = baseDir; this.baseTaskDir = baseTaskDir; this.hadoopWorkingPath = hadoopWorkingPath; - this.defaultRowFlushBoundary = defaultRowFlushBoundary; this.defaultHadoopCoordinates = defaultHadoopCoordinates; this.restoreTasksOnRestart = restoreTasksOnRestart; this.gracefulShutdownTimeout = gracefulShutdownTimeout; @@ -231,18 +224,6 @@ public File getTaskLockFile(String taskId) return new File(getTaskDir(taskId), "lock"); } - @JsonProperty - public String getHadoopWorkingPath() - { - return hadoopWorkingPath; - } - - @JsonProperty - public int getDefaultRowFlushBoundary() - { - return defaultRowFlushBoundary; - } - @JsonProperty public List getDefaultHadoopCoordinates() { @@ -312,7 +293,6 @@ public TaskConfig withBaseTaskDir(File baseTaskDir) baseDir, baseTaskDir, hadoopWorkingPath, - defaultRowFlushBoundary, defaultHadoopCoordinates, restoreTasksOnRestart, gracefulShutdownTimeout, @@ -331,7 +311,6 @@ public TaskConfig withTmpStorageBytesPerTask(long tmpStorageBytesPerTask) baseDir, baseTaskDir, hadoopWorkingPath, - defaultRowFlushBoundary, defaultHadoopCoordinates, restoreTasksOnRestart, gracefulShutdownTimeout, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index cacdc47f520a..3cce9d8a6a54 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -80,7 +80,6 @@ @Type(name = PartialRangeSegmentGenerateTask.TYPE, value = PartialRangeSegmentGenerateTask.class), @Type(name = PartialDimensionDistributionTask.TYPE, value = PartialDimensionDistributionTask.class), @Type(name = PartialGenericSegmentMergeTask.TYPE, value = PartialGenericSegmentMergeTask.class), - @Type(name = HadoopIndexTask.TYPE, value = 
HadoopIndexTask.class), @Type(name = NoopTask.TYPE, value = NoopTask.class), @Type(name = CompactionTask.TYPE, value = CompactionTask.class) }) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index 2986171325f3..f66c399806b2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -116,7 +116,6 @@ public void setUp() throws IOException TaskConfig taskConfig = new TaskConfigBuilder() .setBaseDir(temporaryFolder.newFile().toString()) - .setDefaultRowFlushBoundary(50000) .build(); taskToolbox = new TaskToolboxFactory( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java index 1213b5525146..403334e7ec3d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java @@ -29,7 +29,6 @@ public class TaskConfigBuilder private String baseDir; private String baseTaskDir; private String hadoopWorkingPath; - private Integer defaultRowFlushBoundary; private List defaultHadoopCoordinates; private boolean restoreTasksOnRestart; private Period gracefulShutdownTimeout; @@ -58,12 +57,6 @@ public TaskConfigBuilder setHadoopWorkingPath(String hadoopWorkingPath) return this; } - public TaskConfigBuilder setDefaultRowFlushBoundary(Integer defaultRowFlushBoundary) - { - this.defaultRowFlushBoundary = defaultRowFlushBoundary; - return this; - } - public TaskConfigBuilder setDefaultHadoopCoordinates(List defaultHadoopCoordinates) { this.defaultHadoopCoordinates = defaultHadoopCoordinates; @@ -124,7 +117,6 @@ public TaskConfig build() baseDir, baseTaskDir, hadoopWorkingPath, - defaultRowFlushBoundary, defaultHadoopCoordinates, restoreTasksOnRestart, gracefulShutdownTimeout, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index 651c1ba60087..5e4b8197a35c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -27,8 +27,6 @@ import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.NoopInputFormat; import org.apache.druid.data.input.impl.TimestampSpec; -import org.apache.druid.indexer.HadoopIOConfig; -import org.apache.druid.indexer.HadoopIngestionSpec; import org.apache.druid.indexer.granularity.UniformGranularitySpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; @@ -42,7 +40,6 @@ import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.server.security.AuthTestUtils; import org.hamcrest.CoreMatchers; import org.joda.time.Period; import org.junit.Assert; @@ -406,97 +403,4 @@ public void testMoveTaskSerde() throws Exception Assert.assertEquals(task.getInterval(), task2.getInterval()); Assert.assertEquals(task.getTargetLoadSpec(), 
task2.getTargetLoadSpec()); } - - @Test - public void testHadoopIndexTaskSerde() throws Exception - { - final HadoopIndexTask task = new HadoopIndexTask( - null, - new HadoopIngestionSpec( - DataSchema.builder() - .withDataSource("foo") - .withGranularity( - new UniformGranularitySpec( - Granularities.DAY, - null, - ImmutableList.of(Intervals.of("2010-01-01/P1D")) - ) - ) - .withObjectMapper(jsonMapper) - .build(), - new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null), - null - ), - null, - null, - "blah", - jsonMapper, - null, - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null - ); - - final String json = jsonMapper.writeValueAsString(task); - - final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class); - - Assert.assertEquals("foo", task.getDataSource()); - - Assert.assertEquals(task.getId(), task2.getId()); - Assert.assertEquals(task.getGroupId(), task2.getGroupId()); - Assert.assertEquals(task.getDataSource(), task2.getDataSource()); - Assert.assertEquals( - task.getSpec().getTuningConfig().getJobProperties(), - task2.getSpec().getTuningConfig().getJobProperties() - ); - Assert.assertEquals("blah", task.getClasspathPrefix()); - Assert.assertEquals("blah", task2.getClasspathPrefix()); - } - - @Test - public void testHadoopIndexTaskWithContextSerde() throws Exception - { - final HadoopIndexTask task = new HadoopIndexTask( - null, - new HadoopIngestionSpec( - DataSchema.builder() - .withDataSource("foo") - .withGranularity( - new UniformGranularitySpec( - Granularities.DAY, - null, ImmutableList.of(Intervals.of("2010-01-01/P1D")) - ) - ) - .withObjectMapper(jsonMapper) - .build(), - new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null), - null - ), - null, - null, - "blah", - jsonMapper, - ImmutableMap.of("userid", 12345, "username", "bob"), - AuthTestUtils.TEST_AUTHORIZER_MAPPER, - null - ); - - final String json = jsonMapper.writeValueAsString(task); - - final HadoopIndexTask task2 = (HadoopIndexTask) jsonMapper.readValue(json, Task.class); - - Assert.assertEquals("foo", task.getDataSource()); - - Assert.assertEquals(task.getId(), task2.getId()); - Assert.assertEquals(task.getGroupId(), task2.getGroupId()); - Assert.assertEquals(task.getDataSource(), task2.getDataSource()); - Assert.assertEquals( - task.getSpec().getTuningConfig().getJobProperties(), - task2.getSpec().getTuningConfig().getJobProperties() - ); - Assert.assertEquals("blah", task.getClasspathPrefix()); - Assert.assertEquals("blah", task2.getClasspathPrefix()); - Assert.assertEquals(ImmutableMap.of("userid", 12345, "username", "bob"), task2.getContext()); - Assert.assertEquals(ImmutableMap.of("userid", 12345, "username", "bob"), task2.getSpec().getContext()); - } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 0e6fce4517a1..70122b6ab9e6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -95,7 +95,6 @@ public void setup() throws IOException final DruidNode node = new DruidNode("testServer", "testHost", false, 1000, null, true, false); final TaskConfig taskConfig = new TaskConfigBuilder() .setBaseDir(temporaryFolder.newFile().toString()) - .setDefaultRowFlushBoundary(50000) .setRestoreTasksOnRestart(true) .build(); final 
ServiceEmitter emitter = new NoopServiceEmitter(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index d177ab555c98..ff6b5329c692 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -562,7 +562,6 @@ private TaskToolboxFactory setUpTaskToolboxFactory( ); taskConfig = new TaskConfigBuilder() .setBaseDir(temporaryFolder.newFolder().toString()) - .setDefaultRowFlushBoundary(50000) .setTmpStorageBytesPerTask(-1L) .build(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 8090f45dd0c7..19ff6ec4d0b5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -571,7 +571,6 @@ protected void makeToolboxFactory(TestUtils testUtils, ServiceEmitter emitter, b new TaskConfigBuilder() .setBaseDir(new File(directory, "baseDir").getPath()) .setBaseTaskDir(new File(directory, "baseTaskDir").getPath()) - .setDefaultRowFlushBoundary(50000) .setRestoreTasksOnRestart(true) .build(); final TestDerbyConnector derbyConnector = derby.getConnector(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index 0a47f687f3c8..4eff8a5579e1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -116,7 +116,6 @@ private WorkerTaskManager createWorkerTaskManager() { TaskConfig taskConfig = new TaskConfigBuilder() .setBaseDir(FileUtils.createTempDir().toString()) - .setDefaultRowFlushBoundary(0) .setRestoreTasksOnRestart(restoreTasksOnRestart) .build(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index da7184b39953..e0b09f7af7cc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -164,7 +164,6 @@ private WorkerTaskMonitor createTaskMonitor() { final TaskConfig taskConfig = new TaskConfigBuilder() .setBaseDir(FileUtils.createTempDir().toString()) - .setDefaultRowFlushBoundary(0) .build(); TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); diff --git a/integration-tests/src/test/resources/hadoop/batch_hadoop_queries.json b/integration-tests/src/test/resources/hadoop/batch_hadoop_queries.json index 50397b1d7f44..466abfefbb1f 100644 --- a/integration-tests/src/test/resources/hadoop/batch_hadoop_queries.json +++ b/integration-tests/src/test/resources/hadoop/batch_hadoop_queries.json @@ -12,7 +12,7 @@ "intervals": [ "2014-10-20T00:00:00.000Z/2014-11-03T00:00:00.000Z" ], - "id": "merged", + "id": 
"%%DATASOURCE%%_2014-11-02T00:00:00.000Z_2014-11-03T00:00:00.000Z_merged", "columns": { "location": { "typeSignature": "STRING", @@ -75,6 +75,7 @@ "aggregators": null, "timestampSpec": null, "queryGranularity": null, + "projections": null, "rollup":null } ] diff --git a/pom.xml b/pom.xml index 082a9a0cb806..160c46430864 100644 --- a/pom.xml +++ b/pom.xml @@ -185,11 +185,11 @@ processing - indexing-hadoop - indexing-service server - sql + indexing-service + indexing-hadoop services + sql integration-tests benchmarks web-console diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 46e243e71bec..5951d3cff584 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -55,6 +55,7 @@ import org.apache.druid.guice.annotations.Parent; import org.apache.druid.guice.annotations.RemoteChatHandler; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.HadoopIndexTaskModule; import org.apache.druid.indexer.report.TaskReportFileWriter; import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter; import org.apache.druid.indexing.overlord.TaskRunner; @@ -241,6 +242,7 @@ public DataNodeService getDataNodeService(DruidServerConfig serverConfig) new IndexingServiceTaskLogsModule(), new IndexingServiceTuningConfigModule(), new InputSourceModule(), + new HadoopIndexTaskModule(), new QueryablePeonModule(), new CliIndexerServerModule(properties), new LookupModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index 0b2a8d02ad8d..e1197c0bfc31 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -48,6 +48,7 @@ import org.apache.druid.guice.MiddleManagerServiceModule; import org.apache.druid.guice.PolyBind; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.HadoopIndexTaskModule; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.TaskStorageDirTracker; import org.apache.druid.indexing.common.config.TaskConfig; @@ -250,6 +251,7 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) new IndexingServiceTaskLogsModule(), new IndexingServiceTuningConfigModule(), new InputSourceModule(), + new HadoopIndexTaskModule(), new LookupSerdeModule() ); } diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 2787d26840e7..d5125f1e892b 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -53,6 +53,7 @@ import org.apache.druid.guice.PolyBind; import org.apache.druid.guice.SupervisorModule; import org.apache.druid.guice.annotations.Json; +import org.apache.druid.indexer.HadoopIndexTaskModule; import org.apache.druid.indexing.common.RetryPolicyFactory; import org.apache.druid.indexing.common.TaskStorageDirTracker; import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory; @@ -456,6 +457,7 @@ private void configureOverlordHelpers(Binder binder) new IndexingServiceTaskLogsModule(), new IndexingServiceTuningConfigModule(), new InputSourceModule(), + new HadoopIndexTaskModule(), new SupervisorModule(), new LookupSerdeModule(), new 
SamplerModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 9cb939f9b899..05edc01cce58 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -68,6 +68,7 @@ import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Parent; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.HadoopIndexTaskModule; import org.apache.druid.indexer.report.SingleFileTaskReportFileWriter; import org.apache.druid.indexer.report.TaskReportFileWriter; import org.apache.druid.indexing.common.RetryPolicyConfig; @@ -375,6 +376,7 @@ public LocalTmpStorageConfig getLocalTmpStorage() new IndexingServiceInputSourceModule(), new IndexingServiceTuningConfigModule(), new InputSourceModule(), + new HadoopIndexTaskModule(), new ChatHandlerServerModule(properties), new LookupModule() ); diff --git a/services/src/main/java/org/apache/druid/cli/ResetCluster.java b/services/src/main/java/org/apache/druid/cli/ResetCluster.java index 16958398a82b..6ac05cb0a21c 100644 --- a/services/src/main/java/org/apache/druid/cli/ResetCluster.java +++ b/services/src/main/java/org/apache/druid/cli/ResetCluster.java @@ -31,8 +31,9 @@ import org.apache.druid.guice.QueryRunnerFactoryModule; import org.apache.druid.guice.QueryableModule; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.HadoopTaskConfig; import org.apache.druid.indexing.common.config.TaskConfig; -import org.apache.druid.indexing.common.task.HadoopTask; +import org.apache.druid.indexer.HadoopTask; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageTablesConfig; @@ -186,7 +187,7 @@ private void deleteIndexerHadoopWorkingDir(Injector injector) log.info("Deleting hadoopWorkingPath."); log.info("==========================================================================="); - TaskConfig taskConfig = injector.getInstance(TaskConfig.class); + HadoopTaskConfig taskConfig = injector.getInstance(HadoopTaskConfig.class); HadoopTask.invokeForeignLoader( "org.apache.druid.indexer.HadoopWorkingDirCleaner", new String[]{ From 7b8a84fdf2ebacd2d898a62cb15a8e5845697b52 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 14 Jul 2025 09:47:41 -0700 Subject: [PATCH 2/8] fixes --- .../MaterializedViewSupervisor.java | 2 +- .../MaterializedViewSupervisorSpec.java | 2 +- .../MaterializedViewSupervisorTest.java | 2 +- .../druid/indexer/HadoopTaskConfig.java | 4 +- .../druid/indexer/HadoopTaskSerdeTest.java | 31 +++++++++++ .../apache/druid/indexer/HadoopTaskTest.java | 4 +- .../indexing/common/config/TaskConfig.java | 52 ------------------- .../common/config/TaskConfigBuilder.java | 16 ------ .../overlord/ForkingTaskRunnerTest.java | 1 - .../apache/druid/cli/CliHadoopIndexer.java | 4 +- .../apache/druid/cli/PullDependencies.java | 4 +- .../org/apache/druid/cli/ResetCluster.java | 2 +- 12 files changed, 44 insertions(+), 80 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 30b3484c99d7..c3bd7bb41bcc 100644 --- 
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -29,8 +29,8 @@ import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.error.DruidException; import org.apache.druid.error.EntryAlreadyExists; -import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.HadoopIndexTask; +import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.Segments; diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index 427cede55a33..c835103c4c2c 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -29,12 +29,12 @@ import org.apache.druid.data.input.impl.DimensionSchema; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.indexer.HadoopIOConfig; +import org.apache.druid.indexer.HadoopIndexTask; import org.apache.druid.indexer.HadoopIngestionSpec; import org.apache.druid.indexer.HadoopTaskConfig; import org.apache.druid.indexer.HadoopTuningConfig; import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; import org.apache.druid.indexer.hadoop.DatasourceIngestionSpec; -import org.apache.druid.indexer.HadoopIndexTask; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java index 316a7a0a86aa..0e225bf92fa5 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java @@ -30,11 +30,11 @@ import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.error.EntryAlreadyExists; import org.apache.druid.indexer.HadoopIOConfig; +import org.apache.druid.indexer.HadoopIndexTask; import org.apache.druid.indexer.HadoopIngestionSpec; import org.apache.druid.indexer.HadoopTaskConfig; import org.apache.druid.indexer.HadoopTuningConfig; import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexer.HadoopIndexTask; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskQueue; diff --git 
a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTaskConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTaskConfig.java index 2181b2c3a26a..e7b505e2f2b6 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTaskConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTaskConfig.java @@ -25,7 +25,6 @@ import com.google.common.collect.Lists; import org.apache.commons.io.IOUtils; import org.apache.druid.common.config.Configs; -import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.java.util.common.ISE; import javax.annotation.Nullable; @@ -41,7 +40,7 @@ public class HadoopTaskConfig try { DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.copyOf(Lists.newArrayList(IOUtils.toString( - TaskConfig.class.getResourceAsStream("/" + HADOOP_LIB_VERSIONS), + HadoopTaskConfig.class.getResourceAsStream("/" + HADOOP_LIB_VERSIONS), StandardCharsets.UTF_8 ).split(","))); @@ -63,6 +62,7 @@ public HadoopTaskConfig( @JsonProperty("defaultHadoopCoordinates") @Nullable List<String> defaultHadoopCoordinates ) { + // This is usually on HDFS or similar, so we can't use java.io.tmpdir this.hadoopWorkingPath = Configs.valueOrDefault(hadoopWorkingPath, "/tmp/druid-indexing"); this.defaultHadoopCoordinates = Configs.valueOrDefault( defaultHadoopCoordinates, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskSerdeTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskSerdeTest.java index 76abc4d8b67c..8519f05e7e7f 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskSerdeTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskSerdeTest.java @@ -19,20 +19,34 @@ package org.apache.druid.indexer; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.indexer.granularity.UniformGranularitySpec; import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; import org.apache.druid.indexing.common.task.IndexTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; +import org.apache.druid.rpc.indexing.OverlordClient; +import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.loading.LocalDataSegmentPuller; +import org.apache.druid.segment.realtime.ChatHandlerProvider; +import org.apache.druid.segment.realtime.NoopChatHandlerProvider; +import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; +import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthTestUtils; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.timeline.DataSegment; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -54,6 +68,23 @@ public
HadoopTaskSerdeTest() new NamedType(ParallelIndexTuningConfig.class, "index_parallel"), new NamedType(IndexTask.IndexTuningConfig.class, "index") ); + jsonMapper.registerModules(new HadoopIndexTaskModule().getJacksonModules()); + jsonMapper.setInjectableValues( + new InjectableValues.Std() + .addValue(ExprMacroTable.class, LookupEnabledTestExprMacroTable.INSTANCE) + .addValue(IndexIO.class, testUtils.getTestIndexIO()) + .addValue(ObjectMapper.class, jsonMapper) + .addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider()) + .addValue(AuthConfig.class, new AuthConfig()) + .addValue(AuthorizerMapper.class, null) + .addValue(RowIngestionMetersFactory.class, new DropwizardRowIngestionMetersFactory()) + .addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT) + .addValue(OverlordClient.class, TestUtils.OVERLORD_SERVICE_CLIENT) + .addValue(AuthorizerMapper.class, new AuthorizerMapper(ImmutableMap.of())) + .addValue(AppenderatorsManager.class, TestUtils.APPENDERATORS_MANAGER) + .addValue(LocalDataSegmentPuller.class, new LocalDataSegmentPuller()) + .addValue(HadoopTaskConfig.class, new HadoopTaskConfig(null, null)) + ); } @Test diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskTest.java index 7bf38ff3267c..a680da610af1 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskTest.java @@ -24,7 +24,6 @@ import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; -import org.apache.druid.indexing.common.config.TaskConfigBuilder; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.timeline.DataSegment; @@ -105,12 +104,15 @@ public TaskStatus runTask(TaskToolbox toolbox) } }; final TaskToolbox toolbox = EasyMock.createStrictMock(TaskToolbox.class); + /* EasyMock.expect(toolbox.getConfig()).andReturn( new TaskConfigBuilder() .setBaseDir(temporaryFolder.newFolder().toString()) .setDefaultHadoopCoordinates(ImmutableList.of("something:hadoop:1")) .build() ).once(); + + */ EasyMock.replay(toolbox); final ClassLoader classLoader = task.buildClassLoader(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index 4bffb43cdec7..9c88144259fe 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -21,20 +21,14 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.apache.commons.io.IOUtils; import org.apache.druid.common.config.Configs; import org.apache.druid.common.utils.IdUtils; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.loading.StorageLocationConfig; import org.apache.druid.segment.realtime.appenderator.TaskDirectory; import org.joda.time.Period; import javax.annotation.Nullable; import java.io.File; -import java.nio.charset.StandardCharsets; 
import java.nio.file.Paths; import java.util.Collections; import java.util.List; @@ -47,24 +41,6 @@ */ public class TaskConfig implements TaskDirectory { - private static final Logger log = new Logger(TaskConfig.class); - private static final String HADOOP_LIB_VERSIONS = "hadoop.indexer.libs.version"; - public static final List<String> DEFAULT_DEFAULT_HADOOP_COORDINATES; - - static { - try { - DEFAULT_DEFAULT_HADOOP_COORDINATES = - ImmutableList.copyOf(Lists.newArrayList(IOUtils.toString( - TaskConfig.class.getResourceAsStream("/" + HADOOP_LIB_VERSIONS), - StandardCharsets.UTF_8 - ).split(","))); - - } - catch (Exception e) { - throw new ISE(e, "Unable to read file %s from classpath ", HADOOP_LIB_VERSIONS); - } - } - private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M"); private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M"); private static final boolean DEFAULT_STORE_EMPTY_COLUMNS = true; @@ -76,12 +52,6 @@ public class TaskConfig implements TaskDirectory @JsonProperty private final File baseTaskDir; - @JsonProperty - private final String hadoopWorkingPath; - - @JsonProperty - private final List<String> defaultHadoopCoordinates; - @JsonProperty private final boolean restoreTasksOnRestart; @@ -110,8 +80,6 @@ public class TaskConfig implements TaskDirectory public TaskConfig( @JsonProperty("baseDir") String baseDir, @JsonProperty("baseTaskDir") String baseTaskDir, - @JsonProperty("hadoopWorkingPath") String hadoopWorkingPath, - @JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates, @JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart, @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout, @JsonProperty("directoryLockTimeout") Period directoryLockTimeout, @@ -124,12 +92,6 @@ public TaskConfig( { this.baseDir = Configs.valueOrDefault(baseDir, System.getProperty("java.io.tmpdir")); this.baseTaskDir = new File(defaultDir(baseTaskDir, "persistent/task")); - // This is usually on HDFS or similar, so we can't use java.io.tmpdir - this.hadoopWorkingPath = Configs.valueOrDefault(hadoopWorkingPath, "/tmp/druid-indexing"); - this.defaultHadoopCoordinates = Configs.valueOrDefault( - defaultHadoopCoordinates, - DEFAULT_DEFAULT_HADOOP_COORDINATES - ); this.restoreTasksOnRestart = restoreTasksOnRestart; this.gracefulShutdownTimeout = Configs.valueOrDefault( gracefulShutdownTimeout, @@ -156,8 +118,6 @@ public TaskConfig( private TaskConfig( String baseDir, File baseTaskDir, - String hadoopWorkingPath, - List<String> defaultHadoopCoordinates, boolean restoreTasksOnRestart, Period gracefulShutdownTimeout, Period directoryLockTimeout, @@ -170,8 +130,6 @@ private TaskConfig( { this.baseDir = baseDir; this.baseTaskDir = baseTaskDir; - this.hadoopWorkingPath = hadoopWorkingPath; - this.defaultHadoopCoordinates = defaultHadoopCoordinates; this.restoreTasksOnRestart = restoreTasksOnRestart; this.gracefulShutdownTimeout = gracefulShutdownTimeout; this.directoryLockTimeout = directoryLockTimeout; @@ -224,12 +182,6 @@ public File getTaskLockFile(String taskId) return new File(getTaskDir(taskId), "lock"); } - @JsonProperty - public List<String> getDefaultHadoopCoordinates() - { - return defaultHadoopCoordinates; - } - @JsonProperty public boolean isRestoreTasksOnRestart() { @@ -292,8 +244,6 @@ public TaskConfig withBaseTaskDir(File baseTaskDir) return new TaskConfig( baseDir, baseTaskDir, - hadoopWorkingPath, - defaultHadoopCoordinates, restoreTasksOnRestart, gracefulShutdownTimeout, directoryLockTimeout, @@ -310,8 +260,6 @@ public TaskConfig
withTmpStorageBytesPerTask(long tmpStorageBytesPerTask) return new TaskConfig( baseDir, baseTaskDir, - hadoopWorkingPath, - defaultHadoopCoordinates, restoreTasksOnRestart, gracefulShutdownTimeout, directoryLockTimeout, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java index 403334e7ec3d..ff8f86d8e7ea 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java @@ -28,8 +28,6 @@ public class TaskConfigBuilder { private String baseDir; private String baseTaskDir; - private String hadoopWorkingPath; - private List<String> defaultHadoopCoordinates; private boolean restoreTasksOnRestart; private Period gracefulShutdownTimeout; private Period directoryLockTimeout; @@ -51,18 +49,6 @@ public TaskConfigBuilder setBaseTaskDir(String baseTaskDir) return this; } - public TaskConfigBuilder setHadoopWorkingPath(String hadoopWorkingPath) - { - this.hadoopWorkingPath = hadoopWorkingPath; - return this; - } - - public TaskConfigBuilder setDefaultHadoopCoordinates(List<String> defaultHadoopCoordinates) - { - this.defaultHadoopCoordinates = defaultHadoopCoordinates; - return this; - } - public TaskConfigBuilder setRestoreTasksOnRestart(boolean restoreTasksOnRestart) { this.restoreTasksOnRestart = restoreTasksOnRestart; @@ -116,8 +102,6 @@ public TaskConfig build() return new TaskConfig( baseDir, baseTaskDir, - hadoopWorkingPath, - defaultHadoopCoordinates, restoreTasksOnRestart, gracefulShutdownTimeout, directoryLockTimeout, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java index 0c4908563633..3664af6fd82c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/ForkingTaskRunnerTest.java @@ -538,7 +538,6 @@ ProcessHolder runTaskProcess(List<String> command, File logFile, TaskLocation ta public static TaskConfigBuilder makeDefaultTaskConfigBuilder() { return new TaskConfigBuilder() - .setDefaultHadoopCoordinates(ImmutableList.of()) .setGracefulShutdownTimeout(new Period("PT0S")) .setDirectoryLockTimeout(new Period("PT10S")) .setShuffleDataLocations(ImmutableList.of()); diff --git a/services/src/main/java/org/apache/druid/cli/CliHadoopIndexer.java b/services/src/main/java/org/apache/druid/cli/CliHadoopIndexer.java index a1a382318112..dcb857083580 100644 --- a/services/src/main/java/org/apache/druid/cli/CliHadoopIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliHadoopIndexer.java @@ -26,7 +26,7 @@ import com.google.common.base.Joiner; import com.google.inject.Inject; import org.apache.druid.guice.ExtensionsLoader; -import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexer.HadoopTaskConfig; import org.apache.druid.indexing.common.task.Initialization; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.utils.JvmUtils; @@ -48,7 +48,7 @@ ) public class CliHadoopIndexer implements Runnable { - private static final List<String> DEFAULT_HADOOP_COORDINATES = TaskConfig.DEFAULT_DEFAULT_HADOOP_COORDINATES; + private static final List<String> DEFAULT_HADOOP_COORDINATES = HadoopTaskConfig.DEFAULT_DEFAULT_HADOOP_COORDINATES;
private static final Logger log = new Logger(CliHadoopIndexer.class); diff --git a/services/src/main/java/org/apache/druid/cli/PullDependencies.java b/services/src/main/java/org/apache/druid/cli/PullDependencies.java index f0588d2b372d..e6b18ab7da70 100644 --- a/services/src/main/java/org/apache/druid/cli/PullDependencies.java +++ b/services/src/main/java/org/apache/druid/cli/PullDependencies.java @@ -27,7 +27,7 @@ import com.google.common.collect.SetMultimap; import com.google.inject.Inject; import org.apache.druid.guice.ExtensionsConfig; -import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexer.HadoopTaskConfig; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -294,7 +294,7 @@ public void run() log.info("Finish downloading dependencies for extension coordinates: [%s]", coordinates); if (!noDefaultHadoop && hadoopCoordinates.isEmpty()) { - hadoopCoordinates.addAll(TaskConfig.DEFAULT_DEFAULT_HADOOP_COORDINATES); + hadoopCoordinates.addAll(HadoopTaskConfig.DEFAULT_DEFAULT_HADOOP_COORDINATES); } log.info("Start downloading dependencies for hadoop extension coordinates: [%s]", hadoopCoordinates); diff --git a/services/src/main/java/org/apache/druid/cli/ResetCluster.java b/services/src/main/java/org/apache/druid/cli/ResetCluster.java index 6ac05cb0a21c..6fe6b0743183 100644 --- a/services/src/main/java/org/apache/druid/cli/ResetCluster.java +++ b/services/src/main/java/org/apache/druid/cli/ResetCluster.java @@ -31,9 +31,9 @@ import org.apache.druid.guice.QueryRunnerFactoryModule; import org.apache.druid.guice.QueryableModule; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexer.HadoopTask; import org.apache.druid.indexer.HadoopTaskConfig; import org.apache.druid.indexing.common.config.TaskConfig; -import org.apache.druid.indexer.HadoopTask; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageTablesConfig; From 943e52022dc75f55181cc95bd36ad3ce60d35a3f Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 14 Jul 2025 10:05:03 -0700 Subject: [PATCH 3/8] cleanup --- .../java/org/apache/druid/indexer/HadoopIndexTask.java | 2 +- .../java/org/apache/druid/indexer/HadoopTaskTest.java | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java index bed0851c8f4e..14f0912c4bd5 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java @@ -831,7 +831,7 @@ public String runTask(String[] args) throws Exception // can be injected based on the configuration given in config.getSchema().getIOConfig().getMetadataUpdateSpec() final SegmentMetadataPublisher maybeHandler; if (config.isUpdaterJobSpecSet()) { - maybeHandler = new SegmentMetadataPublisher(HadoopTask.INJECTOR.getInstance(IndexerMetadataStorageCoordinator.class)); + maybeHandler = new SegmentMetadataPublisher(INJECTOR.getInstance(IndexerMetadataStorageCoordinator.class)); } else { maybeHandler = null; } diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskTest.java index a680da610af1..02679387e88d 
100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTaskTest.java @@ -104,15 +104,6 @@ public TaskStatus runTask(TaskToolbox toolbox) } }; final TaskToolbox toolbox = EasyMock.createStrictMock(TaskToolbox.class); - /* - EasyMock.expect(toolbox.getConfig()).andReturn( - new TaskConfigBuilder() - .setBaseDir(temporaryFolder.newFolder().toString()) - .setDefaultHadoopCoordinates(ImmutableList.of("something:hadoop:1")) - .build() - ).once(); - - */ EasyMock.replay(toolbox); final ClassLoader classLoader = task.buildClassLoader(); From a1424d73456c5f24fa531ee7cd3f0b6a2adb54d2 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 14 Jul 2025 12:36:38 -0700 Subject: [PATCH 4/8] fix it --- indexing-hadoop/pom.xml | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml index 07b5917c5300..4f465efe1b46 100644 --- a/indexing-hadoop/pom.xml +++ b/indexing-hadoop/pom.xml @@ -240,22 +240,25 @@ - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <groupId>org.apache.maven.plugins</groupId> - <configuration> - <outputDirectory>${project.build.outputDirectory}</outputDirectory> - <resources> - <resource> - <directory>src/main/resources</directory> - <include>hadoop.indexer.libs.version</include> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </plugin> + <resources> + <resource> + <directory>src/main/resources</directory> + <includes> + <include>hadoop.indexer.libs.version</include> + </includes> + <filtering>true</filtering> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + <filtering>false</filtering> + </testResource> + </testResources> - From 70874aa7204ba605051d2c118712a8bf641fcde8 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 14 Jul 2025 14:06:23 -0700 Subject: [PATCH 5/8] fix declared dependencies --- indexing-hadoop/pom.xml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml index 4f465efe1b46..9772bc8b6699 100644 --- a/indexing-hadoop/pom.xml +++ b/indexing-hadoop/pom.xml @@ -94,6 +94,18 @@ <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-client</artifactId> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + </dependency> + <dependency> + <groupId>javax.ws.rs</groupId> + <artifactId>jsr311-api</artifactId> + </dependency> From d847488b3aafdb5716eeafa2c690fbc5e3ddbc02 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 15 Jul 2025 00:54:56 -0700 Subject: [PATCH 6/8] suppress warning --- .../src/main/java/org/apache/druid/indexer/HadoopIndexTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java index 14f0912c4bd5..c8e28a93aa6b 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java @@ -81,6 +81,7 @@ import java.util.Set; import java.util.stream.Collectors; +@SuppressWarnings("DataFlowIssue") public class HadoopIndexTask extends HadoopTask implements ChatHandler { public static final String TYPE = "index_hadoop"; From 2be8a1c98b383b18b673d587c6c5eb569f65e644 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 15 Jul 2025 04:37:01 -0700 Subject: [PATCH 7/8] try again --- .../src/main/java/org/apache/druid/indexer/HadoopIndexTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java index c8e28a93aa6b..5960ca5b155c 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java @@ -81,7 +81,7 @@ import java.util.Set; import java.util.stream.Collectors;
-@SuppressWarnings("DataFlowIssue") +@SuppressWarnings("NP_NONNULL_PARAM_VIOLATION") public class HadoopIndexTask extends HadoopTask implements ChatHandler { public static final String TYPE = "index_hadoop"; From dfe17f85acc604fb25c9ecc1ceb7f1e88f674245 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 15 Jul 2025 11:52:20 -0700 Subject: [PATCH 8/8] suppress check --- .../main/java/org/apache/druid/indexer/HadoopIndexTask.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java index 5960ca5b155c..6af5cfcfb1c3 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIndexTask.java @@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import org.apache.commons.lang3.BooleanUtils; +import org.apache.druid.annotations.SuppressFBWarnings; import org.apache.druid.indexer.granularity.ArbitraryGranularitySpec; import org.apache.druid.indexer.granularity.GranularitySpec; import org.apache.druid.indexer.path.SegmentMetadataPublisher; @@ -81,7 +82,7 @@ import java.util.Set; import java.util.stream.Collectors; -@SuppressWarnings("NP_NONNULL_PARAM_VIOLATION") +@SuppressFBWarnings({"NP_NONNULL_PARAM_VIOLATION", "NP_STORE_INTO_NONNULL_FIELD"}) public class HadoopIndexTask extends HadoopTask implements ChatHandler { public static final String TYPE = "index_hadoop";