From ccdbc746c1ff2690f94f0b6f15ed1a7b041a1a19 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 10 Jun 2024 16:29:40 -0400 Subject: [PATCH 01/10] SQL syntax error should target USER persona --- .../druid/sql/calcite/planner/DruidPlanner.java | 8 +------- .../druid/sql/calcite/planner/QueryHandler.java | 11 +---------- .../druid/sql/calcite/BaseCalciteQueryTest.java | 6 +----- 3 files changed, 3 insertions(+), 22 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java index 4b697a0d5dfa..cf1d22eb39b4 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/DruidPlanner.java @@ -380,13 +380,7 @@ public static DruidException translateException(Exception e) } } - return DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.UNCATEGORIZED) - .build( - inner, - "Unable to parse the SQL, unrecognized error from calcite: [%s]", - inner.getMessage() - ); + return InvalidSqlInput.exception(inner.getMessage()); } catch (RelOptPlanner.CannotPlanException inner) { return DruidException.forPersona(DruidException.Persona.USER) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java index 9f15d3822866..2d292621b57d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -684,16 +684,7 @@ private DruidException buildSQLPlanningError(RelOptPlanner.CannotPlanException e .ofCategory(DruidException.Category.UNSUPPORTED) .build(exception, "Unhandled Query Planning Failure, see broker logs for details"); } else { - // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong. - // For this reason, we consider these as targetting a more expert persona, i.e. the admin instead of the actual - // user. - throw DruidException.forPersona(DruidException.Persona.ADMIN) - .ofCategory(DruidException.Category.INVALID_INPUT) - .build( - exception, - "Query could not be planned. A possible reason is [%s]", - errorMessage - ); + throw InvalidSqlInput.exception("Query could not be planned. A possible reason is [%s]", errorMessage); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index dfee7d0e3a22..9c34c89bd8fd 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -664,11 +664,7 @@ public void assertQueryIsUnplannable(final PlannerConfig plannerConfig, final St private DruidExceptionMatcher buildUnplannableExceptionMatcher() { - if (testBuilder().isDecoupledMode()) { - return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); - } else { - return new DruidExceptionMatcher(Persona.ADMIN, Category.INVALID_INPUT, "general"); - } + return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); } /** From abe109262edf128e67f6373b96dc62ede370e3ea Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 11 Jun 2024 14:34:12 -0400 Subject: [PATCH 02/10] * revert change to queryHandler and related tests, based on review comments --- .../druid/sql/calcite/planner/QueryHandler.java | 11 ++++++++++- .../druid/sql/calcite/BaseCalciteQueryTest.java | 6 +++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java index 2d292621b57d..9f15d3822866 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/QueryHandler.java @@ -684,7 +684,16 @@ private DruidException buildSQLPlanningError(RelOptPlanner.CannotPlanException e .ofCategory(DruidException.Category.UNSUPPORTED) .build(exception, "Unhandled Query Planning Failure, see broker logs for details"); } else { - throw InvalidSqlInput.exception("Query could not be planned. A possible reason is [%s]", errorMessage); + // Planning errors are more like hints: it isn't guaranteed that the planning error is actually what went wrong. + // For this reason, we consider these as targetting a more expert persona, i.e. the admin instead of the actual + // user. + throw DruidException.forPersona(DruidException.Persona.ADMIN) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build( + exception, + "Query could not be planned. A possible reason is [%s]", + errorMessage + ); } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index 9c34c89bd8fd..dfee7d0e3a22 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -664,7 +664,11 @@ public void assertQueryIsUnplannable(final PlannerConfig plannerConfig, final St private DruidExceptionMatcher buildUnplannableExceptionMatcher() { - return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); + if (testBuilder().isDecoupledMode()) { + return new DruidExceptionMatcher(Persona.USER, Category.INVALID_INPUT, "invalidInput"); + } else { + return new DruidExceptionMatcher(Persona.ADMIN, Category.INVALID_INPUT, "general"); + } } /** From 5d730475dca1c90cc2fd4abc15d890a1d812acb1 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Tue, 11 Jun 2024 15:58:16 -0400 Subject: [PATCH 03/10] * add test --- .../druid/sql/calcite/CalciteInsertDmlTest.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java index bb9c03aa3c88..b40a4c87c3ab 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteInsertDmlTest.java @@ -1626,6 +1626,18 @@ public void testInsertWithInvalidSelectStatement() .verify(); } + @Test + public void testInsertWithLongIdentifer() + { + // This test fails because an identifer is specified of length 200, which exceeds the length limit of 128 + // characters. + String longIdentifer = new String(new char[200]).replace('\0', 'a'); + testIngestionQuery() + .sql(StringUtils.format("INSERT INTO t SELECT %s FROM foo PARTITIONED BY ALL", longIdentifer)) // count is a keyword + .expectValidationError(invalidSqlContains(StringUtils.format("Length of identifier '%s' must be less than or equal to 128 characters", longIdentifer))) + .verify(); + } + @Test public void testInsertWithUnnamedColumnInSelectStatement() { From cd3763edb50ea647c57c4f9f786a67a9603fb70b Mon Sep 17 00:00:00 2001 From: zachjsh Date: Mon, 18 Nov 2024 15:01:06 -0500 Subject: [PATCH 04/10] * add taskStatus dimension to `service/heartbeat` metric --- docs/operations/metrics.md | 6 +++--- .../src/main/resources/defaultMetricDimensions.json | 2 +- .../druid/indexing/kafka/KafkaIndexTaskTest.java | 6 ++++++ .../org/apache/druid/indexing/common/task/Task.java | 11 +++++++++++ .../seekablestream/SeekableStreamIndexTask.java | 7 +++++++ .../druid/indexing/common/task/AbstractTaskTest.java | 7 +++++++ .../supervisor/SeekableStreamSupervisorStateTest.java | 2 ++ .../src/main/java/org/apache/druid/cli/CliPeon.java | 3 +++ 8 files changed, 40 insertions(+), 4 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 4df9e7987ccc..2bf49062cfe4 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -391,9 +391,9 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon ### Service Health -|Metric|Description|Dimensions|Normal value| -|------|-----------|----------|------------| -| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.
`workerVersion`, `category`, `status` on the Middle Manager.
`taskId`, `groupId`, `taskType`, `dataSource`, `tags` on the Peon |1| +|Metric|Description| Dimensions |Normal value| +|------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------| +| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.
`workerVersion`, `category`, `status` on the Middle Manager.
`taskId`, `groupId`, `taskType`, `taskStatus`, `dataSource`, `tags` on the Peon |1| ### Historical diff --git a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json index 91a74707e4f0..6f08d7429ea7 100644 --- a/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json +++ b/extensions-contrib/statsd-emitter/src/main/resources/defaultMetricDimensions.json @@ -187,7 +187,7 @@ "namespace/cache/numEntries" : { "dimensions" : [], "type" : "gauge" }, "namespace/cache/heapSizeInBytes" : { "dimensions" : [], "type" : "gauge" }, - "service/heartbeat" : { "dimensions" : ["leader"], "type" : "count" }, + "service/heartbeat" : { "dimensions" : ["leader", "workerVersion", "category", "status", "taskId", "groupId", "dataSource", "taskStatus" ], "type" : "count" }, "killTask/availableSlot/count" : { "dimensions" : [], "type" : "count" }, "killTask/maxSlot/count" : { "dimensions" : [], "type" : "count" }, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index db8db1fdb38c..0ea5465920dd 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2232,6 +2232,8 @@ public void testRunWithPauseAndResume() throws Exception ) ); + Assert.assertEquals(Status.NOT_STARTED.toString(), task.getStatus()); + final ListenableFuture future = runTask(task); // Insert some data, but not enough for the task to finish @@ -2242,6 +2244,8 @@ public void testRunWithPauseAndResume() throws Exception } Assert.assertEquals(2, countEvents(task)); Assert.assertEquals(Status.READING, task.getRunner().getStatus()); + Assert.assertEquals(Status.READING.toString(), task.getStatus()); + Map currentOffsets = OBJECT_MAPPER.readValue( task.getRunner().pause().getEntity().toString(), @@ -2250,6 +2254,7 @@ public void testRunWithPauseAndResume() throws Exception } ); Assert.assertEquals(Status.PAUSED, task.getRunner().getStatus()); + Assert.assertEquals(Status.PAUSED.toString(), task.getStatus()); // Insert remaining data insertData(Iterables.skip(records, 4)); @@ -2267,6 +2272,7 @@ public void testRunWithPauseAndResume() throws Exception Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets()); + Assert.assertEquals(Status.PUBLISHING.toString(), task.getStatus()); verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3)); // Check published metadata and segments in deep storage diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index cacdc47f520a..cb653f79e7cc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -48,6 +48,8 @@ import org.apache.druid.server.security.ResourceType; import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import java.util.Map; import java.util.Optional; import java.util.Set; @@ -313,6 +315,15 @@ default TaskIdentifier getMetadata() return new TaskIdentifier(this.getId(), this.getGroupId(), this.getType()); } + /** + * @return The status of the task. Note: this interface method is unstable at this time. + */ + @Nullable + default String getStatus() + { + return null; + } + static TaskInfo toTaskIdentifierInfo(TaskInfo taskInfo) { return new TaskInfo<>( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 41cd084cd960..e1c29a3e384a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -180,6 +180,13 @@ public QueryRunner getQueryRunner(Query query) return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext); } + @Nullable + @Override + public String getStatus() + { + return (getRunner() != null && getRunner().getStatus() != null) ? getRunner().getStatus().toString() : null; + } + public Appenderator newAppenderator( TaskToolbox toolbox, SegmentGenerationMetrics metrics, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java index ba210fee228a..6a94b74d0b5d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java @@ -305,4 +305,11 @@ public void testBatchIOConfigNone() Assert.assertEquals(AbstractTask.IngestionMode.NONE, ingestionMode); } + @Test + public void testGetStatus() + { + AbstractTask task = new NoopTask("myID", null, null, 1, 0, null); + Assert.assertNull(task.getStatus()); + } + } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index af66ce3b8b97..46ad4ef43f21 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1546,6 +1546,8 @@ public Duration getEmissionDuration() "0" ); + Assert.assertNull(id1.getStatus()); + final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); Collection workItems = new ArrayList<>(); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 15374625d301..10a3c7034503 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -308,6 +308,9 @@ public Supplier> heartbeatDimensions(Task task) builder.put(DruidMetrics.DATASOURCE, task.getDataSource()); builder.put(DruidMetrics.TASK_TYPE, task.getType()); builder.put(DruidMetrics.GROUP_ID, task.getGroupId()); + if (task.getStatus() != null) { + builder.put(DruidMetrics.TASK_STATUS, task.getStatus()); + } Map tags = task.getContextValue(DruidMetrics.TAGS); if (tags != null && !tags.isEmpty()) { builder.put(DruidMetrics.TAGS, tags); From 19d6adfe5cefa5801f5344dd193296c842d5d993 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Wed, 4 Dec 2024 15:25:31 -0500 Subject: [PATCH 05/10] * address review comments --- docs/operations/metrics.md | 6 +- .../druid/indexing/common/task/Task.java | 11 -- .../SeekableStreamIndexTask.java | 1 - .../common/task/AbstractTaskTest.java | 7 - .../java/org/apache/druid/cli/CliPeon.java | 40 +++-- .../org/apache/druid/cli/CliPeonTest.java | 169 ++++++++++++++++++ 6 files changed, 196 insertions(+), 38 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 2bf49062cfe4..c997d58f2e6a 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -391,9 +391,9 @@ These metrics are emitted by the Druid Coordinator in every run of the correspon ### Service Health -|Metric|Description| Dimensions |Normal value| -|------|-----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------| -| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.
`workerVersion`, `category`, `status` on the Middle Manager.
`taskId`, `groupId`, `taskType`, `taskStatus`, `dataSource`, `tags` on the Peon |1| +|Metric|Description|Dimensions|Normal value| +|------|-----------|----------|------------| +| `service/heartbeat` | Metric indicating the service is up. This metric is emitted only when `ServiceStatusMonitor` is enabled. | `leader` on the Overlord and Coordinator.
`workerVersion`, `category`, `status` on the Middle Manager.
`taskId`, `groupId`, `taskType`, `status`, `dataSource`, `tags` on the Peon |1| ### Historical diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index cb653f79e7cc..cacdc47f520a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -48,8 +48,6 @@ import org.apache.druid.server.security.ResourceType; import javax.annotation.Nonnull; -import javax.annotation.Nullable; - import java.util.Map; import java.util.Optional; import java.util.Set; @@ -315,15 +313,6 @@ default TaskIdentifier getMetadata() return new TaskIdentifier(this.getId(), this.getGroupId(), this.getType()); } - /** - * @return The status of the task. Note: this interface method is unstable at this time. - */ - @Nullable - default String getStatus() - { - return null; - } - static TaskInfo toTaskIdentifierInfo(TaskInfo taskInfo) { return new TaskInfo<>( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index e1c29a3e384a..7960fba2061f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -181,7 +181,6 @@ public QueryRunner getQueryRunner(Query query) } @Nullable - @Override public String getStatus() { return (getRunner() != null && getRunner().getStatus() != null) ? getRunner().getStatus().toString() : null; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java index 6a94b74d0b5d..ba210fee228a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java @@ -305,11 +305,4 @@ public void testBatchIOConfigNone() Assert.assertEquals(AbstractTask.IngestionMode.NONE, ingestionMode); } - @Test - public void testGetStatus() - { - AbstractTask task = new NoopTask("myID", null, null, 1, 0, null); - Assert.assertNull(task.getStatus()); - } - } diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 10a3c7034503..50e871017e01 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -91,6 +91,7 @@ import org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.worker.executor.ExecutorLifecycle; import org.apache.druid.indexing.worker.executor.ExecutorLifecycleConfig; import org.apache.druid.indexing.worker.shuffle.DeepStorageIntermediaryDataManager; @@ -303,22 +304,7 @@ public void configure(Binder binder) @Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING) public Supplier> heartbeatDimensions(Task task) { - ImmutableMap.Builder builder = ImmutableMap.builder(); - builder.put(DruidMetrics.TASK_ID, task.getId()); - builder.put(DruidMetrics.DATASOURCE, task.getDataSource()); - builder.put(DruidMetrics.TASK_TYPE, task.getType()); - builder.put(DruidMetrics.GROUP_ID, task.getGroupId()); - if (task.getStatus() != null) { - builder.put(DruidMetrics.TASK_STATUS, task.getStatus()); - } - Map tags = task.getContextValue(DruidMetrics.TAGS); - if (tags != null && !tags.isEmpty()) { - builder.put(DruidMetrics.TAGS, tags); - } - - return Suppliers.ofInstance( - builder.build() - ); + return Suppliers.ofInstance(CliPeon.heartbeatDimensions(task)); } @Provides @@ -568,6 +554,28 @@ static void configureIntermediaryData(Binder binder) shuffleClientBiddy.addBinding("deepstore").to(DeepStorageShuffleClient.class).in(LazySingleton.class); } + static Map heartbeatDimensions(Task task) + { + ImmutableMap.Builder builder = ImmutableMap.builder(); + builder.put(DruidMetrics.TASK_ID, task.getId()); + builder.put(DruidMetrics.DATASOURCE, task.getDataSource()); + builder.put(DruidMetrics.TASK_TYPE, task.getType()); + builder.put(DruidMetrics.GROUP_ID, task.getGroupId()); + Map tags = task.getContextValue(DruidMetrics.TAGS); + if (tags != null && !tags.isEmpty()) { + builder.put(DruidMetrics.TAGS, tags); + } + + if (task instanceof SeekableStreamIndexTask) { + SeekableStreamIndexTask streamingTask = (SeekableStreamIndexTask) task; + if (streamingTask.getStatus() != null) { + builder.put(DruidMetrics.STATUS, streamingTask.getStatus()); + } + } + + return builder.build(); + } + public class BroadcastSegmentLoadingModule implements Module { @Override diff --git a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java index ad611a12ac7d..b3dba662cdb6 100644 --- a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java @@ -19,20 +19,45 @@ package org.apache.druid.cli; +import com.google.common.collect.ImmutableMap; import com.google.inject.Injector; import org.apache.commons.io.FileUtils; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.AllGranularity; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; + +import static org.easymock.EasyMock.mock; public class CliPeonTest { @@ -73,6 +98,50 @@ public void testCliPeonK8sANdWorkerIsK8sMode() throws IOException Assert.assertNotNull(runnable.makeInjector()); } + @Test + public void testCliPeonHeartbeatDimensions() throws IOException + { + // non-streaming task + String taskId = "testTaskId"; + String groupId = "testGroupId"; + String datasource = "testDatasource"; + Map tags = ImmutableMap.of("tag1", "value1"); + Assert.assertEquals( + ImmutableMap.of( + DruidMetrics.TASK_ID, taskId, + DruidMetrics.GROUP_ID, groupId, + DruidMetrics.DATASOURCE, datasource, + DruidMetrics.TASK_TYPE, NoopTask.TYPE + ), + CliPeon.heartbeatDimensions(new TestTask(taskId, groupId, datasource, 0, 0, ImmutableMap.of())) + ); + + // streaming task with empty ags + Assert.assertEquals( + ImmutableMap.of( + DruidMetrics.TASK_ID, taskId, + DruidMetrics.GROUP_ID, groupId, + DruidMetrics.DATASOURCE, datasource, + DruidMetrics.TASK_TYPE, TestStreamingTask.TYPE, + DruidMetrics.STATUS, TestStreamingTask.STATUS + ), + CliPeon.heartbeatDimensions(new TestStreamingTask(taskId, datasource, ImmutableMap.of(DruidMetrics.TAGS, ImmutableMap.of()), groupId)) + ); + + // streaming task with non-empty ags + Assert.assertEquals( + ImmutableMap.of( + DruidMetrics.TASK_ID, taskId, + DruidMetrics.GROUP_ID, groupId, + DruidMetrics.DATASOURCE, datasource, + DruidMetrics.TASK_TYPE, TestStreamingTask.TYPE, + DruidMetrics.STATUS, TestStreamingTask.STATUS, + DruidMetrics.TAGS, tags + ), + CliPeon.heartbeatDimensions(new TestStreamingTask(taskId, datasource, ImmutableMap.of(DruidMetrics.TAGS, tags), groupId)) + ); + } + private static class FakeCliPeon extends CliPeon { List taskAndStatusFile = new ArrayList<>(); @@ -96,4 +165,104 @@ private static class FakeCliPeon extends CliPeon } } + + private static class TestTask extends NoopTask + { + + public TestTask( + String id, + String groupId, + String dataSource, + long runTimeMillis, + long isReadyTime, + Map context + ) + { + super(id, groupId, dataSource, runTimeMillis, isReadyTime, context); + } + } + + private static class TestStreamingTask extends SeekableStreamIndexTask + { + static final String TYPE = "testStreaming"; + static final String STATUS = SeekableStreamIndexTaskRunner.Status.PAUSED.toString(); + + public TestStreamingTask( + String id, + String datasource, + @Nullable Map context, + @Nullable String groupId + ) + { + this( + id, + null, + DataSchema.builder() + .withDataSource(datasource) + .withTimestamp(new TimestampSpec(null, null, null)) + .withDimensions(new DimensionsSpec(Collections.emptyList())) + .withGranularity(new ArbitraryGranularitySpec(new AllGranularity(), Collections.emptyList())) + .build(), + mock(SeekableStreamIndexTaskTuningConfig.class), + new TestSeekableStreamIndexTaskIOConfig(), + context, + groupId + ); + } + + private TestStreamingTask( + String id, + @Nullable TaskResource taskResource, + DataSchema dataSchema, + SeekableStreamIndexTaskTuningConfig tuningConfig, + SeekableStreamIndexTaskIOConfig ioConfig, + @Nullable Map context, + @Nullable String groupId + ) + { + + super(id, taskResource, dataSchema, tuningConfig, ioConfig, context, groupId); + } + + @Override + protected SeekableStreamIndexTaskRunner createTaskRunner() + { + return null; + } + + @Override + protected RecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) + { + return null; + } + + @Override + public String getStatus() + { + return STATUS; + } + + @Override + public String getType() + { + return TYPE; + } + } + + private static class TestSeekableStreamIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig + { + public TestSeekableStreamIndexTaskIOConfig() + { + super( + null, + "someSequence", + new SeekableStreamStartSequenceNumbers<>("abc", "def", Collections.emptyMap(), Collections.emptyMap(), null), + new SeekableStreamEndSequenceNumbers<>("abc", "def", Collections.emptyMap(), Collections.emptyMap()), + false, + DateTimes.nowUtc().minusDays(2), + DateTimes.nowUtc(), + new CsvInputFormat(null, null, true, null, 0, null) + ); + } + } } From 8cb78efa24ebb0e896e7d85f145b7d7fdb30f22c Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 5 Dec 2024 00:30:42 -0500 Subject: [PATCH 06/10] * fix compilation error from merge --- services/src/test/java/org/apache/druid/cli/CliPeonTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java index b3dba662cdb6..cd28316a9613 100644 --- a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java @@ -42,6 +42,7 @@ import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.ArbitraryGranularitySpec; +import org.joda.time.Duration; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -261,7 +262,8 @@ public TestSeekableStreamIndexTaskIOConfig() false, DateTimes.nowUtc().minusDays(2), DateTimes.nowUtc(), - new CsvInputFormat(null, null, true, null, 0, null) + new CsvInputFormat(null, null, true, null, 0, null), + Duration.standardHours(2).getStandardMinutes() ); } } From 75d6ec2e7608993d976b82c4055dd32b8424206e Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 5 Dec 2024 15:05:12 -0500 Subject: [PATCH 07/10] * improve test coverage --- .../SeekableStreamIndexTask.java | 3 +- .../SeekableStreamSupervisorStateTest.java | 45 ++++++++++++++++--- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index a66a06e41e0e..6be8c706d0ae 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -182,7 +182,8 @@ public QueryRunner getQueryRunner(Query query) @Nullable public String getStatus() { - return (getRunner() != null && getRunner().getStatus() != null) ? getRunner().getStatus().toString() : null; + SeekableStreamIndexTaskRunner.Status status = (getRunner() != null) ? getRunner().getStatus() : null; + return (status != null) ? status.toString() : null; } public Appenderator newAppenderator( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 36afe2bf13f7..904394ef96aa 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -143,6 +143,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport private SeekableStreamSupervisorSpec spec; private SeekableStreamIndexTaskClient indexTaskClient; private RecordSupplier recordSupplier; + private SeekableStreamIndexTaskRunner streamingTaskRunner; private RowIngestionMetersFactory rowIngestionMetersFactory; private SupervisorStateManagerConfig supervisorConfig; @@ -161,6 +162,7 @@ public void setupTest() spec = createMock(SeekableStreamSupervisorSpec.class); indexTaskClient = createMock(SeekableStreamIndexTaskClient.class); recordSupplier = createMock(RecordSupplier.class); + streamingTaskRunner = createMock(SeekableStreamIndexTaskRunner.class); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -1459,6 +1461,8 @@ public Duration getEmissionDuration() supervisor.start(); supervisor.runInternal(); + Assert.assertNull(id1.getStatus()); + supervisor.checkpoint( 0, new TestSeekableStreamDataSourceMetadata( @@ -1475,7 +1479,7 @@ public Duration getEmissionDuration() Assert.assertTrue(supervisor.getNoticesQueueSize() == 0); } - @Test(timeout = 10_000L) + @Test(timeout = 60_000L) public void testSupervisorStopTaskGroupEarly() throws JsonProcessingException, InterruptedException { DateTime startTime = DateTimes.nowUtc(); @@ -1518,6 +1522,8 @@ public Duration getEmissionDuration() EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes(); EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); + EasyMock.expect(streamingTaskRunner.getStatus()).andReturn(null); + EasyMock.expect(streamingTaskRunner.getStatus()).andReturn(SeekableStreamIndexTaskRunner.Status.NOT_STARTED); SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig(); @@ -1543,11 +1549,10 @@ public Duration getEmissionDuration() ioConfig ), context, - "0" + "0", + streamingTaskRunner ); - Assert.assertNull(id1.getStatus()); - final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1); Collection workItems = new ArrayList<>(); @@ -1597,6 +1602,9 @@ public Duration getEmissionDuration() supervisor.runInternal(); supervisor.handoffTaskGroupsEarly(ImmutableList.of(0)); + Assert.assertNull(id1.getStatus()); + Assert.assertEquals("NOT_STARTED", id1.getStatus()); + while (supervisor.getNoticesQueueSize() > 0) { Thread.sleep(100); } @@ -2706,6 +2714,8 @@ public String toString() private class TestSeekableStreamIndexTask extends SeekableStreamIndexTask { + private final SeekableStreamIndexTaskRunner streamingTaskRunner; + public TestSeekableStreamIndexTask( String id, @Nullable TaskResource taskResource, @@ -2715,6 +2725,29 @@ public TestSeekableStreamIndexTask( @Nullable Map context, @Nullable String groupId ) + { + this( + id, + taskResource, + dataSchema, + tuningConfig, + ioConfig, + context, + groupId, + null + ); + } + + public TestSeekableStreamIndexTask( + String id, + @Nullable TaskResource taskResource, + DataSchema dataSchema, + SeekableStreamIndexTaskTuningConfig tuningConfig, + SeekableStreamIndexTaskIOConfig ioConfig, + @Nullable Map context, + @Nullable String groupId, + @Nullable SeekableStreamIndexTaskRunner streamingTaskRunner + ) { super( id, @@ -2725,12 +2758,14 @@ public TestSeekableStreamIndexTask( context, groupId ); + this.streamingTaskRunner = streamingTaskRunner; } + @Nullable @Override protected SeekableStreamIndexTaskRunner createTaskRunner() { - return null; + return streamingTaskRunner; } @Override From 41d8bbafeb62f2f7d5b4630ae8288fb9ab9fa821 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 5 Dec 2024 15:23:11 -0500 Subject: [PATCH 08/10] Address review comments --- .../apache/druid/indexing/kafka/KafkaIndexTaskTest.java | 8 ++++---- .../indexing/seekablestream/SeekableStreamIndexTask.java | 5 ++++- .../supervisor/SeekableStreamSupervisorStateTest.java | 6 +++--- services/src/main/java/org/apache/druid/cli/CliPeon.java | 6 +++--- .../src/test/java/org/apache/druid/cli/CliPeonTest.java | 2 +- 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 6849f0a41f2c..0621f6f9ba15 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2270,7 +2270,7 @@ public void testRunWithPauseAndResume() throws Exception ) ); - Assert.assertEquals(Status.NOT_STARTED.toString(), task.getStatus()); + Assert.assertEquals(Status.NOT_STARTED.toString(), task.getCurrentRunnerStatus()); final ListenableFuture future = runTask(task); @@ -2282,7 +2282,7 @@ public void testRunWithPauseAndResume() throws Exception } Assert.assertEquals(2, countEvents(task)); Assert.assertEquals(Status.READING, task.getRunner().getStatus()); - Assert.assertEquals(Status.READING.toString(), task.getStatus()); + Assert.assertEquals(Status.READING.toString(), task.getCurrentRunnerStatus()); Map currentOffsets = OBJECT_MAPPER.readValue( @@ -2292,7 +2292,7 @@ public void testRunWithPauseAndResume() throws Exception } ); Assert.assertEquals(Status.PAUSED, task.getRunner().getStatus()); - Assert.assertEquals(Status.PAUSED.toString(), task.getStatus()); + Assert.assertEquals(Status.PAUSED.toString(), task.getCurrentRunnerStatus()); // Insert remaining data insertData(Iterables.skip(records, 4)); @@ -2310,7 +2310,7 @@ public void testRunWithPauseAndResume() throws Exception Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets()); - Assert.assertEquals(Status.PUBLISHING.toString(), task.getStatus()); + Assert.assertEquals(Status.PUBLISHING.toString(), task.getCurrentRunnerStatus()); verifyTaskMetrics(task, RowMeters.with().bytes(getTotalSizeOfRecords(2, 5)).totalProcessed(3)); // Check published metadata and segments in deep storage diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 6be8c706d0ae..7b10c0afdccf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -179,8 +179,11 @@ public QueryRunner getQueryRunner(Query query) return (queryPlus, responseContext) -> queryPlus.run(getRunner().getAppenderator(), responseContext); } + /** + * @return the current status of this task. + */ @Nullable - public String getStatus() + public String getCurrentRunnerStatus() { SeekableStreamIndexTaskRunner.Status status = (getRunner() != null) ? getRunner().getStatus() : null; return (status != null) ? status.toString() : null; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 904394ef96aa..556f3fd2d48d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1461,7 +1461,7 @@ public Duration getEmissionDuration() supervisor.start(); supervisor.runInternal(); - Assert.assertNull(id1.getStatus()); + Assert.assertNull(id1.getCurrentRunnerStatus()); supervisor.checkpoint( 0, @@ -1602,8 +1602,8 @@ public Duration getEmissionDuration() supervisor.runInternal(); supervisor.handoffTaskGroupsEarly(ImmutableList.of(0)); - Assert.assertNull(id1.getStatus()); - Assert.assertEquals("NOT_STARTED", id1.getStatus()); + Assert.assertNull(id1.getCurrentRunnerStatus()); + Assert.assertEquals("NOT_STARTED", id1.getCurrentRunnerStatus()); while (supervisor.getNoticesQueueSize() > 0) { Thread.sleep(100); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 50e871017e01..544a7031b0da 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -304,7 +304,7 @@ public void configure(Binder binder) @Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING) public Supplier> heartbeatDimensions(Task task) { - return Suppliers.ofInstance(CliPeon.heartbeatDimensions(task)); + return () -> CliPeon.heartbeatDimensions(task); } @Provides @@ -568,8 +568,8 @@ static Map heartbeatDimensions(Task task) if (task instanceof SeekableStreamIndexTask) { SeekableStreamIndexTask streamingTask = (SeekableStreamIndexTask) task; - if (streamingTask.getStatus() != null) { - builder.put(DruidMetrics.STATUS, streamingTask.getStatus()); + if (streamingTask.getCurrentRunnerStatus() != null) { + builder.put(DruidMetrics.STATUS, streamingTask.getCurrentRunnerStatus()); } } diff --git a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java index cd28316a9613..c3ed57cac61d 100644 --- a/services/src/test/java/org/apache/druid/cli/CliPeonTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliPeonTest.java @@ -238,7 +238,7 @@ protected RecordSupplier newTaskRecordSupplier(final } @Override - public String getStatus() + public String getCurrentRunnerStatus() { return STATUS; } From 95771694323f0808df629f82bb15acb46bee279a Mon Sep 17 00:00:00 2001 From: zachjsh Date: Thu, 5 Dec 2024 15:37:46 -0500 Subject: [PATCH 09/10] * remove unuused import --- services/src/main/java/org/apache/druid/cli/CliPeon.java | 1 - 1 file changed, 1 deletion(-) diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 544a7031b0da..328f28fd4ada 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -25,7 +25,6 @@ import com.github.rvesse.airline.annotations.Option; import com.github.rvesse.airline.annotations.restrictions.Required; import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; From 01296cf575a1ab668d8a28902bee641dc4e9bc64 Mon Sep 17 00:00:00 2001 From: zachjsh Date: Fri, 6 Dec 2024 11:58:42 -0500 Subject: [PATCH 10/10] * address remaining comments --- .../supervisor/SeekableStreamSupervisorStateTest.java | 2 +- services/src/main/java/org/apache/druid/cli/CliPeon.java | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 556f3fd2d48d..f182ced69fbb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1479,7 +1479,7 @@ public Duration getEmissionDuration() Assert.assertTrue(supervisor.getNoticesQueueSize() == 0); } - @Test(timeout = 60_000L) + @Test(timeout = 10_000L) public void testSupervisorStopTaskGroupEarly() throws JsonProcessingException, InterruptedException { DateTime startTime = DateTimes.nowUtc(); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 328f28fd4ada..9d4bffecb5a8 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -567,8 +567,9 @@ static Map heartbeatDimensions(Task task) if (task instanceof SeekableStreamIndexTask) { SeekableStreamIndexTask streamingTask = (SeekableStreamIndexTask) task; - if (streamingTask.getCurrentRunnerStatus() != null) { - builder.put(DruidMetrics.STATUS, streamingTask.getCurrentRunnerStatus()); + String status = streamingTask.getCurrentRunnerStatus(); + if (status != null) { + builder.put(DruidMetrics.STATUS, status); } }