From 73d1aa707e86c2bff26832f21e51a710b277cd9a Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 26 Sep 2025 22:20:48 +0530 Subject: [PATCH 01/10] Add embedded test for K8s task logs --- .../embedded/indexing/IngestionSmokeTest.java | 45 +++++++++++++++++++ .../k8s/overlord/KubernetesWorkItem.java | 3 +- .../druid/indexing/common/task/NoopTask.java | 16 ++++++- 3 files changed, 60 insertions(+), 4 deletions(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java index c33504d368ad..60dba265f6e1 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java @@ -19,7 +19,9 @@ package org.apache.druid.testing.embedded.indexing; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; +import org.apache.commons.io.IOUtils; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.TimestampSpec; @@ -27,6 +29,7 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.task.CompactionTask; import org.apache.druid.indexing.common.task.IndexTask; +import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.TaskBuilder; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; @@ -41,6 +44,7 @@ import org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.http.SqlTaskStatus; +import org.apache.druid.tasklogs.TaskLogStreamer; import org.apache.druid.testing.embedded.EmbeddedBroker; import org.apache.druid.testing.embedded.EmbeddedCoordinator; import org.apache.druid.testing.embedded.EmbeddedDruidCluster; @@ -62,6 +66,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -318,6 +324,45 @@ public void test_runKafkaSupervisor() Assertions.assertTrue(supervisorStatus.isSuspended()); } + @Test + public void test_streamLogs_ofCancelledTask() throws Exception + { + final String taskId = IdUtils.getRandomId(); + final long runDurationMillis = 100_000L; + cluster.callApi().onLeaderOverlord( + o -> o.runTask(taskId, new NoopTask(taskId, null, null, runDurationMillis, 0L, null)) + ); + + eventCollector.latchableEmitter().waitForEvent( + event -> event.hasMetricName(NoopTask.EVENT_STARTED) + .hasDimension(DruidMetrics.TASK_ID, taskId) + ); + + cluster.callApi().onLeaderOverlord(o -> o.cancelTask(taskId)); + + eventCollector.latchableEmitter().waitForEvent( + event -> event.hasMetricName("task/run/time") + .hasDimension(DruidMetrics.TASK_ID, taskId) + .hasDimension(DruidMetrics.TASK_STATUS, "FAILED") + ); + + final Optional streamOptional = + overlord.bindings() + .getInstance(TaskLogStreamer.class) + .streamTaskLog(taskId, 0); + + Assertions.assertTrue(streamOptional.isPresent()); + + final String logs = IOUtils.toString(streamOptional.get(), StandardCharsets.UTF_8); + + final String expectedLogLine = StringUtils.format( + "Running task[%s] for [%d] millis", + taskId, runDurationMillis + ); + Assertions.assertTrue(logs.contains(expectedLogLine)); + Assertions.assertTrue(logs.contains("Task has been cancelled by user")); + } + private KafkaSupervisorSpec createKafkaSupervisor(String topic) { return MoreResources.Supervisor.KAFKA_JSON diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 6e0794a1e45f..544e32f01956 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -52,8 +52,7 @@ protected void shutdown() { if (isShutdown.compareAndSet(false, true)) { synchronized (this) { - this.kubernetesPeonLifecycle.startWatchingLogs(); - this.kubernetesPeonLifecycle.shutdown(); + kubernetesPeonLifecycle.shutdown(); } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java index c545f08c63b1..aa813cb6dd7b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java @@ -29,6 +29,7 @@ import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.security.ResourceAction; import javax.annotation.Nonnull; @@ -36,15 +37,20 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** */ public class NoopTask extends AbstractTask implements PendingSegmentAllocatingTask { + private static final Logger log = new Logger(NoopTask.class); + public static final String TYPE = "noop"; public static final String EVENT_STARTED = "task/noop/started"; private static final int DEFAULT_RUN_TIME = 2500; + private final CountDownLatch isShutdown = new CountDownLatch(1); private final long runTime; @JsonCreator @@ -97,14 +103,20 @@ public boolean isReady(TaskActionClient taskActionClient) @Override public void stopGracefully(TaskConfig taskConfig) { + isShutdown.countDown(); } @Override public TaskStatus runTask(TaskToolbox toolbox) throws Exception { + log.info("Running task[%s] for [%d] millis", getId(), runTime); emitMetric(toolbox.getEmitter(), EVENT_STARTED, 1); - Thread.sleep(runTime); - return TaskStatus.success(getId()); + if (isShutdown.await(runTime, TimeUnit.MILLISECONDS)) { + log.info("Task has been cancelled by user."); + return TaskStatus.failure(getId(), "Cancelled"); + } else { + return TaskStatus.success(getId()); + } } @Override From fb2f93da0a992bbc0125fcf4b0b9b4e6e3d53d98 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 26 Sep 2025 22:24:03 +0530 Subject: [PATCH 02/10] Revert extra changes --- .../java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 544e32f01956..6e0794a1e45f 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -52,7 +52,8 @@ protected void shutdown() { if (isShutdown.compareAndSet(false, true)) { synchronized (this) { - kubernetesPeonLifecycle.shutdown(); + this.kubernetesPeonLifecycle.startWatchingLogs(); + this.kubernetesPeonLifecycle.shutdown(); } } } From 3b69f80e4fae9c6b52a88942c23c73f8d836ca6d Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 27 Sep 2025 08:15:25 +0530 Subject: [PATCH 03/10] Fix deps --- embedded-tests/pom.xml | 5 +++++ .../org/apache/druid/k8s/overlord/KubernetesWorkItem.java | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml index 3407b6831a83..956c66bb5ff5 100644 --- a/embedded-tests/pom.xml +++ b/embedded-tests/pom.xml @@ -317,6 +317,11 @@ fastutil-core test + + commons-io + commons-io + test + org.apache.kafka kafka-clients diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 6e0794a1e45f..11378e229099 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -52,7 +52,6 @@ protected void shutdown() { if (isShutdown.compareAndSet(false, true)) { synchronized (this) { - this.kubernetesPeonLifecycle.startWatchingLogs(); this.kubernetesPeonLifecycle.shutdown(); } } From eb2955e1f0168bdd13c1297ed1cb741ee78a79ae Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 27 Sep 2025 09:35:35 +0530 Subject: [PATCH 04/10] Fix tests --- .../testing/embedded/indexing/IngestionSmokeTest.java | 1 - .../apache/druid/k8s/overlord/KubernetesWorkItemTest.java | 2 -- .../org/apache/druid/indexing/common/task/NoopTask.java | 3 +-- .../indexing/overlord/SingleTaskBackgroundRunnerTest.java | 7 +++---- 4 files changed, 4 insertions(+), 9 deletions(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java index 60dba265f6e1..d1dda6b05c03 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java @@ -360,7 +360,6 @@ public void test_streamLogs_ofCancelledTask() throws Exception taskId, runDurationMillis ); Assertions.assertTrue(logs.contains(expectedLogLine)); - Assertions.assertTrue(logs.contains("Task has been cancelled by user")); } private KafkaSupervisorSpec createKafkaSupervisor(String topic) diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index 23c4b444811a..1735e9c960c1 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -58,8 +58,6 @@ public void test_shutdown_withKubernetesPeonLifecycle() { kubernetesPeonLifecycle.shutdown(); EasyMock.expectLastCall(); - kubernetesPeonLifecycle.startWatchingLogs(); - EasyMock.expectLastCall(); replayAll(); workItem = new KubernetesWorkItem( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java index aa813cb6dd7b..e838da2da7d1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java @@ -112,8 +112,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception log.info("Running task[%s] for [%d] millis", getId(), runTime); emitMetric(toolbox.getEmitter(), EVENT_STARTED, 1); if (isShutdown.await(runTime, TimeUnit.MILLISECONDS)) { - log.info("Task has been cancelled by user."); - return TaskStatus.failure(getId(), "Cancelled"); + return TaskStatus.failure(getId(), "Canceled"); } else { return TaskStatus.success(getId()); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 70122b6ab9e6..58b73e3d6eb1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -339,10 +339,9 @@ public boolean waitForCleanupToFinish() runner.stop(); Assert.assertEquals(TaskState.FAILED, statusHolder.get().getStatusCode()); - Assert.assertEquals( - "Canceled as task execution process stopped", - statusHolder.get().getErrorMsg() - ); + + // Do not verify the failure error message as there is a race condition + // where the error message may either originate from NoopTask or the runner } private static class RestorableTask extends AbstractTask From e5b4509753a99c9fe2c3547bc74e988203e63bec Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 27 Sep 2025 15:40:44 +0530 Subject: [PATCH 05/10] Re-run tests --- .../druid/testing/embedded/indexing/IngestionSmokeTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java index d1dda6b05c03..13245227d2aa 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java @@ -359,7 +359,8 @@ public void test_streamLogs_ofCancelledTask() throws Exception "Running task[%s] for [%d] millis", taskId, runDurationMillis ); - Assertions.assertTrue(logs.contains(expectedLogLine)); + Assertions.assertFalse(logs.isEmpty()); + Assertions.assertTrue(logs.contains(expectedLogLine), "Actual logs are: " + logs); } private KafkaSupervisorSpec createKafkaSupervisor(String topic) From 92dc9adf82e1d19fe2a529706823de619cf2339b Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 27 Sep 2025 17:28:26 +0530 Subject: [PATCH 06/10] Watch logs upon shutdown --- .../java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java | 1 + 1 file changed, 1 insertion(+) diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 11378e229099..6e0794a1e45f 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -52,6 +52,7 @@ protected void shutdown() { if (isShutdown.compareAndSet(false, true)) { synchronized (this) { + this.kubernetesPeonLifecycle.startWatchingLogs(); this.kubernetesPeonLifecycle.shutdown(); } } From 9a43ea475dc1598f223654c38c7ae0dc594c431b Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 27 Sep 2025 17:51:05 +0530 Subject: [PATCH 07/10] Handle race condition --- .../apache/druid/indexing/overlord/hrtr/WorkerHolder.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index 4444916e69f7..44765cf10413 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -69,6 +69,7 @@ public class WorkerHolder private Worker disabledWorker; protected final AtomicBoolean disabled; + private final AtomicBoolean syncerInitialized = new AtomicBoolean(false); // Known list of tasks running/completed on this worker. protected final AtomicReference> tasksSnapshotRef; @@ -299,9 +300,12 @@ public void waitForInitialization() throws InterruptedException } } + /** + * Whether this worker has been synced successfully atleast once. + */ public boolean isInitialized() { - return syncer.isInitialized(); + return syncerInitialized.get(); } public boolean isEnabled() @@ -425,6 +429,7 @@ public void deltaSync(List changes) private void notifyListener(List announcements, boolean isWorkerDisabled) { + syncerInitialized.set(true); for (TaskAnnouncement announcement : announcements) { try { listener.taskAddedOrUpdated(announcement, WorkerHolder.this); From e9b7c401ea8baf6a0ccf9e0ecb88f091cc46eaf6 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 27 Sep 2025 17:51:50 +0530 Subject: [PATCH 08/10] Revert "Handle race condition" This reverts commit 9a43ea475dc1598f223654c38c7ae0dc594c431b. --- .../apache/druid/indexing/overlord/hrtr/WorkerHolder.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index 44765cf10413..4444916e69f7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -69,7 +69,6 @@ public class WorkerHolder private Worker disabledWorker; protected final AtomicBoolean disabled; - private final AtomicBoolean syncerInitialized = new AtomicBoolean(false); // Known list of tasks running/completed on this worker. protected final AtomicReference> tasksSnapshotRef; @@ -300,12 +299,9 @@ public void waitForInitialization() throws InterruptedException } } - /** - * Whether this worker has been synced successfully atleast once. - */ public boolean isInitialized() { - return syncerInitialized.get(); + return syncer.isInitialized(); } public boolean isEnabled() @@ -429,7 +425,6 @@ public void deltaSync(List changes) private void notifyListener(List announcements, boolean isWorkerDisabled) { - syncerInitialized.set(true); for (TaskAnnouncement announcement : announcements) { try { listener.taskAddedOrUpdated(announcement, WorkerHolder.this); From 2b4612bfe714fd4f47339f2b17496c76551ed5b8 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 27 Sep 2025 18:41:32 +0530 Subject: [PATCH 09/10] Fix up unit test --- .../org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index 1735e9c960c1..23c4b444811a 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -58,6 +58,8 @@ public void test_shutdown_withKubernetesPeonLifecycle() { kubernetesPeonLifecycle.shutdown(); EasyMock.expectLastCall(); + kubernetesPeonLifecycle.startWatchingLogs(); + EasyMock.expectLastCall(); replayAll(); workItem = new KubernetesWorkItem( From a5935d5e65225764d4643ee1a5367cfb8f45888f Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sun, 28 Sep 2025 09:21:14 +0530 Subject: [PATCH 10/10] Rename test class --- ...torDockerTest.java => KubernetesTaskRunnerDockerTest.java} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/{KubernetesClusterWithOperatorDockerTest.java => KubernetesTaskRunnerDockerTest.java} (95%) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java similarity index 95% rename from embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java rename to embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java index f9a9c10c4976..014186cdd2fc 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java @@ -28,9 +28,9 @@ /** * Runs some basic ingestion tests against latest image Druid containers running - * on a K3s cluster with druid-operator. + * on a K3s cluster with druid-operator and using {@code k8s} task runner type. */ -public class KubernetesClusterWithOperatorDockerTest extends IngestionSmokeTest implements LatestImageDockerTest +public class KubernetesTaskRunnerDockerTest extends IngestionSmokeTest implements LatestImageDockerTest { private static final String MANIFEST_TEMPLATE = "manifests/druid-service-with-operator.yaml";