Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ protected enum State
@MonotonicNonNull
private LogWatch logWatch;

private TaskLocation taskLocation;

protected KubernetesPeonLifecycle(
Task task,
KubernetesPeonClient kubernetesClient,
Expand Down Expand Up @@ -116,6 +118,8 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout)
State.PENDING
);

// In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation.
taskLocation = null;
kubernetesClient.launchPeonJobAndWaitForStart(
job,
launchTimeout,
Expand Down Expand Up @@ -226,27 +230,31 @@ protected TaskLocation getTaskLocation()
return TaskLocation.unknown();
}

Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName());
if (!maybePod.isPresent()) {
return TaskLocation.unknown();
}
/* It's okay to cache this because podIP only changes on pod restart, and we have to set restartPolicy to Never
since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher
if we decide we need to change this later.
**/
if (taskLocation == null) {
Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName());
if (!maybePod.isPresent()) {
return TaskLocation.unknown();
}

Pod pod = maybePod.get();
PodStatus podStatus = pod.getStatus();
Pod pod = maybePod.get();
PodStatus podStatus = pod.getStatus();

if (podStatus == null || podStatus.getPodIP() == null) {
return TaskLocation.unknown();
if (podStatus == null || podStatus.getPodIP() == null) {
return TaskLocation.unknown();
}
taskLocation = TaskLocation.create(
podStatus.getPodIP(),
DruidK8sConstants.PORT,
DruidK8sConstants.TLS_PORT,
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false"))
);
}

return TaskLocation.create(
podStatus.getPodIP(),
DruidK8sConstants.PORT,
DruidK8sConstants.TLS_PORT,
Boolean.parseBoolean(pod.getMetadata()
.getAnnotations()
.getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")
)
);
return taskLocation;
}

private TaskStatus getTaskStatus(long duration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,35 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_retu
verifyAll();
}

@Test
public void test_getTaskLocation_saveTaskLocation()
throws NoSuchFieldException, IllegalAccessException
{
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);

Pod pod = new PodBuilder()
.withNewMetadata()
.withName(ID)
.endMetadata()
.withNewStatus()
.withPodIP("ip")
.endStatus()
.build();

EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod)).once();

replayAll();

TaskLocation location = peonLifecycle.getTaskLocation();
peonLifecycle.getTaskLocation();
Assert.assertEquals("ip", location.getHost());
Assert.assertEquals(8100, location.getPort());
Assert.assertEquals(-1, location.getTlsPort());

verifyAll();
}

@Test
public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithTLSAnnotation_returnsLocation()
throws NoSuchFieldException, IllegalAccessException
Expand Down