From ccdbc746c1ff2690f94f0b6f15ed1a7b041a1a19 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 10 Jun 2024 16:29:40 -0400 Subject: [PATCH 1/6] SQL syntax error should target USER persona --- .../druid/sql/calcite/planner/DruidPlanner.java | 8 +------- .../druid/sql/calcite/planner/QueryHandler.java | 11 +---------- .../druid/sql/calcite/BaseCalciteQueryTest.java | 6 +----- 3 files changed, 3 insertions(+), 22 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java index 4b697a0d5dfa..cf1d22eb39b4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java @@ -380,13 +380,7 @@ public static DruidException translateException(Exception e) } } - return DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.UNCATEGORIZED) - .build( - inner, - "Unable to parse the SQL, unrecognized error from calcite: [%s]", - inner.getMessage() - ); + return InvalidSqlInput.exception(inner.getMessage()); } catch (RelOptPlanner.CannotPlanException inner) { return DruidException.forPersona(DruidException.Persona.USER) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java index 9f15d3822866..2d292621b57d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -684,16 +684,7 @@ private DruidException buildSQLPlanningError(RelOptPlanner.CannotPlanException e .ofCategory(DruidException.Category.UNSUPPORTED) .build(exception, "Unhandled Query Planning Failure, see broker logs for details"); } else { - // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong. 
- // For this reason, we consider these as targetting a more expert persona, i.e. the admin instead of the actual - // user. - throw DruidException.forPersona(DruidException.Persona.ADMIN) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build( - exception, - "Query could not be planned. A possible reason is [%s]", - errorMessage - ); + throw InvalidSqlInput.exception("Query could not be planned. A possible reason is [%s]", errorMessage); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index dfee7d0e3a22..9c34c89bd8fd 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -664,11 +664,7 @@ public void assertQueryIsUnplannable(final PlannerConfig plannerConfig, final St private DruidExceptionMatcher buildUnplannableExceptionMatcher() { - if (testBuilder().isDecoupledMode()) { - return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); - } else { - return new DruidExceptionMatcher(Persona.ADMIN, Category.INVALID_INPUT, "general"); - } + return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); } /** From abe109262edf128e67f6373b96dc62ede370e3ea Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 11 Jun 2024 14:34:12 -0400 Subject: [PATCH 2/6] * revert change to queryHandler and related tests, based on review comments --- .../druid/sql/calcite/planner/QueryHandler.java | 11 ++++++++++- .../druid/sql/calcite/BaseCalciteQueryTest.java | 6 +++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java index 2d292621b57d..9f15d3822866 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java +++ 
b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -684,7 +684,16 @@ private DruidException buildSQLPlanningError(RelOptPlanner.CannotPlanException e .ofCategory(DruidException.Category.UNSUPPORTED) .build(exception, "Unhandled Query Planning Failure, see broker logs for details"); } else { - throw InvalidSqlInput.exception("Query could not be planned. A possible reason is [%s]", errorMessage); + // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong. + // For this reason, we consider these as targetting a more expert persona, i.e. the admin instead of the actual + // user. + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + exception, + "Query could not be planned. A possible reason is [%s]", + errorMessage + ); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index 9c34c89bd8fd..dfee7d0e3a22 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -664,7 +664,11 @@ public void assertQueryIsUnplannable(final PlannerConfig plannerConfig, final St private DruidExceptionMatcher buildUnplannableExceptionMatcher() { - return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); + if (testBuilder().isDecoupledMode()) { + return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); + } else { + return new DruidExceptionMatcher(Persona.ADMIN, Category.INVALID_INPUT, "general"); + } } /** From 5d730475dca1c90cc2fd4abc15d890a1d812acb1 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 11 Jun 2024 15:58:16 -0400 Subject: [PATCH 3/6] * add test --- .../druid/sql/calcite/CalciteInsertDmlTest.java | 12 ++++++++++++ 1 file changed, 12 
insertions(+) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index bb9c03aa3c88..b40a4c87c3ab 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -1626,6 +1626,18 @@ public void testInsertWithInvalidSelectStatement() .verify(); } + @Test + public void testInsertWithLongIdentifier() + { + // This test fails because an identifier of length 200 is specified, which exceeds the length limit of 128 + // characters. + String longIdentifier = new String(new char[200]).replace('\0', 'a'); + testIngestionQuery() + .sql(StringUtils.format("INSERT INTO t SELECT %s FROM foo PARTITIONED BY ALL", longIdentifier)) + .expectValidationError(invalidSqlContains(StringUtils.format("Length of identifier '%s' must be less than or equal to 128 characters", longIdentifier))) + .verify(); + } + @Test public void testInsertWithUnnamedColumnInSelectStatement() { From 2b4c87fa1c228babd54a92a18aa1b5948d830ed0 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 18 Nov 2024 13:43:05 -0500 Subject: [PATCH 4/6] * add `ingest/notices/queueSize` and `ingest/pause/time` to statsd emitter --- .../src/main/resources/defaultMetricDimensions.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index 91a74707e4f0..a83aeb1bc7e1 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -66,6 +66,9 @@ "ingest/kinesis/partitionLag/time" : { "dimensions" : ["dataSource", "stream", "partition"], "type" : "gauge" }, "ingest/notices/time" : { "dimensions" : ["dataSource", 
"noticeType"], "type" : "timer" }, + "ingest/notices/queueSize" : { "dimensions" : ["dataSource"], "type" : "gauge" }, + + "ingest/pause/time" : { "dimensions" : ["dataSource", "taskId"], "type" : "timer" }, "task/success/count" : { "dimensions" : ["dataSource"], "type" : "count" }, "task/failed/count" : { "dimensions" : ["dataSource"], "type" : "count" }, From cfb02a2813b24243ea0703def7a0dddd948bee00 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 18 Nov 2024 15:01:06 -0500 Subject: [PATCH 5/6] * add taskStatus dimension to `service/heartbeat` metric --- docs/operations/metrics.md | 6 +++--- .../src/main/resources/defaultMetricDimensions.json | 2 +- .../druid/indexing/kafka/KafkaIndexTaskTest.java | 6 ++++++ .../org/apache/druid/indexing/common/task/Task.java | 11 +++++++++++ .../seekablestream/SeekableStreamIndexTask.java | 7 +++++++ .../druid/indexing/common/task/AbstractTaskTest.java | 7 +++++++ .../supervisor/SeekableStreamSupervisorStateTest.java | 2 ++ .../src/main/java/org/apache/druid/cli/CliPeon.java | 3 +++ 8 files changed, 40 insertions(+), 4 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 4df9e7987ccc..2bf49062cfe4 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -391,9 +391,9 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon ### Service Health -|Metric|Description|Dimensions|Normal value| -|------|-----------|----------|------------| -| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.
`workerVersion`, `category`, `status` on the Middle Manager.
`taskId`, `groupId`, `taskType`, `dataSource`, `tags` on the Peon |1| +|Metric|Description| Dimensions |Normal value| +|------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------| +| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.
`workerVersion`, `category`, `status` on the Middle Manager.
`taskId`, `groupId`, `taskType`, `taskStatus`, `dataSource`, `tags` on the Peon |1| ### Historical diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index a83aeb1bc7e1..48bc72505fe5 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -190,7 +190,7 @@ "namespace/cache/numEntries" : { "dimensions" : [], "type" : "gauge" }, "namespace/cache/heapSizeInBytes" : { "dimensions" : [], "type" : "gauge" }, - "service/heartbeat" : { "dimensions" : ["leader"], "type" : "count" }, + "service/heartbeat" : { "dimensions" : ["leader", "workerVersion", "category", "status", "taskId", "groupId", "dataSource", "taskStatus" ], "type" : "count" }, "killTask/availableSlot/count" : { "dimensions" : [], "type" : "count" }, "killTask/maxSlot/count" : { "dimensions" : [], "type" : "count" }, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index db8db1fdb38c..0ea5465920dd 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2232,6 +2232,8 @@ public void testRunWithPauseAndResume() throws Exception ) ); + Assert.assertEquals(Status.NOT_STARTED.toString(), task.getStatus()); + final ListenableFuture future = runTask(task); // Insert some data, but not enough for the task to finish @@ -2242,6 +2244,8 @@ public void testRunWithPauseAndResume() throws Exception } Assert.assertEquals(2, countEvents(task)); Assert.assertEquals(Status.READING, task.getRunner().getStatus()); + 
Assert.assertEquals(Status.READING.toString(), task.getStatus()); + Map currentOffsets = OBJECT_MAPPER.readValue( task.getRunner().pause().getEntity().toString(), @@ -2250,6 +2254,7 @@ public void testRunWithPauseAndResume() throws Exception } ); Assert.assertEquals(Status.PAUSED, task.getRunner().getStatus()); + Assert.assertEquals(Status.PAUSED.toString(), task.getStatus()); // Insert remaining data insertData(Iterables.skip(records, 4)); @@ -2267,6 +2272,7 @@ public void testRunWithPauseAndResume() throws Exception Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets()); + Assert.assertEquals(Status.PUBLISHING.toString(), task.getStatus()); verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3)); // Check published metadata and segments in deep storage diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index cacdc47f520a..cb653f79e7cc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -48,6 +48,8 @@ import org.apache.druid.server.security.ResourceType; import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.util.Map; import java.util.Optional; import java.util.Set; @@ -313,6 +315,15 @@ default TaskIdentifier getMetadata() return new TaskIdentifier(this.getId(), this.getGroupId(), this.getType()); } + /** + * @return The status of the task. Note: this interface method is unstable at this time. 
+ */ + @Nullable + default String getStatus() + { + return null; + } + static TaskInfo toTaskIdentifierInfo(TaskInfo taskInfo) { return new TaskInfo<>( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 41cd084cd960..e1c29a3e384a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -180,6 +180,13 @@ public QueryRunner getQueryRunner(Query query) return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext); } + @Nullable + @Override + public String getStatus() + { + return (getRunner() != null && getRunner().getStatus() != null) ? getRunner().getStatus().toString() : null; + } + public Appenderator newAppenderator( TaskToolbox toolbox, SegmentGenerationMetrics metrics, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java index ba210fee228a..6a94b74d0b5d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java @@ -305,4 +305,11 @@ public void testBatchIOConfigNone() Assert.assertEquals(AbstractTask.IngestionMode.NONE, ingestionMode); } + @Test + public void testGetStatus() + { + AbstractTask task = new NoopTask("myID", null, null, 1, 0, null); + Assert.assertNull(task.getStatus()); + } + } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index af66ce3b8b97..46ad4ef43f21 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1546,6 +1546,8 @@ public Duration getEmissionDuration() "0" ); + Assert.assertNull(id1.getStatus()); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); Collection workItems = new ArrayList<>(); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 15374625d301..10a3c7034503 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -308,6 +308,9 @@ public Supplier> heartbeatDimensions(Task task) builder.put(DruidMetrics.DATASOURCE, task.getDataSource()); builder.put(DruidMetrics.TASK_TYPE, task.getType()); builder.put(DruidMetrics.GROUP_ID, task.getGroupId()); + if (task.getStatus() != null) { + builder.put(DruidMetrics.TASK_STATUS, task.getStatus()); + } Map tags = task.getContextValue(DruidMetrics.TAGS); if (tags != null && !tags.isEmpty()) { builder.put(DruidMetrics.TAGS, tags); From ea8f8438c8771892e291a014f5ce57f3f3f98599 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 18 Nov 2024 16:44:57 -0500 Subject: [PATCH 6/6] Revert "* add taskStatus dimension to `service/heartbeat` metric" This reverts commit cfb02a2813b24243ea0703def7a0dddd948bee00. 
--- docs/operations/metrics.md | 6 +++--- .../src/main/resources/defaultMetricDimensions.json | 2 +- .../druid/indexing/kafka/KafkaIndexTaskTest.java | 6 ------ .../org/apache/druid/indexing/common/task/Task.java | 11 ----------- .../seekablestream/SeekableStreamIndexTask.java | 7 ------- .../druid/indexing/common/task/AbstractTaskTest.java | 7 ------- .../supervisor/SeekableStreamSupervisorStateTest.java | 2 -- .../src/main/java/org/apache/druid/cli/CliPeon.java | 3 --- 8 files changed, 4 insertions(+), 40 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 2bf49062cfe4..4df9e7987ccc 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -391,9 +391,9 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon ### Service Health -|Metric|Description| Dimensions |Normal value| -|------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------| -| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.
`workerVersion`, `category`, `status` on the Middle Manager.
`taskId`, `groupId`, `taskType`, `taskStatus`, `dataSource`, `tags` on the Peon |1| +|Metric|Description|Dimensions|Normal value| +|------|-----------|----------|------------| +| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.
`workerVersion`, `category`, `status` on the Middle Manager.
`taskId`, `groupId`, `taskType`, `dataSource`, `tags` on the Peon |1| ### Historical diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index 48bc72505fe5..a83aeb1bc7e1 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -190,7 +190,7 @@ "namespace/cache/numEntries" : { "dimensions" : [], "type" : "gauge" }, "namespace/cache/heapSizeInBytes" : { "dimensions" : [], "type" : "gauge" }, - "service/heartbeat" : { "dimensions" : ["leader", "workerVersion", "category", "status", "taskId", "groupId", "dataSource", "taskStatus" ], "type" : "count" }, + "service/heartbeat" : { "dimensions" : ["leader"], "type" : "count" }, "killTask/availableSlot/count" : { "dimensions" : [], "type" : "count" }, "killTask/maxSlot/count" : { "dimensions" : [], "type" : "count" }, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 0ea5465920dd..db8db1fdb38c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2232,8 +2232,6 @@ public void testRunWithPauseAndResume() throws Exception ) ); - Assert.assertEquals(Status.NOT_STARTED.toString(), task.getStatus()); - final ListenableFuture future = runTask(task); // Insert some data, but not enough for the task to finish @@ -2244,8 +2242,6 @@ public void testRunWithPauseAndResume() throws Exception } Assert.assertEquals(2, countEvents(task)); Assert.assertEquals(Status.READING, task.getRunner().getStatus()); - 
Assert.assertEquals(Status.READING.toString(), task.getStatus()); - Map currentOffsets = OBJECT_MAPPER.readValue( task.getRunner().pause().getEntity().toString(), @@ -2254,7 +2250,6 @@ public void testRunWithPauseAndResume() throws Exception } ); Assert.assertEquals(Status.PAUSED, task.getRunner().getStatus()); - Assert.assertEquals(Status.PAUSED.toString(), task.getStatus()); // Insert remaining data insertData(Iterables.skip(records, 4)); @@ -2272,7 +2267,6 @@ public void testRunWithPauseAndResume() throws Exception Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets()); - Assert.assertEquals(Status.PUBLISHING.toString(), task.getStatus()); verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3)); // Check published metadata and segments in deep storage diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index cb653f79e7cc..cacdc47f520a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -48,8 +48,6 @@ import org.apache.druid.server.security.ResourceType; import javax.annotation.Nonnull; -import javax.annotation.Nullable; - import java.util.Map; import java.util.Optional; import java.util.Set; @@ -315,15 +313,6 @@ default TaskIdentifier getMetadata() return new TaskIdentifier(this.getId(), this.getGroupId(), this.getType()); } - /** - * @return The status of the task. Note: this interface method is unstable at this time. 
- */ - @Nullable - default String getStatus() - { - return null; - } - static TaskInfo toTaskIdentifierInfo(TaskInfo taskInfo) { return new TaskInfo<>( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index e1c29a3e384a..41cd084cd960 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -180,13 +180,6 @@ public QueryRunner getQueryRunner(Query query) return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext); } - @Nullable - @Override - public String getStatus() - { - return (getRunner() != null && getRunner().getStatus() != null) ? getRunner().getStatus().toString() : null; - } - public Appenderator newAppenderator( TaskToolbox toolbox, SegmentGenerationMetrics metrics, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java index 6a94b74d0b5d..ba210fee228a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java @@ -305,11 +305,4 @@ public void testBatchIOConfigNone() Assert.assertEquals(AbstractTask.IngestionMode.NONE, ingestionMode); } - @Test - public void testGetStatus() - { - AbstractTask task = new NoopTask("myID", null, null, 1, 0, null); - Assert.assertNull(task.getStatus()); - } - } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 46ad4ef43f21..af66ce3b8b97 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1546,8 +1546,6 @@ public Duration getEmissionDuration() "0" ); - Assert.assertNull(id1.getStatus()); - final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); Collection workItems = new ArrayList<>(); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 10a3c7034503..15374625d301 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -308,9 +308,6 @@ public Supplier> heartbeatDimensions(Task task) builder.put(DruidMetrics.DATASOURCE, task.getDataSource()); builder.put(DruidMetrics.TASK_TYPE, task.getType()); builder.put(DruidMetrics.GROUP_ID, task.getGroupId()); - if (task.getStatus() != null) { - builder.put(DruidMetrics.TASK_STATUS, task.getStatus()); - } Map tags = task.getContextValue(DruidMetrics.TAGS); if (tags != null && !tags.isEmpty()) { builder.put(DruidMetrics.TAGS, tags);