Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/development/extensions-core/k8s-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,8 @@ Should you require the needed permissions for interacting across Kubernetes name
| `druid.indexer.runner.graceTerminationPeriodSeconds` | `Long` | Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods. | `PT30S` (K8s default) | No |
| `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. | `2147483647` | No |
| `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU micro core for the task. | `1000` | No |
| `druid.indexer.runner.logSaveTimeout` | `Duration` | How long to wait for task logs to be saved before giving up. | `PT300S` | NO |
| `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the ingestion task makes a best effort to persist the pod logs from `k8s` to persistent task log storage. The timeout ensures that `k8s` connection issues do not cause the pod to hang indefinitely, thereby blocking Overlord operations. If the timeout occurs before the logs are saved, those logs will not be available in Druid. | `PT300S` | No |


### Metrics added

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@
import org.apache.druid.tasklogs.TaskLogs;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -337,58 +339,56 @@ private TaskStatus getTaskStatus(long duration)
return taskStatus.withDuration(duration);
}

/**
* Attempts to initialize a logWatch for the peon pod if one does not already exist.
* <p>
* It is not guaranteed that a logWatch will be successfully initialized with this call.
* </p>
*/
protected void startWatchingLogs()
{
if (logWatch != null) {
log.debug("There is already a log watcher for %s", taskId.getOriginalTaskId());
return;
}
try {
Optional<LogWatch> maybeLogWatch = kubernetesClient.getPeonLogWatcher(taskId);
if (maybeLogWatch.isPresent()) {
logWatch = maybeLogWatch.get();
}
}
catch (Exception e) {
log.error(e, "Error watching logs from task: %s", taskId);
Optional<LogWatch> maybeLogWatch = executeWithTimeout(
() -> kubernetesClient.getPeonLogWatcher(taskId),
logSaveTimeoutMs,
"initializing K8s LogWatch",
"LogWatch failed to initialize. Peon may not be able to stream and persist task logs."
+ " If this continues to happen, check Kubernetes server logs for potential errors."
);
if (maybeLogWatch != null && maybeLogWatch.isPresent()) {
logWatch = maybeLogWatch.get();
}
}

/**
* Saves logs from the peon pod to deep storage via the TaskLogs interface.
* <p>
* This method does not gaurantee that logs will be successfully saved. It makes a best-effort attempt to copy
* the logs from the Peon and push them to deep storage.
* </p>
*/
protected void saveLogs()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we might benefit from adding a common method which can be used both for logWatch init as well as saveLogs:

private <T> T executeWithTimeout(Callable<T> runnable, long timeoutMillis, String operationName)
{
    // Create the executor
    // Start it
    // Handle the exceptions
    // Finally shutdown the executor
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, I took a shot at this

{
ExecutorService executor = Executors.newSingleThreadExecutor(Execs.makeThreadFactory("k8s-tasklog-persist-%d"));
try {
Future<?> future = executor.submit(this::doSaveLogs);
future.get(logSaveTimeoutMs, TimeUnit.MILLISECONDS);
}
catch (TimeoutException e) {
log.warn("Persisting task logs timed out after %d ms for task [%s]. This does not have any impact on the"
+ " work done by the task, but the logs may be innaccessible. If this continues to happen, check"
+ " Kubernetes server logs for potential errors.", logSaveTimeoutMs, taskId.getOriginalTaskId());
}
catch (Exception e) {
log.error(e, "Persisting task logs failed for task [%s] This does not have any impact on the"
+ " work done by the task, but the logs may be innaccessible. If this continues to happen, check"
+ " Kubernetes server logs for potential errors.", taskId.getOriginalTaskId());
}
finally {
executor.shutdownNow();
// shutdownNow does not always allow finally blocks to run, so we make sure to close the logWatch here too if it
// wasn't closed in doSaveLogs
if (logWatch != null) {
logWatch.close();
}
}
}

private void doSaveLogs()
{
try {
Path file = Files.createTempFile(taskId.getOriginalTaskId(), "log");
try {
startWatchingLogs();
if (logWatch != null) {
FileUtils.copyInputStreamToFile(logWatch.getOutput(), file.toFile());
// Copy log output with timeout protection
executeWithTimeout(
() -> {
FileUtils.copyInputStreamToFile(logWatch.getOutput(), file.toFile());
return null;
},
logSaveTimeoutMs,
"coyping and persisting task logs",
"This failure does not have any impact on the"
+ " ingestion work done by the task, but the logs may be partial or innaccessible. If "
+ " this continues to happen, check Kubernetes server logs for potential errors."
);
} else {
log.debug("Log stream not found for %s", taskId.getOriginalTaskId());
FileUtils.writeStringToFile(
Expand Down Expand Up @@ -449,4 +449,34 @@ protected ListenableFuture<Boolean> getTaskStartedSuccessfullyFuture()
{
return taskStartedSuccessfullyFuture;
}

/**
 * Executes a callable on a dedicated single-use thread, bounded by a timeout.
 * <p>
 * If the callable does not complete within the specified timeout or another exception occurs, the error will be
 * logged and null will be returned.
 * </p>
 *
 * @param callable      operation to run
 * @param timeoutMillis maximum time to wait for the operation to complete
 * @param operationName short human-readable name of the operation, used in log messages
 * @param errorMessage  additional context appended to log messages on failure
 * @return the callable's result, or null on timeout, interruption or any other failure
 */
private <T> @Nullable T executeWithTimeout(Callable<T> callable, long timeoutMillis, String operationName, String errorMessage)
{
  ExecutorService executor = Executors.newSingleThreadExecutor(
      Execs.makeThreadFactory("k8s-peon-lifecycle-util-" + taskId.getOriginalTaskId() + "-%d"));
  try {
    Future<T> future = executor.submit(callable);
    return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
  }
  catch (TimeoutException e) {
    log.warn("Operation[%s] for task[%s] timed out after [%d] ms with error[%s].", operationName, taskId.getOriginalTaskId(), timeoutMillis, errorMessage);
  }
  catch (InterruptedException e) {
    // Restore the interrupt flag so callers up the stack can observe the interruption.
    Thread.currentThread().interrupt();
    log.warn("Operation[%s] for task[%s] was interrupted with error[%s].", operationName, taskId.getOriginalTaskId(), errorMessage);
  }
  catch (Exception e) {
    log.error(e, "Error during operation[%s] for task[%s]: %s", operationName, taskId.getOriginalTaskId(), errorMessage);
  }
  finally {
    // Abandon the worker thread; the callable may still be running but its result is no longer wanted.
    executor.shutdownNow();
  }
  return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -402,7 +403,7 @@ public void test_join() throws IOException
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

Expand Down Expand Up @@ -452,11 +453,13 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();
logWatch.close();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expectLastCall();
stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

Expand Down Expand Up @@ -561,7 +564,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

Expand Down Expand Up @@ -615,7 +618,7 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

Expand Down Expand Up @@ -657,7 +660,7 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall().atLeastOnce();
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

Expand Down Expand Up @@ -1051,7 +1054,7 @@ public void test_getTaskLocation_withStoppedTaskState_returnsUnknown()
}

@Test
public void test_join_saveLogsTimeout_handledGracefully() throws IOException
Comment thread
kfaraz marked this conversation as resolved.
public void test_startWatchingLogs_logWatchInitialize_timeout()
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
Expand All @@ -1060,62 +1063,66 @@ public void test_join_saveLogsTimeout_handledGracefully() throws IOException
taskLogs,
mapper,
stateListener,
SHORT_LOG_SAVE_TIMEOUT.toStandardDuration().getMillis() // 1 second timeout for task log save
SHORT_LOG_SAVE_TIMEOUT.toStandardDuration().getMillis()
);
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId))
.andAnswer(() -> {
Thread.sleep(5000); // Exceeds 1 second timeout
return Optional.of(logWatch);
});

Job job = new JobBuilder()
.withNewMetadata()
.withName(ID)
.endMetadata()
.withNewStatus()
.withSucceeded(1)
.withStartTime("2022-09-19T23:31:50Z")
.withCompletionTime("2022-09-19T23:32:48Z")
.endStatus()
.build();

EasyMock.expect(kubernetesClient.waitForPeonJobCompletion(
EasyMock.eq(k8sTaskId),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(new JobResponse(job, PeonPhase.SUCCEEDED));
replayAll();
long startTime = System.currentTimeMillis();
peonLifecycle.startWatchingLogs();
long duration = System.currentTimeMillis() - startTime;
// Anything less than 5 seconds means the Executor timed out correctly, because the mock sleeps for 5 seconds
Assert.assertTrue("Test should complete quickly due to timeout", duration < 2500);
verifyAll();
}

EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch));
EasyMock.expect(taskLogs.streamTaskStatus(ID)).andReturn(Optional.of(
IOUtils.toInputStream(mapper.writeValueAsString(SUCCESS), StandardCharsets.UTF_8)
));
@Test
public void test_saveLogs_streamLogs_timeout() throws IOException
{
EasyMock.reset(logWatch);

// Mock pushTaskLog to sleep longer than the timeout (2 seconds > 1 second timeout)
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall().andAnswer(() -> {
Thread.sleep(2000); // Sleep for 2 seconds
return null;
});
InputStream slowInputStream = new InputStream() {
@Override
public int read() throws IOException
{
try {
Thread.sleep(5000);
}
catch (InterruptedException e) {
throw new IOException(e);
}
return -1;
}
};

stateListener.stateChanged(KubernetesPeonLifecycle.State.RUNNING, ID);
EasyMock.expectLastCall().once();
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(
task,
k8sTaskId,
kubernetesClient,
taskLogs,
mapper,
stateListener,
SHORT_LOG_SAVE_TIMEOUT.toStandardDuration().getMillis()
);
EasyMock.expect(kubernetesClient.getPeonLogWatcher(k8sTaskId)).andReturn(Optional.of(logWatch)).once();
EasyMock.expect(logWatch.getOutput()).andReturn(slowInputStream);
logWatch.close();
EasyMock.expectLastCall().atLeastOnce();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());
EasyMock.expectLastCall();
taskLogs.pushTaskLog(EasyMock.eq(ID), EasyMock.anyObject(File.class));
EasyMock.expectLastCall();

replayAll();

// The test should complete without hanging, even though log save times out
long startTime = System.currentTimeMillis();
TaskStatus taskStatus = peonLifecycle.join(0L);
peonLifecycle.saveLogs();
long duration = System.currentTimeMillis() - startTime;

// Should complete in ~1 second (timeout), not 5+ seconds (sleep duration)
Assert.assertTrue("Test should complete quickly due to timeout", duration < 1500);

// Anything less than 5 seconds means the Executor timed out correctly, because the mock sleeps for 5 seconds
Assert.assertTrue("Test should complete quickly due to timeout", duration < 2500);
verifyAll();

// Task should still succeed - log timeout doesn't affect task result
Assert.assertEquals(SUCCESS.withDuration(58000), taskStatus);
Assert.assertEquals(KubernetesPeonLifecycle.State.STOPPED, peonLifecycle.getState());
}

private void setPeonLifecycleState(KubernetesPeonLifecycle peonLifecycle, KubernetesPeonLifecycle.State state)
Expand Down