From 724f22a0a0d3df543286ecf1d627ff6a45d368dd Mon Sep 17 00:00:00 2001
From: Lucas Capistrant
Date: Fri, 18 Jul 2025 11:04:12 -0500
Subject: [PATCH] By default don't allow index_hadoop tasks to run on a
 cluster, forcing operators to acknowledge that they are using a deprecated
 feature (#18239)

* By default don't allow index_hadoop tasks to run on a cluster, forcing
  operators to acknowledge that they are using a deprecated feature

* update unclear recommendation from log

* Fixup CodeQL warning

* fix UT
---
 docs/configuration/index.md                   |  1 +
 .../indexing/common/config/TaskConfig.java    | 24 +++++++++--
 .../indexing/common/task/HadoopIndexTask.java | 16 ++++++++
 .../common/config/TaskConfigBuilder.java      | 10 ++++-
 .../common/task/HadoopIndexTaskTest.java      | 41 +++++++++++++++++++
 5 files changed, 87 insertions(+), 5 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 61d9e8e8322e..0976190cf99a 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1414,6 +1414,7 @@ Additional Peon configs include:
 |`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/input-sources.md) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.22.0.|false|
 |`druid.indexer.task.storeEmptyColumns`|Boolean value for whether or not to store empty columns during ingestion. When set to true, Druid stores every column specified in the [`dimensionsSpec`](../ingestion/ingestion-spec.md#dimensionsspec). If you use the string-based schemaless ingestion and don't specify any dimensions to ingest, you must also set [`includeAllDimensions`](../ingestion/ingestion-spec.md#dimensionsspec) for Druid to store empty columns.<br/><br/>If you set `storeEmptyColumns` to false, Druid SQL queries referencing empty columns will fail. If you intend to leave `storeEmptyColumns` disabled, you should either ingest placeholder data for empty columns or else not query on empty columns.<br/><br/>You can overwrite this configuration by setting `storeEmptyColumns` in the [task context](../ingestion/tasks.md#context-parameters).|true|
 |`druid.indexer.task.tmpStorageBytesPerTask`|Maximum number of bytes per task to be used to store temporary files on disk. This config is generally intended for internal usage. Attempts to set it are very likely to be overwritten by the TaskRunner that executes the task, so be sure of what you expect to happen before directly adjusting this configuration parameter. The config is documented here primarily to provide an understanding of what it means if/when someone sees that it has been set. A value of -1 disables this limit.|-1|
+|`druid.indexer.task.allowHadoopTaskExecution`|Boolean flag controlling whether the cluster allows `index_hadoop` tasks to run. `index_hadoop` is deprecated; the default of false forces cluster operators to acknowledge the deprecation and consciously opt in, with the understanding that `index_hadoop` will be removed in a future release.|false|
 |`druid.indexer.server.maxChatRequests`|Maximum number of concurrent requests served by a task's chat handler. Set to 0 to disable limiting.|0|
 
 If the Peon is running in remote mode, there must be an Overlord up and running. Peons in remote mode can set the following configurations:
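Operators who need to keep `index_hadoop` running during a migration window opt in with a single property, typically set as `druid.indexer.task.allowHadoopTaskExecution=true` in the MiddleManager's `runtime.properties` so that Peons inherit it. Druid binds the `druid.indexer.task.*` property tree onto `TaskConfig` as JSON, so the effect can be sketched with a plain Jackson read; the JSON literal and class wrapper below are illustrative, not part of the patch:

```java
// A minimal sketch, assuming the standard Jackson binding of
// druid.indexer.task.* onto TaskConfig: setting only the new flag leaves
// every other field at its documented default.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.jackson.DefaultObjectMapper;

public class AllowHadoopFlagBindingSketch
{
  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new DefaultObjectMapper();
    TaskConfig config = mapper.readValue(
        "{\"allowHadoopTaskExecution\": true}",
        TaskConfig.class
    );
    System.out.println(config.isAllowHadoopTaskExecution()); // prints: true
  }
}
```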

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
index 31778f9b380d..ad746100f577 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java
@@ -47,6 +47,7 @@
  */
 public class TaskConfig implements TaskDirectory
 {
+  public static final String ALLOW_HADOOP_TASK_EXECUTION_KEY = "druid.indexer.task.allowHadoopTaskExecution";
   private static final Logger log = new Logger(TaskConfig.class);
   private static final String HADOOP_LIB_VERSIONS = "hadoop.indexer.libs.version";
   public static final List<String> DEFAULT_DEFAULT_HADOOP_COORDINATES;
@@ -109,6 +110,9 @@ public class TaskConfig implements TaskDirectory
   @JsonProperty
   private final long tmpStorageBytesPerTask;
 
+  @JsonProperty
+  private final boolean allowHadoopTaskExecution;
+
   @JsonCreator
   public TaskConfig(
       @JsonProperty("baseDir") String baseDir,
@@ -123,7 +127,8 @@ public TaskConfig(
       @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource,
       @JsonProperty("storeEmptyColumns") @Nullable Boolean storeEmptyColumns,
       @JsonProperty("encapsulatedTask") boolean enableTaskLevelLogPush,
-      @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask
+      @JsonProperty("tmpStorageBytesPerTask") @Nullable Long tmpStorageBytesPerTask,
+      @JsonProperty("allowHadoopTaskExecution") boolean allowHadoopTaskExecution
   )
   {
     this.baseDir = Configs.valueOrDefault(baseDir, System.getProperty("java.io.tmpdir"));
@@ -156,6 +161,7 @@ public TaskConfig(
     this.storeEmptyColumns = Configs.valueOrDefault(storeEmptyColumns, DEFAULT_STORE_EMPTY_COLUMNS);
     this.tmpStorageBytesPerTask =
         Configs.valueOrDefault(tmpStorageBytesPerTask, DEFAULT_TMP_STORAGE_BYTES_PER_TASK);
+    this.allowHadoopTaskExecution = allowHadoopTaskExecution;
   }
 
   private TaskConfig(
@@ -171,7 +177,8 @@ private TaskConfig(
       boolean ignoreTimestampSpecForDruidInputSource,
       boolean storeEmptyColumns,
       boolean encapsulatedTask,
-      long tmpStorageBytesPerTask
+      long tmpStorageBytesPerTask,
+      boolean allowHadoopTaskExecution
   )
   {
     this.baseDir = baseDir;
@@ -187,6 +194,7 @@ private TaskConfig(
     this.storeEmptyColumns = storeEmptyColumns;
     this.encapsulatedTask = encapsulatedTask;
     this.tmpStorageBytesPerTask = tmpStorageBytesPerTask;
+    this.allowHadoopTaskExecution = allowHadoopTaskExecution;
   }
 
   @JsonProperty
@@ -297,6 +305,12 @@ public long getTmpStorageBytesPerTask()
     return tmpStorageBytesPerTask;
   }
 
+  @JsonProperty
+  public boolean isAllowHadoopTaskExecution()
+  {
+    return allowHadoopTaskExecution;
+  }
+
   private String defaultDir(@Nullable String configParameter, final String defaultVal)
   {
     if (configParameter == null) {
@@ -321,7 +335,8 @@ public TaskConfig withBaseTaskDir(File baseTaskDir)
         ignoreTimestampSpecForDruidInputSource,
         storeEmptyColumns,
         encapsulatedTask,
-        tmpStorageBytesPerTask
+        tmpStorageBytesPerTask,
+        allowHadoopTaskExecution
     );
   }
 
@@ -340,7 +355,8 @@ public TaskConfig withTmpStorageBytesPerTask(long tmpStorageBytesPerTask)
         ignoreTimestampSpecForDruidInputSource,
         storeEmptyColumns,
         encapsulatedTask,
-        tmpStorageBytesPerTask
+        tmpStorageBytesPerTask,
+        allowHadoopTaskExecution
     );
   }
 }
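The new field follows the same pattern as the rest of `TaskConfig`: a `@JsonProperty` field, a creator parameter, a getter, and propagation through the two `with*` copy methods. A quick sketch of the default and the opt-in, using the test-side `TaskConfigBuilder` that a later hunk in this patch extends (JUnit 4 assertions assumed):

```java
// Sketch only: the flag defaults to false, so a stock TaskConfig blocks
// index_hadoop, and the builder setter added in this patch flips it.
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.junit.Assert;
import org.junit.Test;

public class AllowHadoopFlagDefaultTest
{
  @Test
  public void testDefaultBlocksHadoopTaskExecution()
  {
    TaskConfig byDefault = new TaskConfigBuilder().build();
    TaskConfig optedIn = new TaskConfigBuilder().setAllowHadoopTaskExecution(true).build();

    Assert.assertFalse(byDefault.isAllowHadoopTaskExecution()); // deprecated path blocked
    Assert.assertTrue(optedIn.isAllowHadoopTaskExecution());    // operator opted in
  }
}
```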
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
index 01696af04d0b..c5f794cdbe94 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopIndexTask.java
@@ -295,6 +295,22 @@ public TaskStatus runTask(TaskToolbox toolbox)
   {
     try {
       taskConfig = toolbox.getConfig();
+      if (!taskConfig.isAllowHadoopTaskExecution()) {
+        errorMsg = StringUtils.format(
+            "Hadoop tasks are deprecated and will be removed in a future release. "
+            + "Currently, they are not allowed to run on this cluster. If you wish to run them despite deprecation, "
+            + "please set [%s] to true.",
+            TaskConfig.ALLOW_HADOOP_TASK_EXECUTION_KEY
+        );
+        log.error(errorMsg);
+        toolbox.getTaskReportFileWriter().write(getId(), getTaskCompletionReports());
+        return TaskStatus.failure(getId(), errorMsg);
+      }
+      log.warn("Running deprecated index_hadoop task [%s]. "
+               + "Hadoop batch indexing is deprecated and will be removed in a future release. "
+               + "Please plan your migration to one of Druid's supported indexing patterns.",
+               getId()
+      );
       if (chatHandlerProvider.isPresent()) {
         log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
         chatHandlerProvider.get().register(getId(), this, false);
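The guard added to `runTask` is a fail-fast check: it refuses before any Hadoop work starts, logs the reason, writes the completion report so the failure is visible in the task report, and returns a failed `TaskStatus` carrying an actionable message. The same shape generalizes to gating any deprecated task type; the helper below is a hypothetical illustration of that pattern, not Druid API:

```java
// Hypothetical generalization of the fail-fast guard used above. Only
// TaskStatus and StringUtils are real Druid classes; DeprecationGate itself
// is illustrative.
import javax.annotation.Nullable;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.java.util.common.StringUtils;

public final class DeprecationGate
{
  private DeprecationGate() {}

  /**
   * Returns a failed status carrying an actionable message when the feature
   * is disabled, or null when the caller may proceed.
   */
  @Nullable
  public static TaskStatus checkAllowed(boolean allowed, String taskId, String propertyKey)
  {
    if (allowed) {
      return null;
    }
    return TaskStatus.failure(
        taskId,
        StringUtils.format("This deprecated task type is disabled. Set [%s] to true to opt in.", propertyKey)
    );
  }
}
```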
" + + "Please plan your migration to one of Druid's supported indexing patterns.", + getId() + ); if (chatHandlerProvider.isPresent()) { log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName()); chatHandlerProvider.get().register(getId(), this, false); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java index 1213b5525146..5dac84748951 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/config/TaskConfigBuilder.java @@ -39,6 +39,7 @@ public class TaskConfigBuilder private Boolean storeEmptyColumns; private boolean enableTaskLevelLogPush; private Long tmpStorageBytesPerTask; + private boolean allowHadoopTaskExecution; public TaskConfigBuilder setBaseDir(String baseDir) { @@ -118,6 +119,12 @@ public TaskConfigBuilder setTmpStorageBytesPerTask(Long tmpStorageBytesPerTask) return this; } + public TaskConfigBuilder setAllowHadoopTaskExecution(boolean allowHadoopTaskExecution) + { + this.allowHadoopTaskExecution = allowHadoopTaskExecution; + return this; + } + public TaskConfig build() { return new TaskConfig( @@ -133,7 +140,8 @@ public TaskConfig build() ignoreTimestampSpecForDruidInputSource, storeEmptyColumns, enableTaskLevelLogPush, - tmpStorageBytesPerTask + tmpStorageBytesPerTask, + allowHadoopTaskExecution ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java index 91b94b2f3cd5..21389e8e2a9f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java @@ -25,6 +25,9 @@ import org.apache.druid.indexer.HadoopIOConfig; import org.apache.druid.indexer.HadoopIngestionSpec; import org.apache.druid.indexer.granularity.UniformGranularitySpec; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.config.TaskConfigBuilder; +import org.apache.druid.indexing.overlord.TestTaskToolboxFactory; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; @@ -43,6 +46,44 @@ public class HadoopIndexTaskTest { private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + @Test + public void testHadoopTaskWontRunWithDefaultTaskConfig() + { + final HadoopIndexTask task = new HadoopIndexTask( + null, + new HadoopIngestionSpec( + DataSchema.builder() + .withDataSource("foo") + .withGranularity( + new UniformGranularitySpec( + Granularities.DAY, + null, + ImmutableList.of(Intervals.of("2010-01-01/P1D")) + ) + ) + .withObjectMapper(jsonMapper) + .build(), + new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null), + null + ), + null, + null, + "blah", + jsonMapper, + null, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null + ); + + TestTaskToolboxFactory.Builder builder = new TestTaskToolboxFactory.Builder().setConfig(new TaskConfigBuilder().build()); + TaskToolbox toolbox = new TestTaskToolboxFactory(builder).build(task); + + Assert.assertEquals("Hadoop tasks are deprecated and will be removed in a future release. 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java
index 91b94b2f3cd5..21389e8e2a9f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopIndexTaskTest.java
@@ -25,6 +25,9 @@
 import org.apache.druid.indexer.HadoopIOConfig;
 import org.apache.druid.indexer.HadoopIngestionSpec;
 import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.config.TaskConfigBuilder;
+import org.apache.druid.indexing.overlord.TestTaskToolboxFactory;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -43,6 +46,44 @@ public class HadoopIndexTaskTest
 {
   private final ObjectMapper jsonMapper = new DefaultObjectMapper();
 
+  @Test
+  public void testHadoopTaskWontRunWithDefaultTaskConfig()
+  {
+    final HadoopIndexTask task = new HadoopIndexTask(
+        null,
+        new HadoopIngestionSpec(
+            DataSchema.builder()
+                      .withDataSource("foo")
+                      .withGranularity(
+                          new UniformGranularitySpec(
+                              Granularities.DAY,
+                              null,
+                              ImmutableList.of(Intervals.of("2010-01-01/P1D"))
+                          )
+                      )
+                      .withObjectMapper(jsonMapper)
+                      .build(),
+            new HadoopIOConfig(ImmutableMap.of("paths", "bar"), null, null),
+            null
+        ),
+        null,
+        null,
+        "blah",
+        jsonMapper,
+        null,
+        AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+        null
+    );
+
+    TestTaskToolboxFactory.Builder builder = new TestTaskToolboxFactory.Builder().setConfig(new TaskConfigBuilder().build());
+    TaskToolbox toolbox = new TestTaskToolboxFactory(builder).build(task);
+
+    Assert.assertEquals("Hadoop tasks are deprecated and will be removed in a future release. Currently, "
+                        + "they are not allowed to run on this cluster. If you wish to run them despite deprecation, "
+                        + "please set [druid.indexer.task.allowHadoopTaskExecution] to true.",
+                        task.runTask(toolbox).getErrorMsg());
+  }
+
   @Test
   public void testCorrectInputSourceResources()
   {
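One check the patch leaves out, sketched here on the assumption that `TaskConfig` serde is symmetric (the paired `@JsonProperty` field and `@JsonCreator` parameter suggest it is, but this may need adjustment against real behavior): the new flag should survive a Jackson round trip.

```java
// A possible serde check, not part of this patch: the new field should
// round-trip through Jackson along with the rest of TaskConfig.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskConfigBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;

public class TaskConfigAllowHadoopSerdeTest
{
  @Test
  public void testAllowHadoopTaskExecutionSurvivesSerde() throws Exception
  {
    ObjectMapper mapper = new DefaultObjectMapper();
    TaskConfig original = new TaskConfigBuilder().setAllowHadoopTaskExecution(true).build();
    TaskConfig fromJson = mapper.readValue(mapper.writeValueAsString(original), TaskConfig.class);
    Assert.assertTrue(fromJson.isAllowHadoopTaskExecution());
  }
}
```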