Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ protected TaskLocation getTaskLocation()
return TaskLocation.unknown();
}

Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId);
Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName());
if (!maybePod.isPresent()) {
return TaskLocation.unknown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class K8sTaskId
{

private final String k8sTaskId;
private final String k8sJobName;
private final String originalTaskId;

public K8sTaskId(Task task)
Expand All @@ -37,12 +37,12 @@ public K8sTaskId(Task task)
public K8sTaskId(String taskId)
{
this.originalTaskId = taskId;
this.k8sTaskId = KubernetesOverlordUtils.convertTaskIdToK8sLabel(taskId);
this.k8sJobName = KubernetesOverlordUtils.convertTaskIdToJobName(taskId);
}

public String getK8sTaskId()
public String getK8sJobName()
{
return k8sTaskId;
return k8sJobName;
}

public String getOriginalTaskId()
Expand All @@ -60,18 +60,18 @@ public boolean equals(Object o)
return false;
}
K8sTaskId k8sTaskId1 = (K8sTaskId) o;
return k8sTaskId.equals(k8sTaskId1.k8sTaskId) && originalTaskId.equals(k8sTaskId1.originalTaskId);
return k8sJobName.equals(k8sTaskId1.k8sJobName) && originalTaskId.equals(k8sTaskId1.originalTaskId);
}

@Override
public int hashCode()
{
return Objects.hash(k8sTaskId, originalTaskId);
return Objects.hash(k8sJobName, originalTaskId);
}

@Override
public String toString()
{
return "[ " + originalTaskId + ", " + k8sTaskId + "]";
return "[ " + originalTaskId + ", " + k8sJobName + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

package org.apache.druid.k8s.overlord.common;

import com.google.common.hash.Hashing;
import org.apache.commons.lang3.RegExUtils;
import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.regex.Pattern;

Expand All @@ -43,4 +45,10 @@ public static String convertTaskIdToK8sLabel(String taskId)
return taskId == null ? "" : StringUtils.left(RegExUtils.replaceAll(taskId, K8S_TASK_ID_PATTERN, "")
.toLowerCase(Locale.ENGLISH), 63);
}

public static String convertTaskIdToJobName(String taskId)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the limit on the length of a job name?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need to do this conversion if the taskId is shorter than 63 characters? That is, would it be valuable to keep the job name identical to the taskId whenever that is possible?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do need to, since task IDs can contain characters that are not valid in k8s labels/job names.

