From 7674ffeb89c90775b9aeb8caea2f13fa6826b80c Mon Sep 17 00:00:00 2001 From: Nicholas Lippis Date: Wed, 5 Apr 2023 22:56:16 -0400 Subject: [PATCH 1/8] return task status reported by peon --- .../druid/k8s/overlord/K8sOverlordModule.java | 2 + .../k8s/overlord/KubernetesTaskRunner.java | 57 +++-- .../overlord/KubernetesTaskRunnerFactory.java | 11 +- .../KubernetesTaskRunnerFactoryTest.java | 22 +- .../overlord/KubernetesTaskRunnerTest.java | 221 +++++++++++++++--- .../druid/storage/azure/AzureTaskLogs.java | 19 ++ .../storage/azure/AzureTaskLogsTest.java | 95 ++++++++ .../druid/storage/google/GoogleTaskLogs.java | 20 ++ .../storage/google/GoogleTaskLogsTest.java | 50 ++++ .../storage/hdfs/tasklog/HdfsTaskLogs.java | 25 ++ .../common/tasklogs/HdfsTaskLogsTest.java | 17 ++ .../apache/druid/storage/s3/S3TaskLogs.java | 15 ++ .../druid/storage/s3/S3TaskLogsTest.java | 52 +++++ .../indexing/common/task/AbstractTask.java | 43 ++-- .../common/tasklogs/FileTaskLogs.java | 20 ++ .../common/task/AbstractTaskTest.java | 23 +- .../common/tasklogs/FileTaskLogsTest.java | 23 ++ .../apache/druid/tasklogs/NoopTaskLogs.java | 6 + .../apache/druid/tasklogs/TaskLogPusher.java | 4 + .../druid/tasklogs/TaskLogStreamer.java | 5 + 20 files changed, 649 insertions(+), 81 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/K8sOverlordModule.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/K8sOverlordModule.java index c176330b15a3..b68b4d07a2fe 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/K8sOverlordModule.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/K8sOverlordModule.java @@ -37,6 +37,7 @@ import org.apache.druid.tasklogs.NoopTaskLogs; import org.apache.druid.tasklogs.TaskLogKiller; import org.apache.druid.tasklogs.TaskLogPusher; +import org.apache.druid.tasklogs.TaskLogStreamer; import org.apache.druid.tasklogs.TaskLogs; @LoadScope(roles = NodeRole.OVERLORD_JSON_NAME) @@ -78,6 +79,7 @@ private void configureTaskLogs(Binder binder) binder.bind(FileTaskLogs.class).in(LazySingleton.class); binder.bind(TaskLogPusher.class).to(TaskLogs.class); + binder.bind(TaskLogStreamer.class).to(TaskLogs.class); binder.bind(TaskLogKiller.class).to(TaskLogs.class); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 00f9c2dfd9c7..4c210cd08e6e 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -19,6 +19,7 @@ package org.apache.druid.k8s.overlord; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -45,6 +46,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.HttpClient; @@ -57,8 +59,8 @@ import org.apache.druid.k8s.overlord.common.KubernetesResourceNotFoundException; import org.apache.druid.k8s.overlord.common.PeonPhase; import org.apache.druid.k8s.overlord.common.TaskAdapter; -import org.apache.druid.tasklogs.TaskLogPusher; import org.apache.druid.tasklogs.TaskLogStreamer; +import org.apache.druid.tasklogs.TaskLogs; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; @@ -73,6 +75,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; @@ -100,28 +103,31 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner protected final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); protected final TaskAdapter adapter; + protected final KubernetesPeonClient client; + private final ObjectMapper mapper; private final KubernetesTaskRunnerConfig k8sConfig; private final TaskQueueConfig taskQueueConfig; - private final TaskLogPusher taskLogPusher; + private final TaskLogs taskLogs; private final ListeningExecutorService exec; - private final KubernetesPeonClient client; private final HttpClient httpClient; public KubernetesTaskRunner( + ObjectMapper mapper, TaskAdapter adapter, KubernetesTaskRunnerConfig k8sConfig, TaskQueueConfig taskQueueConfig, - TaskLogPusher taskLogPusher, + TaskLogs taskLogs, KubernetesPeonClient client, HttpClient httpClient ) { + this.mapper = mapper; this.adapter = adapter; this.k8sConfig = k8sConfig; this.taskQueueConfig = taskQueueConfig; - this.taskLogPusher = taskLogPusher; + this.taskLogs = taskLogs; this.client = client; this.httpClient = httpClient; this.cleanupExecutor = Executors.newScheduledThreadPool(1); @@ -177,20 +183,7 @@ public ListenableFuture run(Task task) completedPhase = monitorJob(k8sTaskId); } } - TaskStatus status; - if (PeonPhase.SUCCEEDED.equals(completedPhase.getPhase())) { - status = TaskStatus.success(task.getId()); - } else if (completedPhase.getJob() == null) { - status = TaskStatus.failure( - task.getId(), - "K8s Job for task disappeared before completion: " + k8sTaskId - ); - } else { - status = TaskStatus.failure( - task.getId(), - "Task failed: " + k8sTaskId - ); - } + TaskStatus status = getTaskStatus(k8sTaskId, completedPhase); if (completedPhase.getJobDuration().isPresent()) { status = status.withDuration(completedPhase.getJobDuration().get()); } @@ -209,7 +202,7 @@ public ListenableFuture run(Task task) if (logStream.isPresent()) { FileUtils.copyInputStreamToFile(logStream.get(), log.toFile()); } - taskLogPusher.pushTaskLog(task.getId(), log.toFile()); + taskLogs.pushTaskLog(task.getId(), log.toFile()); } finally { Files.deleteIfExists(log); @@ -242,10 +235,32 @@ JobResponse monitorJob(Pod peonPod, K8sTaskId k8sTaskId) ); } + private TaskStatus getTaskStatus(K8sTaskId task, JobResponse jobResponse) throws IOException + { + if (PeonPhase.SUCCEEDED.equals(jobResponse.getPhase())) { + Optional maybeTaskStatusStream = taskLogs.streamTaskStatus(task.getOriginalTaskId()); + if (maybeTaskStatusStream.isPresent()) { + return mapper.readValue(maybeTaskStatusStream.get(), TaskStatus.class); + } + return TaskStatus.failure( + task.getOriginalTaskId(), + StringUtils.format("Task [%s] failed: status file not found", task.getOriginalTaskId()) + ); + } else if (Objects.isNull(jobResponse.getJob())) { + return TaskStatus.failure( + task.getOriginalTaskId(), + StringUtils.format("Task [%s] failed kubernetes job disappeared before completion", task.getOriginalTaskId()) + ); + } + return TaskStatus.failure( + task.getOriginalTaskId(), + StringUtils.format("Task [%s] failed", task.getOriginalTaskId()) + ); + } + @Override public void updateStatus(Task task, TaskStatus status) { - log.info("Updating task: %s with status %s", task.getId(), status); TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), status); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java index 4b9873499c1f..fdee5702c572 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java @@ -41,7 +41,7 @@ import org.apache.druid.k8s.overlord.common.TaskAdapter; import org.apache.druid.server.DruidNode; import org.apache.druid.server.log.StartupLoggingConfig; -import org.apache.druid.tasklogs.TaskLogPusher; +import org.apache.druid.tasklogs.TaskLogs; import java.util.Locale; import java.util.Properties; @@ -54,7 +54,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory future = spyRunner.run(task); - future.get(); + TaskStatus actualTaskStatus = future.get(); + Assert.assertEquals(taskStatus, actualTaskStatus); // we should never launch the job here, one exists verify(peonClient, never()).launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class)); verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId)); - verify(spyRunner, times(1)).updateStatus(eq(task), eq(TaskStatus.success(task.getId()))); + verify(spyRunner, times(1)).updateStatus(eq(task), eq(taskStatus)); } @Test @@ -208,22 +217,158 @@ public void testJobNeedsToLaunchInK8s() throws Exception job, PeonPhase.SUCCEEDED )); + + TaskStatus taskStatus = TaskStatus.success(task.getId()); + when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream( + jsonMapper.writeValueAsString(taskStatus), + StandardCharsets.UTF_8)) + ); + when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent()); when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true); KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + jsonMapper, adapter, kubernetesTaskRunnerConfig, taskQueueConfig, - taskLogPusher, + taskLogs, peonClient, null ); KubernetesTaskRunner spyRunner = spy(taskRunner); + ListenableFuture future = spyRunner.run(task); + TaskStatus actualTaskStatus = future.get(); + Assert.assertEquals(taskStatus, actualTaskStatus); + // we should never launch the job here, one exists + verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class)); + verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId)); + verify(spyRunner, times(1)).updateStatus(eq(task), eq(taskStatus)); + } + + @Test + public void test_run_withoutStatusFile_returnsFailedTask() throws Exception + { + Task task = makeTask(); + K8sTaskId k8sTaskId = new K8sTaskId(task.getId()); + + Job job = mock(Job.class); + ObjectMeta jobMetadata = mock(ObjectMeta.class); + when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId()); + JobStatus status = mock(JobStatus.class); + when(status.getActive()).thenReturn(1).thenReturn(null); + when(job.getStatus()).thenReturn(status); + when(job.getMetadata()).thenReturn(jobMetadata); + + Pod peonPod = mock(Pod.class); + ObjectMeta metadata = mock(ObjectMeta.class); + when(metadata.getName()).thenReturn("peonPodName"); + when(peonPod.getMetadata()).thenReturn(metadata); + PodStatus podStatus = mock(PodStatus.class); + when(podStatus.getPodIP()).thenReturn("SomeIP"); + when(peonPod.getStatus()).thenReturn(podStatus); + + K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); + when(adapter.fromTask(eq(task))).thenReturn(job); + + DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); + + when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null)); + when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); + when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); + when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse( + job, + PeonPhase.SUCCEEDED + )); + + when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.absent()); + + when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent()); + when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true); + + KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + jsonMapper, + adapter, + kubernetesTaskRunnerConfig, + taskQueueConfig, + taskLogs, + peonClient, + null + ); + KubernetesTaskRunner spyRunner = spy(taskRunner); ListenableFuture future = spyRunner.run(task); - future.get(); + TaskStatus actualTaskStatus = future.get(); + Assert.assertTrue(actualTaskStatus.isFailure()); + Assert.assertEquals( + StringUtils.format("Task [%s] failed: status file not found", task.getId()), + actualTaskStatus.getErrorMsg() + ); + + // we should never launch the job here, one exists + verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class)); + verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId)); + verify(spyRunner, times(1)).updateStatus(eq(task), eq(actualTaskStatus)); + } + + @Test + public void test_run_withFailedJob_returnsFailedTask() throws Exception + { + Task task = makeTask(); + K8sTaskId k8sTaskId = new K8sTaskId(task.getId()); + + Job job = mock(Job.class); + ObjectMeta jobMetadata = mock(ObjectMeta.class); + when(jobMetadata.getName()).thenReturn(k8sTaskId.getK8sTaskId()); + JobStatus status = mock(JobStatus.class); + when(status.getActive()).thenReturn(1).thenReturn(null); + when(job.getStatus()).thenReturn(status); + when(job.getMetadata()).thenReturn(jobMetadata); + + Pod peonPod = mock(Pod.class); + ObjectMeta metadata = mock(ObjectMeta.class); + when(metadata.getName()).thenReturn("peonPodName"); + when(peonPod.getMetadata()).thenReturn(metadata); + PodStatus podStatus = mock(PodStatus.class); + when(podStatus.getPodIP()).thenReturn("SomeIP"); + when(peonPod.getStatus()).thenReturn(podStatus); + + K8sTaskAdapter adapter = mock(K8sTaskAdapter.class); + when(adapter.fromTask(eq(task))).thenReturn(job); + + DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); + + when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.fromNullable(null)); + when(peonClient.launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class))).thenReturn(peonPod); + when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); + when(peonClient.waitForJobCompletion(eq(k8sTaskId), anyLong(), isA(TimeUnit.class))).thenReturn(new JobResponse( + job, + PeonPhase.FAILED + )); + + when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent()); + when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true); + + KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + jsonMapper, + adapter, + kubernetesTaskRunnerConfig, + taskQueueConfig, + taskLogs, + peonClient, + null + ); + KubernetesTaskRunner spyRunner = spy(taskRunner); + + ListenableFuture future = spyRunner.run(task); + TaskStatus actualTaskStatus = future.get(); + Assert.assertTrue(actualTaskStatus.isFailure()); + Assert.assertEquals( + StringUtils.format("Task [%s] failed", task.getId()), + actualTaskStatus.getErrorMsg() + ); + // we should never launch the job here, one exists verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class)); verify(peonClient, times(1)).cleanUpJob(eq(k8sTaskId)); @@ -233,7 +378,7 @@ public void testJobNeedsToLaunchInK8s() throws Exception DruidK8sConstants.TLS_PORT, druidNode.isEnableTlsPort() ); - verify(spyRunner, times(1)).updateStatus(eq(task), eq(TaskStatus.success(task.getId(), expectedTaskLocation))); + verify(spyRunner, times(1)).updateStatus(eq(task), eq(actualTaskStatus)); } @Test @@ -274,14 +419,22 @@ public void testTheK8sRestartState() throws Exception job, PeonPhase.SUCCEEDED )); + + TaskStatus taskStatus = TaskStatus.success(task.getId()); + when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream( + jsonMapper.writeValueAsString(taskStatus), + StandardCharsets.UTF_8)) + ); + when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent()); when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true); KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + jsonMapper, adapter, kubernetesTaskRunnerConfig, taskQueueConfig, - taskLogPusher, + taskLogs, peonClient, null ); @@ -337,14 +490,22 @@ public void testTheK8sRestartStateAndHandleJobsThatAlreadyCompletedWhileDown() t job, PeonPhase.SUCCEEDED )); + + TaskStatus taskStatus = TaskStatus.success(task.getId()); + when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.of(IOUtils.toInputStream( + jsonMapper.writeValueAsString(taskStatus), + StandardCharsets.UTF_8)) + ); + when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent()); when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true); KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + jsonMapper, adapter, kubernetesTaskRunnerConfig, taskQueueConfig, - taskLogPusher, + taskLogs, peonClient, null ); @@ -411,10 +572,11 @@ public void testStreamTaskReports() throws Exception ); KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( - adapter, + jsonMapper, + null, kubernetesTaskRunnerConfig, taskQueueConfig, - taskLogPusher, + taskLogs, peonClient, httpClient ); @@ -432,10 +594,11 @@ public void testStreamTaskReports_whereJobDoesNotExist_returnsEmptyOptional() th Task task = makeTask(); KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + jsonMapper, mock(K8sTaskAdapter.class), kubernetesTaskRunnerConfig, taskQueueConfig, - taskLogPusher, + taskLogs, mock(DruidKubernetesPeonClient.class), mock(HttpClient.class) ); @@ -484,10 +647,11 @@ public void testStreamTaskReports_withoutEmptyLocation_returnsEmptyOptional() th when(future.get()).thenThrow(InterruptedException.class); KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( - adapter, + jsonMapper, + null, kubernetesTaskRunnerConfig, taskQueueConfig, - taskLogPusher, + taskLogs, peonClient, httpClient ); @@ -539,10 +703,11 @@ public void testStreamTaskReports_getInputStreamThrowsInterruptedException_throw when(future.get()).thenThrow(InterruptedException.class); KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( - adapter, + jsonMapper, + null, kubernetesTaskRunnerConfig, taskQueueConfig, - taskLogPusher, + taskLogs, peonClient, httpClient ); @@ -594,10 +759,11 @@ public void testStreamTaskReports_getInputStreamThrowsExecutionException_throwsR when(future.get()).thenThrow(ExecutionException.class); KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( - adapter, + jsonMapper, + null, kubernetesTaskRunnerConfig, taskQueueConfig, - taskLogPusher, + taskLogs, peonClient, httpClient ); @@ -619,10 +785,11 @@ public void testMakingCodeCoverageHappy() when(peonClient.getMainJobPod(any())).thenReturn(null).thenReturn(pod); KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + jsonMapper, mock(K8sTaskAdapter.class), kubernetesTaskRunnerConfig, taskQueueConfig, - taskLogPusher, + taskLogs, peonClient, null ); @@ -647,10 +814,11 @@ public void testMaxQueueSizeIsEnforced() Period.millis(1) ); assertThrows(IllegalArgumentException.class, () -> new KubernetesTaskRunner( + jsonMapper, mock(K8sTaskAdapter.class), kubernetesTaskRunnerConfig, taskQueueConfig, - taskLogPusher, + taskLogs, mock(DruidKubernetesPeonClient.class), null )); @@ -736,18 +904,19 @@ public void testK8sJobManualShutdown() throws Exception when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true); KubernetesTaskRunner taskRunner = new KubernetesTaskRunner( + jsonMapper, adapter, kubernetesTaskRunnerConfig, taskQueueConfig, - taskLogPusher, + taskLogs, peonClient, null ); KubernetesTaskRunner spyRunner = spy(taskRunner); ListenableFuture future = spyRunner.run(task); - TaskStatus taskStatus = future.get(); - Assert.assertEquals(TaskState.FAILED, taskStatus.getStatusCode()); - Assert.assertEquals("K8s Job for task disappeared before completion: [ k8sTaskId, k8staskid]", taskStatus.getErrorMsg()); + TaskStatus taskStatusResponse = future.get(); + Assert.assertEquals(TaskState.FAILED, taskStatusResponse.getStatusCode()); + Assert.assertEquals("Task [k8sTaskId] failed kubernetes job disappeared before completion", taskStatusResponse.getErrorMsg()); } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java index 678a43e5dbe4..9bfda5ab349f 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureTaskLogs.java @@ -82,6 +82,14 @@ public void pushTaskReports(String taskid, File reportFile) pushTaskFile(reportFile, taskKey); } + @Override + public void pushTaskStatus(String taskid, File statusFile) + { + final String taskKey = getTaskStatusKey(taskid); + log.info("Pushing task status %s to: %s", statusFile, taskKey); + pushTaskFile(statusFile, taskKey); + } + private void pushTaskFile(final File logFile, String taskKey) { try { @@ -110,6 +118,12 @@ public Optional streamTaskReports(String taskid) throws IOException return streamTaskFile(taskid, 0, getTaskReportsKey(taskid)); } + @Override + public Optional streamTaskStatus(String taskid) throws IOException + { + return streamTaskFile(taskid, 0, getTaskStatusKey(taskid)); + } + private Optional streamTaskFile(final String taskid, final long offset, String taskKey) throws IOException { @@ -154,6 +168,11 @@ private String getTaskReportsKey(String taskid) return StringUtils.format("%s/%s/report.json", config.getPrefix(), taskid); } + private String getTaskStatusKey(String taskid) + { + return StringUtils.format("%s/%s/status.json", config.getPrefix(), taskid); + } + @Override public void killAll() throws IOException { diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java index 5313c1d5f533..297545e4cad0 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureTaskLogsTest.java @@ -155,6 +155,28 @@ public void test_PushTaskReports_uploadsBlob() throws Exception } } + @Test + public void test_PushTaskStatus_uploadsBlob() throws Exception + { + final File tmpDir = FileUtils.createTempDir(); + + try { + final File logFile = new File(tmpDir, "status.json"); + + azureStorage.uploadBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/status.json"); + EasyMock.expectLastCall(); + + replayAll(); + + azureTaskLogs.pushTaskStatus(TASK_ID, logFile); + + verifyAll(); + } + finally { + FileUtils.deleteDirectory(tmpDir); + } + } + @Test(expected = RuntimeException.class) public void test_PushTaskReports_exception_rethrowsException() throws Exception { @@ -323,6 +345,79 @@ public void test_streamTaskReports_exceptionWhenCheckingBlobExistence_throwsExce verifyAll(); } + @Test + public void test_streamTaskStatus_blobExists_succeeds() throws Exception + { + final String taskStatus = "{}"; + + final String blobPath = PREFIX + "/" + TASK_ID + "/status.json"; + EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true); + EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length()); + EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andReturn( + new ByteArrayInputStream(taskStatus.getBytes(StandardCharsets.UTF_8))); + + + replayAll(); + + final Optional stream = azureTaskLogs.streamTaskStatus(TASK_ID); + + final StringWriter writer = new StringWriter(); + IOUtils.copy(stream.get(), writer, "UTF-8"); + Assert.assertEquals(writer.toString(), taskStatus); + + verifyAll(); + } + + @Test + public void test_streamTaskStatus_blobDoesNotExist_returnsAbsent() throws Exception + { + final String blobPath = PREFIX + "/" + TASK_ID_NOT_FOUND + "/status.json"; + EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(false); + + replayAll(); + + final Optional stream = azureTaskLogs.streamTaskStatus(TASK_ID_NOT_FOUND); + + + Assert.assertFalse(stream.isPresent()); + + verifyAll(); + } + + @Test(expected = IOException.class) + public void test_streamTaskStatus_exceptionWhenGettingStream_throwsException() throws Exception + { + final String taskStatus = "{}"; + + final String blobPath = PREFIX + "/" + TASK_ID + "/status.json"; + EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andReturn(true); + EasyMock.expect(azureStorage.getBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length()); + EasyMock.expect(azureStorage.getBlobInputStream(CONTAINER, blobPath)).andThrow( + new URISyntaxException("", "")); + + + replayAll(); + + final Optional stream = azureTaskLogs.streamTaskStatus(TASK_ID); + + final StringWriter writer = new StringWriter(); + IOUtils.copy(stream.get(), writer, "UTF-8"); + verifyAll(); + } + + @Test(expected = IOException.class) + public void test_streamTaskStatus_exceptionWhenCheckingBlobExistence_throwsException() throws Exception + { + final String blobPath = PREFIX + "/" + TASK_ID + "/status.json"; + EasyMock.expect(azureStorage.getBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", "")); + + replayAll(); + + azureTaskLogs.streamTaskStatus(TASK_ID); + + verifyAll(); + } + @Test public void test_killAll_noException_deletesAllTaskLogs() throws Exception { diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java index fcdee4039b13..ae4024172a6f 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleTaskLogs.java @@ -74,6 +74,14 @@ public void pushTaskReports(String taskid, File reportFile) throws IOException pushTaskFile(reportFile, taskKey); } + @Override + public void pushTaskStatus(String taskid, File statusFile) throws IOException + { + final String taskKey = getTaskStatusKey(taskid); + LOG.info("Pushing task status %s to: %s", statusFile, taskKey); + pushTaskFile(statusFile, taskKey); + } + private void pushTaskFile(final File logFile, final String taskKey) throws IOException { try (final InputStream fileStream = Files.newInputStream(logFile.toPath())) { @@ -115,6 +123,13 @@ public Optional streamTaskReports(String taskid) throws IOException return streamTaskFile(taskid, 0, taskKey); } + @Override + public Optional streamTaskStatus(String taskid) throws IOException + { + final String taskKey = getTaskStatusKey(taskid); + return streamTaskFile(taskid, 0, taskKey); + } + private Optional streamTaskFile(final String taskid, final long offset, String taskKey) throws IOException { @@ -156,6 +171,11 @@ private String getTaskReportKey(String taskid) return config.getPrefix() + "/" + taskid.replace(':', '_') + ".report.json"; } + private String getTaskStatusKey(String taskid) + { + return config.getPrefix() + "/" + taskid.replace(':', '_') + ".status.json"; + } + @Override public void killAll() throws IOException { diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java index d8b7c61cfcf7..9bfe2706f803 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/storage/google/GoogleTaskLogsTest.java @@ -109,6 +109,35 @@ public void testPushTaskLog() throws Exception } } + @Test + public void testPushTaskStatus() throws Exception + { + final File tmpDir = FileUtils.createTempDir(); + + try { + final File statusFile = new File(tmpDir, "status.json"); + BufferedWriter output = Files.newBufferedWriter(statusFile.toPath(), StandardCharsets.UTF_8); + output.write("{}"); + output.close(); + + storage.insert( + EasyMock.eq(BUCKET), + EasyMock.eq(PREFIX + "/" + TASKID), + EasyMock.anyObject(InputStreamContent.class) + ); + EasyMock.expectLastCall(); + + replayAll(); + + googleTaskLogs.pushTaskLog(TASKID, statusFile); + + verifyAll(); + } + finally { + FileUtils.deleteDirectory(tmpDir); + } + } + @Test public void testStreamTaskLogWithoutOffset() throws Exception { @@ -177,6 +206,27 @@ public void testStreamTaskLogWithNegative() throws Exception verifyAll(); } + @Test + public void testStreamTaskStatus() throws Exception + { + final String taskStatus = "{}"; + + final String logPath = PREFIX + "/" + TASKID + ".status.json"; + EasyMock.expect(storage.exists(BUCKET, logPath)).andReturn(true); + EasyMock.expect(storage.size(BUCKET, logPath)).andReturn((long) taskStatus.length()); + EasyMock.expect(storage.get(BUCKET, logPath, 0)).andReturn(new ByteArrayInputStream(StringUtils.toUtf8(taskStatus))); + + replayAll(); + + final Optional stream = googleTaskLogs.streamTaskStatus(TASKID); + + final StringWriter writer = new StringWriter(); + IOUtils.copy(stream.get(), writer, "UTF-8"); + Assert.assertEquals(writer.toString(), taskStatus); + + verifyAll(); + } + @Test public void test_killAll_noException_deletesAllTaskLogs() throws IOException { diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java index 6026a3e3cb70..f89b0fafde95 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/storage/hdfs/tasklog/HdfsTaskLogs.java @@ -75,6 +75,15 @@ public void pushTaskReports(String taskId, File reportFile) throws IOException log.info("Wrote task reports to: %s", path); } + @Override + public void pushTaskStatus(String taskId, File statusFile) throws IOException + { + final Path path = getTaskStatusFileFromId(taskId); + log.info("Writing task status to: %s", path); + pushTaskFile(path, statusFile); + log.info("Wrote task status to: %s", path); + } + private void pushTaskFile(Path path, File logFile) throws IOException { final FileSystem fs = path.getFileSystem(hadoopConfig); @@ -100,6 +109,13 @@ public Optional streamTaskReports(String taskId) throws IOException return streamTaskFile(path, 0); } + @Override + public Optional streamTaskStatus(String taskId) throws IOException + { + final Path path = getTaskStatusFileFromId(taskId); + return streamTaskFile(path, 0); + } + private Optional streamTaskFile(final Path path, final long offset) throws IOException { final FileSystem fs = path.getFileSystem(hadoopConfig); @@ -139,6 +155,15 @@ private Path getTaskReportsFileFromId(String taskId) return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_') + ".reports.json")); } + /** + * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in + * path names. So we format paths differently for HDFS. + */ + private Path getTaskStatusFileFromId(String taskId) + { + return new Path(mergePaths(config.getDirectory(), taskId.replace(':', '_') + ".status.json")); + } + // some hadoop version Path.mergePaths does not exist private static String mergePaths(String path1, String path2) { diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java index 9724cba69446..9d0273d1a95f 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/indexing/common/tasklogs/HdfsTaskLogsTest.java @@ -78,6 +78,23 @@ public void testOverwrite() throws Exception Assert.assertEquals("blah blah", readLog(taskLogs, "foo", 0)); } + @Test + public void test_taskStatus() throws Exception + { + final File tmpDir = tempFolder.newFolder(); + final File logDir = new File(tmpDir, "logs"); + final File statusFile = new File(tmpDir, "status.json"); + final TaskLogs taskLogs = new HdfsTaskLogs(new HdfsTaskLogsConfig(logDir.toString()), new Configuration()); + + + Files.write("{}", statusFile, StandardCharsets.UTF_8); + taskLogs.pushTaskStatus("id", statusFile); + Assert.assertEquals( + "{}", + StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskStatus("id").get())) + ); + } + @Test public void testKill() throws Exception { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java index 27544e1eecf7..6868ba3c99ac 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java @@ -77,6 +77,13 @@ public Optional streamTaskReports(String taskid) throws IOException return streamTaskFile(0, taskKey); } + @Override + public Optional streamTaskStatus(String taskid) throws IOException + { + final String taskKey = getTaskLogKey(taskid, "status.json"); + return streamTaskFile(0, taskKey); + } + private Optional streamTaskFile(final long offset, String taskKey) throws IOException { try { @@ -141,6 +148,14 @@ public void pushTaskReports(String taskid, File reportFile) throws IOException pushTaskFile(reportFile, taskKey); } + @Override + public void pushTaskStatus(String taskid, File statusFile) throws IOException + { + final String taskKey = getTaskLogKey(taskid, "status.json"); + log.info("Pushing task status %s to: %s", statusFile, taskKey); + pushTaskFile(statusFile, taskKey); + } + private void pushTaskFile(final File logFile, String taskKey) throws IOException { try { diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java index 7b82ca102be3..13a6e07cc59d 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java @@ -76,6 +76,7 @@ public class S3TaskLogsTest extends EasyMockSupport private static final Exception NON_RECOVERABLE_EXCEPTION = new SdkClientException(new NullPointerException()); private static final String LOG_CONTENTS = "log_contents"; private static final String REPORT_CONTENTS = "report_contents"; + private static final String STATUS_CONTENTS = "status_contents"; @Mock private CurrentTimeMillisSupplier timeSupplier; @@ -115,6 +116,31 @@ public void testTaskLogsPushWithAclEnabled() throws Exception ); Assert.assertEquals("The Grant should have full control permission", Permission.FullControl, grant.getPermission()); } + + @Test + public void test_pushTaskStatus() throws IOException + { + EasyMock.expect(s3Client.putObject(EasyMock.anyObject(PutObjectRequest.class))) + .andReturn(new PutObjectResult()) + .once(); + + EasyMock.replay(s3Client); + + S3TaskLogsConfig config = new S3TaskLogsConfig(); + config.setS3Bucket(TEST_BUCKET); + config.setDisableAcl(true); + + CurrentTimeMillisSupplier timeSupplier = new CurrentTimeMillisSupplier(); + S3InputDataConfig inputDataConfig = new S3InputDataConfig(); + S3TaskLogs s3TaskLogs = new S3TaskLogs(s3Client, config, inputDataConfig, timeSupplier); + + String taskId = "index_test-datasource_2019-06-18T13:30:28.887Z"; + File logFile = tempFolder.newFile("status.json"); + + s3TaskLogs.pushTaskLog(taskId, logFile); + + EasyMock.verify(s3Client); + } @Test public void test_killAll_noException_deletesAllTaskLogs() throws IOException @@ -434,6 +460,32 @@ public void test_report_fetch() throws IOException Assert.assertEquals(REPORT_CONTENTS, report); } + @Test + public void test_status_fetch() throws IOException + { + EasyMock.reset(s3Client); + String logPath = TEST_PREFIX + "/" + KEY_1 + "/status.json"; + ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setContentLength(STATUS_CONTENTS.length()); + EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata); + S3Object s3Object = new S3Object(); + s3Object.setObjectContent(new ByteArrayInputStream(STATUS_CONTENTS.getBytes(StandardCharsets.UTF_8))); + GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, logPath); + getObjectRequest.setRange(0, STATUS_CONTENTS.length() - 1); + getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag()); + EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object); + EasyMock.replay(s3Client); + + S3TaskLogs s3TaskLogs = getS3TaskLogs(); + + Optional inputStreamOptional = s3TaskLogs.streamTaskStatus(KEY_1); + String report = new BufferedReader( + new InputStreamReader(inputStreamOptional.get(), StandardCharsets.UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + + Assert.assertEquals(STATUS_CONTENTS, report); + } @Nonnull private S3TaskLogs getS3TaskLogs() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index d9f05cb5016a..7c46451271e1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -94,7 +94,9 @@ public static IngestionMode fromString(String name) private final String dataSource; private final Map context; + private File reportsFile; + private File statusFile; private final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); @@ -147,6 +149,7 @@ public String setup(TaskToolbox toolbox) throws Exception File attemptDir = Paths.get(taskDir.getAbsolutePath(), "attempt", toolbox.getAttemptId()).toFile(); FileUtils.mkdirp(attemptDir); reportsFile = new File(attemptDir, "report.json"); + statusFile = new File(attemptDir, "status.json"); InetAddress hostName = InetAddress.getLocalHost(); DruidNode node = toolbox.getTaskExecutorNode(); toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.create( @@ -185,23 +188,31 @@ public final TaskStatus run(TaskToolbox taskToolbox) throws Exception public void cleanUp(TaskToolbox toolbox, boolean failure) throws Exception { - if (toolbox.getConfig().isEncapsulatedTask()) { - // report back to the overlord - UpdateStatusAction status = new UpdateStatusAction("successful"); - if (failure) { - status = new UpdateStatusAction("failure"); - } - toolbox.getTaskActionClient().submit(status); - toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.unknown())); - - if (reportsFile != null && reportsFile.exists()) { - toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile); - log.debug("Pushed task reports"); - } else { - log.debug("No task reports file exists to push"); - } - } else { + if (!toolbox.getConfig().isEncapsulatedTask()) { log.debug("Not pushing task logs and reports from task."); + return; + } + + // report back to the overlord + UpdateStatusAction status = new UpdateStatusAction("successful"); + if (failure) { + status = new UpdateStatusAction("failure"); + } + toolbox.getTaskActionClient().submit(status); + toolbox.getTaskActionClient().submit(new UpdateLocationAction(TaskLocation.unknown())); + + if (reportsFile != null && reportsFile.exists()) { + toolbox.getTaskLogPusher().pushTaskReports(id, reportsFile); + log.debug("Pushed task reports"); + } else { + log.debug("No task reports file exists to push"); + } + + if (statusFile != null && statusFile.exists()) { + toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile); + log.debug("Pushed task status"); + } else { + log.debug("No task status file exists to push"); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java index 5d546da49429..a8e5b1724563 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogs.java @@ -65,6 +65,15 @@ public void pushTaskReports(String taskid, File reportFile) throws IOException log.info("Wrote task report to: %s", outputFile); } + @Override + public void pushTaskStatus(String taskid, File statusFile) throws IOException + { + FileUtils.mkdirp(config.getDirectory()); + final File outputFile = fileForTask(taskid, statusFile.getName()); + Files.copy(statusFile, outputFile); + log.info("Wrote task status to: %s", outputFile); + } + @Override public Optional streamTaskLog(final String taskid, final long offset) throws IOException { @@ -87,6 +96,17 @@ public Optional streamTaskReports(final String taskid) throws IOExc } } + @Override + public Optional streamTaskStatus(final String taskid) throws IOException + { + final File file = fileForTask(taskid, "status.json"); + if (file.exists()) { + return Optional.of(LogUtils.streamFile(file, 0)); + } else { + return Optional.absent(); + } + } + private File fileForTask(final String taskid, String filename) { return new File(config.getDirectory(), StringUtils.format("%s.%s", taskid, filename)); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java index aa5adb8d495f..9c08d3d54249 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java @@ -66,8 +66,24 @@ public void testSetupAndCleanupIsCalledWtihParameter() throws Exception TaskLogPusher pusher = mock(TaskLogPusher.class); when(toolbox.getTaskLogPusher()).thenReturn(pusher); - TaskConfig config = mock(TaskConfig.class); - when(config.isEncapsulatedTask()).thenReturn(true); + TaskConfig config = new TaskConfig( + null, + null, + null, + null, + null, + false, + null, + null, + null, + false, + false, + null, + null, + true, + null + ); + when(toolbox.getConfig()).thenReturn(config); TaskStorageDirTracker dirTracker = new TaskStorageDirTracker( ImmutableList.of(temporaryFolder.newFolder().getAbsolutePath()) @@ -92,7 +108,9 @@ public String setup(TaskToolbox toolbox) throws Exception "attempt", toolbox.getAttemptId() ).toFile(); File reportsDir = new File(attemptDir, "report.json"); + File statusDir = new File(attemptDir, "status.json"); FileUtils.write(reportsDir, "foo", StandardCharsets.UTF_8); + FileUtils.write(statusDir, "{}", StandardCharsets.UTF_8); return result; } }; @@ -101,6 +119,7 @@ public String setup(TaskToolbox toolbox) throws Exception // call it 3 times, once to update location in setup, then one for status and location in cleanup Mockito.verify(taskActionClient, times(3)).submit(any()); verify(pusher, times(1)).pushTaskReports(eq("myID"), any()); + verify(pusher, times(1)).pushTaskStatus(eq("myID"), any()); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java index 22c13e34644b..b88b31443e06 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/tasklogs/FileTaskLogsTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; import com.google.common.io.Files; +import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.config.FileTaskLogsConfig; import org.apache.druid.java.util.common.FileUtils; @@ -95,6 +96,28 @@ public void testSimpleReport() throws Exception ); } + @Test + public void testSimpleStatus() throws Exception + { + final ObjectMapper mapper = TestHelper.makeJsonMapper(); + final File tmpDir = temporaryFolder.newFolder(); + final File logDir = new File(tmpDir, "druid/myTask"); + final File statusFile = new File(tmpDir, "status.json"); + + final String taskId = "myTask"; + final TaskStatus taskStatus = TaskStatus.success(taskId); + final String taskStatusString = mapper.writeValueAsString(taskStatus); + Files.write(taskStatusString, statusFile, StandardCharsets.UTF_8); + + final TaskLogs taskLogs = new FileTaskLogs(new FileTaskLogsConfig(logDir)); + taskLogs.pushTaskStatus(taskId, statusFile); + + Assert.assertEquals( + taskStatusString, + StringUtils.fromUtf8(ByteStreams.toByteArray(taskLogs.streamTaskStatus(taskId).get())) + ); + } + @Test public void testPushTaskLogDirCreationFails() throws Exception { diff --git a/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java b/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java index 13962d7c4d3b..287b2f6fcc34 100644 --- a/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java +++ b/processing/src/main/java/org/apache/druid/tasklogs/NoopTaskLogs.java @@ -47,6 +47,12 @@ public void pushTaskReports(String taskid, File reportFile) log.info("Not pushing reports for task: %s", taskid); } + @Override + public void pushTaskStatus(String taskid, File statusFile) + { + log.info("Not pushing status for task: %s", taskid); + } + @Override public void killAll() { diff --git a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java index 0a06237e7f47..a1cb317de3b3 100644 --- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java +++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogPusher.java @@ -35,4 +35,8 @@ public interface TaskLogPusher default void pushTaskReports(String taskid, File reportFile) throws IOException { } + + default void pushTaskStatus(String taskid, File reportFile) throws IOException + { + } } diff --git a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java index 04add17ea51e..1ec6a94df179 100644 --- a/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java +++ b/processing/src/main/java/org/apache/druid/tasklogs/TaskLogStreamer.java @@ -44,4 +44,9 @@ default Optional streamTaskReports(final String taskid) throws IOEx { return Optional.absent(); } + + default Optional streamTaskStatus(final String taskid) throws IOException + { + return Optional.absent(); + } } From 2eb9ebe8e4fc33445307bcfa2d91c550ba82f8ca Mon Sep 17 00:00:00 2001 From: Nicholas Lippis Date: Thu, 13 Apr 2023 22:53:51 -0400 Subject: [PATCH 2/8] Write TaskStatus to file in AbstractTask.cleanUp --- .../indexing/common/task/AbstractTask.java | 38 +++++++++---------- .../common/task/AbstractTaskTest.java | 15 +++++++- .../task/batch/parallel/TaskMonitorTest.java | 2 +- .../SingleTaskBackgroundRunnerTest.java | 14 +++---- .../indexing/overlord/TaskQueueTest.java | 12 +++--- 5 files changed, 47 insertions(+), 34 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java index 7c46451271e1..5ea8f8eef2cb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java @@ -50,6 +50,7 @@ import java.io.File; import java.io.IOException; import java.net.InetAddress; +import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; import java.util.List; @@ -163,30 +164,27 @@ public String setup(TaskToolbox toolbox) throws Exception @Override public final TaskStatus run(TaskToolbox taskToolbox) throws Exception { - boolean failure = false; + TaskStatus taskStatus = TaskStatus.running(getId()); try { String errorMessage = setup(taskToolbox); if (org.apache.commons.lang3.StringUtils.isNotBlank(errorMessage)) { return TaskStatus.failure(getId(), errorMessage); } - TaskStatus taskStatus = runTask(taskToolbox); - if (taskStatus.isFailure()) { - failure = true; - } + taskStatus = runTask(taskToolbox); return taskStatus; } catch (Exception e) { - failure = true; + taskStatus = TaskStatus.failure(getId(), e.toString()); throw e; } finally { - cleanUp(taskToolbox, failure); + cleanUp(taskToolbox, taskStatus); } } public abstract TaskStatus runTask(TaskToolbox taskToolbox) throws Exception; - public void cleanUp(TaskToolbox toolbox, boolean failure) throws Exception + public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) throws Exception { if (!toolbox.getConfig().isEncapsulatedTask()) { log.debug("Not pushing task logs and reports from task."); @@ -195,7 +193,7 @@ public void cleanUp(TaskToolbox toolbox, boolean failure) throws Exception // report back to the overlord UpdateStatusAction status = new UpdateStatusAction("successful"); - if (failure) { + if (taskStatus.isFailure()) { status = new UpdateStatusAction("failure"); } toolbox.getTaskActionClient().submit(status); @@ -208,8 +206,10 @@ public void cleanUp(TaskToolbox toolbox, boolean failure) throws Exception log.debug("No task reports file exists to push"); } - if (statusFile != null && statusFile.exists()) { + if (statusFile != null) { + toolbox.getJsonMapper().writeValue(statusFile, taskStatus); toolbox.getTaskLogPusher().pushTaskStatus(id, statusFile); + Files.deleteIfExists(statusFile.toPath()); log.debug("Pushed task status"); } else { log.debug("No task status file exists to push"); @@ -292,12 +292,12 @@ public boolean canRestore() public String toString() { return "AbstractTask{" + - "id='" + id + '\'' + - ", groupId='" + groupId + '\'' + - ", taskResource=" + taskResource + - ", dataSource='" + dataSource + '\'' + - ", context=" + context + - '}'; + "id='" + id + '\'' + + ", groupId='" + groupId + '\'' + + ", taskResource=" + taskResource + + ", dataSource='" + dataSource + '\'' + + ", context=" + context + + '}'; } public TaskStatus success() @@ -383,8 +383,8 @@ protected static IngestionMode computeCompactionIngestionMode(@Nullable Compacti protected static IngestionMode computeBatchIngestionMode(@Nullable BatchIOConfig ioConfig) { final boolean isAppendToExisting = ioConfig == null - ? BatchIOConfig.DEFAULT_APPEND_EXISTING - : ioConfig.isAppendToExisting(); + ? BatchIOConfig.DEFAULT_APPEND_EXISTING + : ioConfig.isAppendToExisting(); final boolean isDropExisting = ioConfig == null ? BatchIOConfig.DEFAULT_DROP_EXISTING : ioConfig.isDropExisting(); return computeIngestionMode(isAppendToExisting, isDropExisting); } @@ -399,7 +399,7 @@ private static IngestionMode computeIngestionMode(boolean isAppendToExisting, bo return IngestionMode.REPLACE_LEGACY; } throw new IAE("Cannot simultaneously replace and append to existing segments. " - + "Either dropExisting or appendToExisting should be set to false"); + + "Either dropExisting or appendToExisting should be set to false"); } public void emitMetric( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java index 9c08d3d54249..18a66e5fa752 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java @@ -19,17 +19,20 @@ package org.apache.druid.indexing.common.task; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import org.apache.commons.io.FileUtils; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskStorageDirTracker; import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.UpdateStatusAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.server.DruidNode; import org.apache.druid.tasklogs.TaskLogPusher; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -50,10 +53,17 @@ public class AbstractTaskTest { + private ObjectMapper objectMapper; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Before + public void setup() + { + objectMapper = new TestUtils().getTestObjectMapper(); + } + @Test public void testSetupAndCleanupIsCalledWtihParameter() throws Exception { @@ -85,6 +95,7 @@ public void testSetupAndCleanupIsCalledWtihParameter() throws Exception ); when(toolbox.getConfig()).thenReturn(config); + when(toolbox.getJsonMapper()).thenReturn(objectMapper); TaskStorageDirTracker dirTracker = new TaskStorageDirTracker( ImmutableList.of(temporaryFolder.newFolder().getAbsolutePath()) ); @@ -138,6 +149,7 @@ public void testWithNoEncapsulatedTask() throws Exception when(config.isEncapsulatedTask()).thenReturn(false); File folder = temporaryFolder.newFolder(); when(toolbox.getConfig()).thenReturn(config); + when(toolbox.getJsonMapper()).thenReturn(objectMapper); TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(ImmutableList.of(folder.getAbsolutePath())); when(toolbox.getDirTracker()).thenReturn(dirTracker); @@ -184,6 +196,7 @@ public void testTaskFailureWithoutExceptionGetsReportedCorrectly() throws Except File folder = temporaryFolder.newFolder(); TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(ImmutableList.of(folder.getAbsolutePath())); when(toolbox.getConfig()).thenReturn(config); + when(toolbox.getJsonMapper()).thenReturn(objectMapper); when(toolbox.getDirTracker()).thenReturn(dirTracker); TaskActionClient taskActionClient = mock(TaskActionClient.class); @@ -193,7 +206,7 @@ public void testTaskFailureWithoutExceptionGetsReportedCorrectly() throws Except AbstractTask task = new NoopTask("myID", null, null, 1, 0, null, null, null) { @Override - public TaskStatus runTask(TaskToolbox toolbox) + public TaskStatus runTask(TaskToolbox toolbox) { return TaskStatus.failure("myId", "failed"); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java index 510d0883d397..a09644ea6f5f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java @@ -244,7 +244,7 @@ public String setup(TaskToolbox toolbox) } @Override - public void cleanUp(TaskToolbox toolbox, boolean failure) + public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) { // do nothing } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 5a5999bda298..a8180e2b3d7e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -175,7 +175,7 @@ public String setup(TaskToolbox toolbox) } @Override - public void cleanUp(TaskToolbox toolbox, boolean failure) + public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) { // do nothing } @@ -193,10 +193,10 @@ public void testGetQueryRunner() throws ExecutionException, InterruptedException final QueryRunner queryRunner = Druids.newScanQueryBuilder() - .dataSource("foo") - .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)) - .build() - .getRunner(runner); + .dataSource("foo") + .intervals(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)) + .build() + .getRunner(runner); Assert.assertThat(queryRunner, CoreMatchers.instanceOf(SetAndVerifyContextQueryRunner.class)); } @@ -274,7 +274,7 @@ public String setup(TaskToolbox toolbox) } @Override - public void cleanUp(TaskToolbox toolbox, boolean failure) + public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) { // do nothing } @@ -396,7 +396,7 @@ public String setup(TaskToolbox toolbox) } @Override - public void cleanUp(TaskToolbox toolbox, boolean failure) + public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) { // do nothing } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 616d2ca8bafe..6388cdd573cd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -412,10 +412,10 @@ public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws EntryExistsE TaskLocation.create("worker", 1, 2) ), workerHolder); while (!taskRunner.getRunningTasks() - .stream() - .map(TaskRunnerWorkItem::getTaskId) - .collect(Collectors.toList()) - .contains(task.getId())) { + .stream() + .map(TaskRunnerWorkItem::getTaskId) + .collect(Collectors.toList()) + .contains(task.getId())) { Thread.sleep(100); } taskQueue.shutdown(task.getId(), "shutdown"); @@ -435,7 +435,7 @@ private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List runningTask HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery druidNodeDiscovery = new HttpRemoteTaskRunnerTest.TestDruidNodeDiscovery(); DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) - .andReturn(druidNodeDiscovery); + .andReturn(druidNodeDiscovery); EasyMock.replay(druidNodeDiscoveryProvider); TaskStorage taskStorageMock = EasyMock.createStrictMock(TaskStorage.class); for (String taskId : runningTasks) { @@ -520,7 +520,7 @@ public String setup(TaskToolbox toolbox) } @Override - public void cleanUp(TaskToolbox toolbox, boolean failure) + public void cleanUp(TaskToolbox toolbox, TaskStatus taskStatus) { // do nothing } From cb756ea20e1cf0c916f34b25787e1ab46633bbcc Mon Sep 17 00:00:00 2001 From: Nicholas Lippis Date: Thu, 13 Apr 2023 23:00:26 -0400 Subject: [PATCH 3/8] Get TaskStatus from task log --- .../k8s/overlord/KubernetesTaskRunner.java | 29 ++++++++++--------- .../overlord/KubernetesTaskRunnerTest.java | 18 ++++++------ 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index 03d6fa7b3efc..a360dc09f061 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -32,6 +32,7 @@ import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.netty.util.SuppressForbidden; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -68,6 +69,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -238,25 +240,24 @@ JobResponse monitorJob(Pod peonPod, K8sTaskId k8sTaskId) private TaskStatus getTaskStatus(K8sTaskId task, JobResponse jobResponse) throws IOException { - if (PeonPhase.SUCCEEDED.equals(jobResponse.getPhase())) { - Optional maybeTaskStatusStream = taskLogs.streamTaskStatus(task.getOriginalTaskId()); - if (maybeTaskStatusStream.isPresent()) { - return mapper.readValue(maybeTaskStatusStream.get(), TaskStatus.class); - } + Optional maybeTaskStatusStream = taskLogs.streamTaskStatus(task.getOriginalTaskId()); + if (maybeTaskStatusStream.isPresent()) { + String taskStatus = IOUtils.toString(maybeTaskStatusStream.get(), StandardCharsets.UTF_8); + return mapper.readValue(taskStatus, TaskStatus.class); + } else if (PeonPhase.SUCCEEDED.equals(jobResponse.getPhase())) { + // fallback to behavior before the introduction of task status streaming for backwards compatibility + return TaskStatus.success(task.getOriginalTaskId()); + } else if (Objects.isNull(jobResponse.getJob())) { return TaskStatus.failure( task.getOriginalTaskId(), - StringUtils.format("Task [%s] failed: status file not found", task.getOriginalTaskId()) + StringUtils.format("Task [%s] failed kubernetes job disappeared before completion", task.getOriginalTaskId()) ); - } else if (Objects.isNull(jobResponse.getJob())) { + } else { return TaskStatus.failure( task.getOriginalTaskId(), - StringUtils.format("Task [%s] failed kubernetes job disappeared before completion", task.getOriginalTaskId()) + StringUtils.format("Task [%s] failed", task.getOriginalTaskId()) ); } - return TaskStatus.failure( - task.getOriginalTaskId(), - StringUtils.format("Task [%s] failed", task.getOriginalTaskId()) - ); } @Override @@ -523,8 +524,8 @@ public TaskLocation getLocation() } boolean tlsEnabled = Boolean.parseBoolean( mainPod.getMetadata() - .getAnnotations() - .getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")); + .getAnnotations() + .getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")); return TaskLocation.create( mainPod.getStatus().getPodIP(), DruidK8sConstants.PORT, diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index eb25bd8f9002..c31a2a42f86e 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -248,7 +248,7 @@ public void testJobNeedsToLaunchInK8s() throws Exception } @Test - public void test_run_withoutStatusFile_returnsFailedTask() throws Exception + public void test_run_withSuccessfulJobAndWithoutStatusFile_returnsSucessfulTask() throws Exception { Task task = makeTask(); K8sTaskId k8sTaskId = new K8sTaskId(task.getId()); @@ -300,11 +300,7 @@ public void test_run_withoutStatusFile_returnsFailedTask() throws Exception ListenableFuture future = spyRunner.run(task); TaskStatus actualTaskStatus = future.get(); - Assert.assertTrue(actualTaskStatus.isFailure()); - Assert.assertEquals( - StringUtils.format("Task [%s] failed: status file not found", task.getId()), - actualTaskStatus.getErrorMsg() - ); + Assert.assertTrue(actualTaskStatus.isSuccess()); // we should never launch the job here, one exists verify(peonClient, times(1)).launchJobAndWaitForStart(isA(Job.class), anyLong(), isA(TimeUnit.class)); @@ -347,6 +343,8 @@ public void test_run_withFailedJob_returnsFailedTask() throws Exception PeonPhase.FAILED )); + when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.absent()); + when(peonClient.getPeonLogs(eq(k8sTaskId))).thenReturn(Optional.absent()); when(peonClient.cleanUpJob(eq(k8sTaskId))).thenReturn(true); @@ -892,6 +890,8 @@ public void testK8sJobManualShutdown() throws Exception DruidKubernetesPeonClient peonClient = mock(DruidKubernetesPeonClient.class); + when(taskLogs.streamTaskStatus(eq(task.getId()))).thenReturn(Optional.absent()); + when(peonClient.jobExists(eq(k8sTaskId))).thenReturn(Optional.of(job)); when(peonClient.getMainJobPod(eq(k8sTaskId))).thenReturn(peonPod); @@ -931,9 +931,9 @@ private Task makeTask() null, null, ImmutableMap.of("druid.indexer.runner.javaOpts", "abc", - "druid.indexer.fork.property.druid.processing.buffer.sizeBytes", "2048", - "druid.peon.pod.cpu", "1", - "druid.peon.pod.memory", "2G" + "druid.indexer.fork.property.druid.processing.buffer.sizeBytes", "2048", + "druid.peon.pod.cpu", "1", + "druid.peon.pod.memory", "2G" ) ); } From 815999cc80ee2fc6f3b8ba7599eeded7141c7933 Mon Sep 17 00:00:00 2001 From: Nicholas Lippis Date: Fri, 14 Apr 2023 09:46:19 -0400 Subject: [PATCH 4/8] Fix merge conflicts in AbstractTaskTest --- .../druid/indexing/common/task/AbstractTaskTest.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java index 0bda3728c5e5..cf03134c4ab9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AbstractTaskTest.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.common.task; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableList; import org.apache.commons.io.FileUtils; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; @@ -85,10 +84,6 @@ public void testSetupAndCleanupIsCalledWtihParameter() throws Exception when(config.getTaskDir(eq("myID"))).thenReturn(folder); when(toolbox.getConfig()).thenReturn(config); when(toolbox.getJsonMapper()).thenReturn(objectMapper); - TaskStorageDirTracker dirTracker = new TaskStorageDirTracker( - ImmutableList.of(temporaryFolder.newFolder().getAbsolutePath()) - ); - when(toolbox.getDirTracker()).thenReturn(dirTracker); TaskActionClient taskActionClient = mock(TaskActionClient.class); when(taskActionClient.submit(any())).thenReturn(TaskConfig.class); @@ -137,8 +132,6 @@ public void testWithNoEncapsulatedTask() throws Exception when(config.getTaskDir(eq("myID"))).thenReturn(folder); when(toolbox.getConfig()).thenReturn(config); when(toolbox.getJsonMapper()).thenReturn(objectMapper); - TaskStorageDirTracker dirTracker = new TaskStorageDirTracker(ImmutableList.of(folder.getAbsolutePath())); - when(toolbox.getDirTracker()).thenReturn(dirTracker); TaskActionClient taskActionClient = mock(TaskActionClient.class); when(taskActionClient.submit(any())).thenReturn(TaskConfig.class); @@ -184,7 +177,6 @@ public void testTaskFailureWithoutExceptionGetsReportedCorrectly() throws Except when(config.getTaskDir(eq("myID"))).thenReturn(folder); when(toolbox.getConfig()).thenReturn(config); when(toolbox.getJsonMapper()).thenReturn(objectMapper); - when(toolbox.getDirTracker()).thenReturn(dirTracker); TaskActionClient taskActionClient = mock(TaskActionClient.class); when(taskActionClient.submit(any())).thenReturn(TaskConfig.class); From 5eb6bede6f3fa8e1de987a7d05804bc1057c1ce8 Mon Sep 17 00:00:00 2001 From: Nicholas Lippis Date: Fri, 14 Apr 2023 14:45:46 -0400 Subject: [PATCH 5/8] Add unit tests for TaskLogPusher, TaskLogStreamer, NoopTaskLogs to satisfy code coverage --- .../druid/tasklogs/NoopTaskLogsTest.java | 16 ++++++++++ .../druid/tasklogs/TaskLogPusherTest.java | 26 +++++++++++++++++ .../druid/tasklogs/TaskLogStreamerTest.java | 29 +++++++++++++++++++ 3 files changed, 71 insertions(+) create mode 100644 processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java create mode 100644 processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java create mode 100644 processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java diff --git a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java new file mode 100644 index 000000000000..30d13b25479f --- /dev/null +++ b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java @@ -0,0 +1,16 @@ +package org.apache.druid.tasklogs; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class NoopTaskLogsTest +{ + @Test + public void test_streamTaskStatus() throws IOException + { + TaskLogs taskLogs = new NoopTaskLogs(); + Assert.assertFalse(taskLogs.streamTaskStatus("id").isPresent()); + } +} diff --git a/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java new file mode 100644 index 000000000000..3856bff21f9c --- /dev/null +++ b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java @@ -0,0 +1,26 @@ +package org.apache.druid.tasklogs; + +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +public class TaskLogPusherTest +{ + /** + * Test default implemenation of pushTaskStatus in TaskLogPusher interface for code coverage + * + * @throws IOException + */ + @Test + public void test_pushTaskStatus() throws IOException + { + TaskLogPusher taskLogPusher = new TaskLogPusher() { + @Override + public void pushTaskLog(String taskid, File logFile) throws IOException + { + } + }; + taskLogPusher.pushTaskStatus("id", new File("")); + } +} diff --git a/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java new file mode 100644 index 000000000000..adc04d41f4eb --- /dev/null +++ b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java @@ -0,0 +1,29 @@ +package org.apache.druid.tasklogs; + +import com.google.common.base.Optional; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; + +public class TaskLogStreamerTest +{ + /** + * Test default implemenation of streamTaskStatus in TaskLogStreamer interface for code coverage + * + * @throws IOException + */ + @Test + public void test_streamTaskStatus() throws IOException + { + TaskLogStreamer taskLogStreamer = new TaskLogStreamer() { + @Override + public Optional streamTaskLog(String taskid, long offset) throws IOException + { + return Optional.absent(); + } + }; + Assert.assertFalse(taskLogStreamer.streamTaskStatus("id").isPresent()); + } +} From 37566d77f383a9ff6ba3e5295282fc4471d97865 Mon Sep 17 00:00:00 2001 From: Nicholas Lippis Date: Mon, 17 Apr 2023 19:40:53 -0400 Subject: [PATCH 6/8] Add license headerss --- .../druid/tasklogs/NoopTaskLogsTest.java | 19 +++++++++++++++++++ .../druid/tasklogs/TaskLogPusherTest.java | 19 +++++++++++++++++++ .../druid/tasklogs/TaskLogStreamerTest.java | 19 +++++++++++++++++++ 3 files changed, 57 insertions(+) diff --git a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java index 30d13b25479f..3aadbfe04c89 100644 --- a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java +++ b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.tasklogs; import org.junit.Assert; diff --git a/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java index 3856bff21f9c..6768831b1e76 100644 --- a/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java +++ b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.tasklogs; import org.junit.Test; diff --git a/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java index adc04d41f4eb..663723715088 100644 --- a/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java +++ b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.druid.tasklogs; import com.google.common.base.Optional; From 0160957bae6c00fdbc3d95dc529d619e0a3b3a6c Mon Sep 17 00:00:00 2001 From: Nicholas Lippis Date: Mon, 17 Apr 2023 21:30:43 -0400 Subject: [PATCH 7/8] Fix style --- .../test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java index 3aadbfe04c89..30192932a9e0 100644 --- a/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java +++ b/processing/src/test/java/org/apache/druid/tasklogs/NoopTaskLogsTest.java @@ -29,7 +29,7 @@ public class NoopTaskLogsTest @Test public void test_streamTaskStatus() throws IOException { - TaskLogs taskLogs = new NoopTaskLogs(); + TaskLogs taskLogs = new NoopTaskLogs(); Assert.assertFalse(taskLogs.streamTaskStatus("id").isPresent()); } } From 539807d3961e7b7172db75ec46abb71695d73e38 Mon Sep 17 00:00:00 2001 From: Nicholas Lippis Date: Tue, 18 Apr 2023 12:22:22 -0400 Subject: [PATCH 8/8] Remove unknown exception declarations --- .../java/org/apache/druid/tasklogs/TaskLogPusherTest.java | 4 ++-- .../java/org/apache/druid/tasklogs/TaskLogStreamerTest.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java index 6768831b1e76..15b67409b3ca 100644 --- a/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java +++ b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogPusherTest.java @@ -27,7 +27,7 @@ public class TaskLogPusherTest { /** - * Test default implemenation of pushTaskStatus in TaskLogPusher interface for code coverage + * Test default implementation of pushTaskStatus in TaskLogPusher interface for code coverage * * @throws IOException */ @@ -36,7 +36,7 @@ public void test_pushTaskStatus() throws IOException { TaskLogPusher taskLogPusher = new TaskLogPusher() { @Override - public void pushTaskLog(String taskid, File logFile) throws IOException + public void pushTaskLog(String taskid, File logFile) { } }; diff --git a/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java index 663723715088..e61b9f0a1a12 100644 --- a/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java +++ b/processing/src/test/java/org/apache/druid/tasklogs/TaskLogStreamerTest.java @@ -38,7 +38,7 @@ public void test_streamTaskStatus() throws IOException { TaskLogStreamer taskLogStreamer = new TaskLogStreamer() { @Override - public Optional streamTaskLog(String taskid, long offset) throws IOException + public Optional streamTaskLog(String taskid, long offset) { return Optional.absent(); }