diff --git a/embedded-tests/pom.xml b/embedded-tests/pom.xml
index 3407b6831a83..956c66bb5ff5 100644
--- a/embedded-tests/pom.xml
+++ b/embedded-tests/pom.xml
@@ -317,6 +317,11 @@
fastutil-core
test
+
+ commons-io
+ commons-io
+ test
+
org.apache.kafka
kafka-clients
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
index c33504d368ad..13245227d2aa 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
@@ -19,7 +19,9 @@
package org.apache.druid.testing.embedded.indexing;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
+import org.apache.commons.io.IOUtils;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
@@ -27,6 +29,7 @@
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.TaskBuilder;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
@@ -41,6 +44,7 @@
import org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.http.SqlTaskStatus;
+import org.apache.druid.tasklogs.TaskLogStreamer;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -62,6 +66,8 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -318,6 +324,45 @@ public void test_runKafkaSupervisor()
Assertions.assertTrue(supervisorStatus.isSuspended());
}
+ @Test
+ public void test_streamLogs_ofCancelledTask() throws Exception
+ {
+ final String taskId = IdUtils.getRandomId();
+ final long runDurationMillis = 100_000L;
+ cluster.callApi().onLeaderOverlord(
+ o -> o.runTask(taskId, new NoopTask(taskId, null, null, runDurationMillis, 0L, null))
+ );
+
+ eventCollector.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName(NoopTask.EVENT_STARTED)
+ .hasDimension(DruidMetrics.TASK_ID, taskId)
+ );
+
+ cluster.callApi().onLeaderOverlord(o -> o.cancelTask(taskId));
+
+ eventCollector.latchableEmitter().waitForEvent(
+ event -> event.hasMetricName("task/run/time")
+ .hasDimension(DruidMetrics.TASK_ID, taskId)
+ .hasDimension(DruidMetrics.TASK_STATUS, "FAILED")
+ );
+
+ final Optional streamOptional =
+ overlord.bindings()
+ .getInstance(TaskLogStreamer.class)
+ .streamTaskLog(taskId, 0);
+
+ Assertions.assertTrue(streamOptional.isPresent());
+
+ final String logs = IOUtils.toString(streamOptional.get(), StandardCharsets.UTF_8);
+
+ final String expectedLogLine = StringUtils.format(
+ "Running task[%s] for [%d] millis",
+ taskId, runDurationMillis
+ );
+ Assertions.assertFalse(logs.isEmpty());
+ Assertions.assertTrue(logs.contains(expectedLogLine), "Actual logs are: " + logs);
+ }
+
private KafkaSupervisorSpec createKafkaSupervisor(String topic)
{
return MoreResources.Supervisor.KAFKA_JSON
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java
similarity index 95%
rename from embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
rename to embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java
index f9a9c10c4976..014186cdd2fc 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesClusterWithOperatorDockerTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java
@@ -28,9 +28,9 @@
/**
* Runs some basic ingestion tests against latest image Druid containers running
- * on a K3s cluster with druid-operator.
+ * on a K3s cluster with druid-operator and using {@code k8s} task runner type.
*/
-public class KubernetesClusterWithOperatorDockerTest extends IngestionSmokeTest implements LatestImageDockerTest
+public class KubernetesTaskRunnerDockerTest extends IngestionSmokeTest implements LatestImageDockerTest
{
private static final String MANIFEST_TEMPLATE = "manifests/druid-service-with-operator.yaml";
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index c545f08c63b1..e838da2da7d1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -29,6 +29,7 @@
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
@@ -36,15 +37,20 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
*/
public class NoopTask extends AbstractTask implements PendingSegmentAllocatingTask
{
+ private static final Logger log = new Logger(NoopTask.class);
+
public static final String TYPE = "noop";
public static final String EVENT_STARTED = "task/noop/started";
private static final int DEFAULT_RUN_TIME = 2500;
+ private final CountDownLatch isShutdown = new CountDownLatch(1);
private final long runTime;
@JsonCreator
@@ -97,14 +103,19 @@ public boolean isReady(TaskActionClient taskActionClient)
@Override
public void stopGracefully(TaskConfig taskConfig)
{
+ isShutdown.countDown();
}
@Override
public TaskStatus runTask(TaskToolbox toolbox) throws Exception
{
+ log.info("Running task[%s] for [%d] millis", getId(), runTime);
emitMetric(toolbox.getEmitter(), EVENT_STARTED, 1);
- Thread.sleep(runTime);
- return TaskStatus.success(getId());
+ if (isShutdown.await(runTime, TimeUnit.MILLISECONDS)) {
+ return TaskStatus.failure(getId(), "Canceled");
+ } else {
+ return TaskStatus.success(getId());
+ }
}
@Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
index 70122b6ab9e6..58b73e3d6eb1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java
@@ -339,10 +339,9 @@ public boolean waitForCleanupToFinish()
runner.stop();
Assert.assertEquals(TaskState.FAILED, statusHolder.get().getStatusCode());
- Assert.assertEquals(
- "Canceled as task execution process stopped",
- statusHolder.get().getErrorMsg()
- );
+
+ // Do not verify the failure error message as there is a race condition
+ // where the error message may either originate from NoopTask or the runner
}
private static class RestorableTask extends AbstractTask