{
return taskId == null ? "" : StringUtils.left(RegExUtils.replaceAll(taskId, K8S_TASK_ID_PATTERN, "")
.toLowerCase(Locale.ENGLISH), 30) + "-" + Hashing.murmur3_128().hashString(taskId, StandardCharsets.UTF_8);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hashing.murmur3_128().hashString(taskId, StandardCharsets.UTF_8)

How many characters will this produce? I couldn't easily figure out if there is a limit on the length of the string returned from this function

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

128 bits -> 32 hexadecimal characters

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public Pod launchPeonJobAndWaitForStart(Job job, long howLong, TimeUnit timeUnit
// launch job
return clientApi.executeRequest(client -> {
client.batch().v1().jobs().inNamespace(namespace).resource(job).create();
K8sTaskId taskId = new K8sTaskId(job.getMetadata().getName());
log.info("Successfully submitted job: %s ... waiting for job to launch", taskId);
String jobName = job.getMetadata().getName();
log.info("Successfully submitted job: %s ... waiting for job to launch", jobName);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to report the taskId in this log line, to make it easy to map to the task_id that is being started.

// wait until the pod is running or complete or failed, any of those is fine
Pod mainPod = getPeonPodWithRetries(taskId);
Pod mainPod = getPeonPodWithRetries(jobName);
Pod result = client.pods().inNamespace(namespace).withName(mainPod.getMetadata().getName())
.waitUntilCondition(pod -> {
if (pod == null) {
Expand All @@ -68,7 +68,7 @@ public Pod launchPeonJobAndWaitForStart(Job job, long howLong, TimeUnit timeUnit
return pod.getStatus() != null && pod.getStatus().getPodIP() != null;
}, howLong, timeUnit);
long duration = System.currentTimeMillis() - start;
log.info("Took task %s %d ms for pod to startup", taskId, duration);
log.info("Took task %s %d ms for pod to startup", jobName, duration);
return result;
});
}
Expand All @@ -80,7 +80,7 @@ public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, Time
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.withName(taskId.getK8sJobName())
.waitUntilCondition(
x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null
&& (x.getStatus().getFailed() != null || x.getStatus().getSucceeded() != null)),
Expand All @@ -106,7 +106,7 @@ public boolean deletePeonJob(K8sTaskId taskId)
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.withName(taskId.getK8sJobName())
.delete().isEmpty());
if (result) {
log.info("Cleaned up k8s task: %s", taskId);
Expand All @@ -128,7 +128,7 @@ public Optional<LogWatch> getPeonLogWatcher(K8sTaskId taskId)
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.withName(taskId.getK8sJobName())
.inContainer("main")
.watchLog();
if (logWatch == null) {
Expand All @@ -150,7 +150,7 @@ public Optional<InputStream> getPeonLogs(K8sTaskId taskId)
.v1()
.jobs()
.inNamespace(namespace)
.withName(taskId.getK8sTaskId())
.withName(taskId.getK8sJobName())
.inContainer("main")
.getLogInputStream();
if (logStream == null) {
Expand Down Expand Up @@ -212,47 +212,46 @@ private List<Job> getJobsToCleanup(List<Job> candidates, long howFarBack, TimeUn
return toDelete;
}

public Optional<Pod> getPeonPod(K8sTaskId taskId)
public Optional<Pod> getPeonPod(String jobName)
{
return clientApi.executeRequest(client -> getPeonPod(client, taskId));
return clientApi.executeRequest(client -> getPeonPod(client, jobName));
}

private Optional<Pod> getPeonPod(KubernetesClient client, K8sTaskId taskId)
private Optional<Pod> getPeonPod(KubernetesClient client, String jobName)
{
List<Pod> pods = client.pods()
.inNamespace(namespace)
.withLabel("job-name", taskId.getK8sTaskId())
.withLabel("job-name", jobName)
.list()
.getItems();
return pods.isEmpty() ? Optional.absent() : Optional.of(pods.get(0));
}

public Pod getPeonPodWithRetries(K8sTaskId taskId)
public Pod getPeonPodWithRetries(String jobName)
{
return clientApi.executeRequest(client -> getPeonPodWithRetries(client, taskId, 5, RetryUtils.DEFAULT_MAX_TRIES));
return clientApi.executeRequest(client -> getPeonPodWithRetries(client, jobName, 5, RetryUtils.DEFAULT_MAX_TRIES));
}

@VisibleForTesting
Pod getPeonPodWithRetries(KubernetesClient client, K8sTaskId taskId, int quietTries, int maxTries)
Pod getPeonPodWithRetries(KubernetesClient client, String jobName, int quietTries, int maxTries)
{
String k8sTaskId = taskId.getK8sTaskId();
try {
return RetryUtils.retry(
() -> {
Optional<Pod> maybePod = getPeonPod(client, taskId);
Optional<Pod> maybePod = getPeonPod(client, jobName);
if (maybePod.isPresent()) {
return maybePod.get();
}
throw new KubernetesResourceNotFoundException(
"K8s pod with label: job-name="
+ k8sTaskId
+ jobName
+ " not found");
},
DruidK8sConstants.IS_TRANSIENT, quietTries, maxTries
);
}
catch (Exception e) {
throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + k8sTaskId + " not found");
throw new KubernetesResourceNotFoundException("K8s pod with label: job-name=" + jobName + " not found");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we pass jobName instead of K8sTaskId, we lose the originalTaskId in the log; I'm not sure whether it's useful for troubleshooting.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think it might be, although technically the k8sTaskId being logged right now is not really the real one because of the bug I mentioned above. Maybe I can find a way to get the actual task ID in there.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ protected Job buildJob(
{
return new JobBuilder()
.withNewMetadata()
.withName(k8sTaskId.getK8sTaskId())
.withName(k8sTaskId.getK8sJobName())
.addToLabels(labels)
.addToAnnotations(annotations)
.endMetadata()
Expand Down Expand Up @@ -309,7 +309,7 @@ protected PodTemplateSpec createTemplateFromSpec(
// clean up the podSpec
podSpec.setNodeName(null);
podSpec.setRestartPolicy("Never");
podSpec.setHostname(k8sTaskId.getK8sTaskId());
podSpec.setHostname(k8sTaskId.getK8sJobName());
podSpec.setTerminationGracePeriodSeconds(taskRunnerConfig.getGraceTerminationPeriodSeconds());

PodTemplateSpec podTemplate = new PodTemplateSpec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public Job fromTask(Task task) throws IOException

return new JobBuilder()
.withNewMetadata()
.withName(new K8sTaskId(task).getK8sTaskId())
.withName(new K8sTaskId(task).getK8sJobName())
.addToLabels(getJobLabels(taskRunnerConfig, task))
.addToAnnotations(getJobAnnotations(taskRunnerConfig, task))
.endMetadata()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ public void test_getTaskLocation_withRunningTaskState_withoutPeonPod_returnsUnkn
KubernetesPeonLifecycle peonLifecycle = new KubernetesPeonLifecycle(task, kubernetesClient, taskLogs, mapper);
setPeonLifecycleState(peonLifecycle, KubernetesPeonLifecycle.State.RUNNING);

EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId)).andReturn(Optional.absent());
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.absent());

replayAll();

Expand All @@ -598,7 +598,7 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithoutStatus_r
.endMetadata()
.build();

EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId)).andReturn(Optional.of(pod));
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));

replayAll();

Expand All @@ -623,7 +623,7 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatus_retu
.endStatus()
.build();

EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId)).andReturn(Optional.of(pod));
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));

