diff --git a/docs/development/extensions-core/k8s-jobs.md b/docs/development/extensions-core/k8s-jobs.md index 4d75a65ba6ff..00b6024bb873 100644 --- a/docs/development/extensions-core/k8s-jobs.md +++ b/docs/development/extensions-core/k8s-jobs.md @@ -792,7 +792,8 @@ Should you require the needed permissions for interacting across Kubernetes name | `druid.indexer.runner.graceTerminationPeriodSeconds` | `Long` | Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods. | `PT30S` (K8s default) | No | | `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. | `2147483647` | No | | `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU micro core for the task. | `1000` | No | -| `druid.indexer.runner.logSaveTimeout` | `Duration` | How long to wait for task logs to be saved before giving up. | `PT300S` | NO | +| `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the ingestion task makes a best effort to persist the pod logs from `k8s` to persistent task log storage. The timeout ensures that `k8s` connection issues do not cause the pod to hang indefinitely thereby blocking Overlord operations. If the timeout occurs before the logs are saved, those logs will not be available in Druid. | `PT300S` | NO | + ### Metrics added diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index dae8890a089b..d15fa3963237 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -43,6 +43,7 @@ import org.apache.druid.tasklogs.TaskLogs; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; @@ -50,6 +51,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -337,58 +339,56 @@ private TaskStatus getTaskStatus(long duration) return taskStatus.withDuration(duration); } + /** + * Attempts to initialize a logWatch for the peon pod if one does not already exist. + *

+ * It is not guaranteed that a logWatch will be successfully initialized with this call. + *

+ */ protected void startWatchingLogs() { if (logWatch != null) { log.debug("There is already a log watcher for %s", taskId.getOriginalTaskId()); return; } - try { - Optional maybeLogWatch = kubernetesClient.getPeonLogWatcher(taskId); - if (maybeLogWatch.isPresent()) { - logWatch = maybeLogWatch.get(); - } - } - catch (Exception e) { - log.error(e, "Error watching logs from task: %s", taskId); + Optional maybeLogWatch = executeWithTimeout( + () -> kubernetesClient.getPeonLogWatcher(taskId), + logSaveTimeoutMs, + "initializing K8s LogWatch", + "LogWatch failed to initialize. Peon may not be able to stream and persist task logs." + + " If this continues to happen, check Kubernetes server logs for potential errors." + ); + if (maybeLogWatch != null && maybeLogWatch.isPresent()) { + logWatch = maybeLogWatch.get(); } } + /** + * Saves logs from the peon pod to deep storage via the TaskLogs interface. + *

+ * This method does not gaurantee that logs will be successfully saved. It makes a best-effort attempt to copy + * the logs from the Peon and push them to deep storage. + *

+ */ protected void saveLogs() - { - ExecutorService executor = Executors.newSingleThreadExecutor(Execs.makeThreadFactory("k8s-tasklog-persist-%d")); - try { - Future future = executor.submit(this::doSaveLogs); - future.get(logSaveTimeoutMs, TimeUnit.MILLISECONDS); - } - catch (TimeoutException e) { - log.warn("Persisting task logs timed out after %d ms for task [%s]. This does not have any impact on the" - + " work done by the task, but the logs may be innaccessible. If this continues to happen, check" - + " Kubernetes server logs for potential errors.", logSaveTimeoutMs, taskId.getOriginalTaskId()); - } - catch (Exception e) { - log.error(e, "Persisting task logs failed for task [%s] This does not have any impact on the" - + " work done by the task, but the logs may be innaccessible. If this continues to happen, check" - + " Kubernetes server logs for potential errors.", taskId.getOriginalTaskId()); - } - finally { - executor.shutdownNow(); - // shutdownNow does not always allow finally blocks to run, so we make sure to close the logWatch here too if it - // wasn't closed in doSaveLogs - if (logWatch != null) { - logWatch.close(); - } - } - } - - private void doSaveLogs() { try { Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log"); try { startWatchingLogs(); if (logWatch != null) { - FileUtils.copyInputStreamToFile(logWatch.getOutput(), file.toFile()); + // Copy log output with timeout protection + executeWithTimeout( + () -> { + FileUtils.copyInputStreamToFile(logWatch.getOutput(), file.toFile()); + return null; + }, + logSaveTimeoutMs, + "coyping and persisting task logs", + "This failure does not have any impact on the" + + " ingestion work done by the task, but the logs may be partial or innaccessible. If " + + " this continues to happen, check Kubernetes server logs for potential errors." + ); } else { log.debug("Log stream not found for %s", taskId.getOriginalTaskId()); FileUtils.writeStringToFile( @@ -449,4 +449,34 @@ protected ListenableFuture getTaskStartedSuccessfullyFuture() { return taskStartedSuccessfullyFuture; } + + /** + * Executes a callable with a timeout. + *

+ * If the callable does not complete within the specified timeout or another exception occurs, the error will be + * logged and null will be returned. + *

+ */ + private @Nullable T executeWithTimeout(Callable callable, long timeoutMillis, String operationName, String errorMessage) + { + ExecutorService executor = Executors.newSingleThreadExecutor( + Execs.makeThreadFactory("k8s-peon-lifecycle-util-" + taskId.getOriginalTaskId() + "-%d")); + try { + Future future = executor.submit(callable); + return future.get(timeoutMillis, TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + log.warn("Operation[%s] for task[%s] timed out after [%d] ms with error[%s].", operationName, taskId.getOriginalTaskId(), timeoutMillis, errorMessage); + } + catch (InterruptedException e) { + log.warn("Operation[%s] for task[%s] was interrupted with error[%s].", operationName, taskId.getOriginalTaskId(), errorMessage); + } + catch (Exception e) { + log.error(e, "Error during operation[%s] for task[%s]: %s", operationName, taskId, errorMessage); + } + finally { + executor.shutdownNow(); + } + return null; + } } diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index 4484a1076189..df6f81532f7d 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -49,6 +49,7 @@ import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.util.concurrent.ExecutionException; @@ -402,7 +403,7 @@ public void test_join() throws IOException stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); logWatch.close(); - EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -452,11 +453,13 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); EasyMock.expectLastCall(); logWatch.close(); - EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expectLastCall(); stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID); EasyMock.expectLastCall().once(); stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); + logWatch.close(); + EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -561,7 +564,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); logWatch.close(); - EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -615,7 +618,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); logWatch.close(); - EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -657,7 +660,7 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); EasyMock.expectLastCall().once(); logWatch.close(); - EasyMock.expectLastCall().atLeastOnce(); + EasyMock.expectLastCall(); Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); @@ -1051,7 +1054,7 @@ public void test_getTaskLocation_withStoppedTaskState_returnsUnknown() } @Test - public void test_join_saveLogsTimeout_handledGracefully() throws IOException + public void test_startWatchingLogs_logWatchInitialize_timeout() { KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( task, @@ -1060,62 +1063,66 @@ public void test_join_saveLogsTimeout_handledGracefully() throws IOException taskLogs, mapper, stateListener, - SHORT_LOG_SAVE_TIMEOUT.toStandardDuration().getMillis() // 1 second timeout for task log save + SHORT_LOG_SAVE_TIMEOUT.toStandardDuration().getMillis() ); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)) + .andAnswer(() -> { + Thread.sleep(5000); // Exceeds 1 second timeout + return Optional.of(logWatch); + }); - Job job = new JobBuilder() - .withNewMetadata() - .withName(ID) - .endMetadata() - .withNewStatus() - .withSucceeded(1) - .withStartTime("2022-09-19T23:31:50Z") - .withCompletionTime("2022-09-19T23:32:48Z") - .endStatus() - .build(); - - EasyMock.expect(kubernetesClient.waitForPeonJobCompletion( - EasyMock.eq(k8sTaskId), - EasyMock.anyLong(), - EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED)); + replayAll(); + long startTime = System.currentTimeMillis(); + peonLifecycle.startWatchingLogs(); + long duration = System.currentTimeMillis() - startTime; + // Anything less than 5 seconds means the Executor timeed out correctly, because the mock sleeps for 5 seconds + Assert.assertTrue("Test should complete quickly due to timeout", duration < 2500); + verifyAll(); + } - EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)); - EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of( - IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8) - )); + @Test + public void test_saveLogs_streamLogs_timeout() throws IOException + { + EasyMock.reset(logWatch); - // Mock pushTaskLog to sleep longer than the timeout (2 seconds > 1 second timeout) - taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); - EasyMock.expectLastCall().andAnswer(() -> { - Thread.sleep(2000); // Sleep for 2 seconds - return null; - }); + InputStream slowInputStream = new InputStream() { + @Override + public int read() throws IOException + { + try { + Thread.sleep(5000); + } + catch (InterruptedException e) { + throw new IOException(e); + } + return -1; + } + }; - stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID); - EasyMock.expectLastCall().once(); - stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID); - EasyMock.expectLastCall().once(); + KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle( + task, + k8sTaskId, + kubernetesClient, + taskLogs, + mapper, + stateListener, + SHORT_LOG_SAVE_TIMEOUT.toStandardDuration().getMillis() + ); + EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)).once(); + EasyMock.expect(logWatch.getOutput()).andReturn(slowInputStream); logWatch.close(); - EasyMock.expectLastCall().atLeastOnce(); - - Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState()); + EasyMock.expectLastCall(); + taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class)); + EasyMock.expectLastCall(); replayAll(); - - // The test should complete without hanging, even though log save times out long startTime = System.currentTimeMillis(); - TaskStatus taskStatus = peonLifecycle.join(0L); + peonLifecycle.saveLogs(); long duration = System.currentTimeMillis() - startTime; - - // Should complete in ~1 second (timeout), not 5+ seconds (sleep duration) - Assert.assertTrue("Test should complete quickly due to timeout", duration < 1500); - + // Anything less than 5 seconds means the Executor timeed out correctly, because the mock sleeps for 5 seconds + Assert.assertTrue("Test should complete quickly due to timeout", duration < 2500); verifyAll(); - // Task should still succeed - log timeout doesn't affect task result - Assert.assertEquals(SUCCESS.withDuration(58000), taskStatus); - Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState()); } private void setPeonLifecycleState(KubernetesPeonLifecycle peonLifecycle, KubernetesPeonLifecycle.State state)