replayAll();

Expand Down Expand Up @@ -653,7 +653,7 @@ public void test_getTaskLocation_withRunningTaskState_withPeonPodWithStatusWithT
.endStatus()
.build();

EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId)).andReturn(Optional.of(pod));
EasyMock.expect(kubernetesClient.getPeonPod(k8sTaskId.getK8sJobName())).andReturn(Optional.of(pod));

replayAll();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ public class K8sTaskIdTest
public void testModifyingTaskIDToBeK8sCompliant()
{
String original = "coordinator-issued_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z";
String result = new K8sTaskId(original).getK8sTaskId();
assertEquals("coordinatorissuedcompactk8smetricsaeifmefd20220818t153326094z", result);
String result = new K8sTaskId(original).getK8sJobName();
assertEquals("coordinatorissuedcompactk8smet-2e2c1862cb7ad1d01f4794b27a4438b0", result);
}

@Test
public void testEquals()
{
EqualsVerifier.forClass(K8sTaskId.class)
.withNonnullFields("k8sTaskId", "originalTaskId")
.withNonnullFields("k8sJobName", "originalTaskId")
.usingGetClass()
.verify();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,20 @@ public void test_nullTaskId()
{
Assert.assertEquals("", KubernetesOverlordUtils.convertTaskIdToK8sLabel(null));
}

@Test
public void test_stripJobName()
{
Assert.assertEquals("apiissuedkillwikipedianewbalhn-8916017dfd5469fe9a8881b1035497a2", KubernetesOverlordUtils.convertTaskIdToJobName(
"api-issued_kill_wikipedia_new_balhnoib_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:28:42.526Z"
));
}

@Test
public void test_stripJobName_avoidDuplicatesWithLongDataSourceName()
{
String jobName1 = KubernetesOverlordUtils.convertTaskIdToJobName("coordinator-issued_compact_1234_telemetry_wikipedia_geteditfailuresinnorthamerica_agg_summ_116_pcgkebcl_2023-07-19T16:53:11.416Z");
String jobName2 = KubernetesOverlordUtils.convertTaskIdToJobName("coordinator-issued_compact_1234_telemetry_wikipedia_geteditfailuresinnorthamerica_agg_summ_117_pcgkebcl_2023-07-19T16:53:11.416Z");
Assert.assertNotEquals(jobName1, jobName2);
}
}
Loading