diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md index 4d79319ab0b2..bcc8805b3a52 100644 --- a/docs/development/extensions-contrib/k8s-jobs.md +++ b/docs/development/extensions-contrib/k8s-jobs.md @@ -236,6 +236,7 @@ data: |`druid.indexer.runner.peonMonitors`| `JsonArray` | Overrides `druid.monitoring.monitors`. Use this property if you don't want to inherit monitors from the Overlord. |`[]`|No| |`druid.indexer.runner.graceTerminationPeriodSeconds`| `Long` | Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods. |`PT30S` (K8s default)|No| |`druid.indexer.runner.capacity`| `Integer` | Number of concurrent jobs that can be sent to Kubernetes. |`2147483647`|No| +|`druid.indexer.runner.cpuCoreInMicro`| `Integer` | CPU allocated to each task container, in millicores (`1000` = one core). Values less than or equal to `0` fall back to the default. | `1000`|No| ### Metrics added diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index 0d67c55b30aa..60efa3c48569 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -101,6 +101,10 @@ public class KubernetesTaskRunnerConfig @NotNull private List javaOptsArray = ImmutableList.of(); + @JsonProperty + @NotNull + private int cpuCoreInMicro = 0; + @JsonProperty @NotNull private Map labels = ImmutableMap.of(); @@ -133,6 +137,7 @@ private KubernetesTaskRunnerConfig( Period k8sjobLaunchTimeout, List peonMonitors, List javaOptsArray, + int cpuCoreInMicro, Map labels, Map annotations, Integer capacity @@ -184,6 +189,10 @@ 
private KubernetesTaskRunnerConfig( javaOptsArray, this.javaOptsArray ); + this.cpuCoreInMicro = ObjectUtils.defaultIfNull( + cpuCoreInMicro, + this.cpuCoreInMicro + ); this.labels = ObjectUtils.defaultIfNull( labels, this.labels @@ -264,6 +273,11 @@ public List getJavaOptsArray() return javaOptsArray; } + public int getCpuCoreInMicro() + { + return cpuCoreInMicro; + } + public Map getLabels() { return labels; @@ -299,6 +313,7 @@ public static class Builder private Period k8sjobLaunchTimeout; private List peonMonitors; private List javaOptsArray; + private int cpuCoreInMicro; private Map labels; private Map annotations; private Integer capacity; @@ -379,6 +394,12 @@ public Builder withPeonMonitors(List peonMonitors) return this; } + public Builder withCpuCore(int cpuCore) + { + this.cpuCoreInMicro = cpuCore; + return this; + } + public Builder withJavaOptsArray(List javaOptsArray) { this.javaOptsArray = javaOptsArray; @@ -397,6 +418,7 @@ public Builder withAnnotations(Map annotations) return this; } + public Builder withCapacity(@Min(0) @Max(Integer.MAX_VALUE) Integer capacity) { this.capacity = capacity; @@ -419,6 +441,7 @@ public KubernetesTaskRunnerConfig build() this.k8sjobLaunchTimeout, this.peonMonitors, this.javaOptsArray, + this.cpuCoreInMicro, this.labels, this.annotations, this.capacity diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java index 7d35827f89bb..6c195ed15151 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidK8sConstants.java @@ -30,6 +30,8 @@ public class DruidK8sConstants public static final String TASK_DATASOURCE = "task.datasource"; 
public static final int PORT = 8100; public static final int TLS_PORT = 8091; + public static final int DEFAULT_CPU_MILLICORES = 1000; + public static final String DEFAULT_JAVA_HEAP_SIZE = "1G"; public static final String TLS_ENABLED = "tls.enabled"; public static final String TASK_JSON_ENV = "TASK_JSON"; public static final String TASK_DIR_ENV = "TASK_DIR"; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonCommandContext.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonCommandContext.java index 8984a73bd1ba..ef06d36d3d0b 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonCommandContext.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/PeonCommandContext.java @@ -31,17 +31,25 @@ public class PeonCommandContext private final List javaOpts; private final File taskDir; private final boolean enableTls; + private final int CpuMicroCore; - public PeonCommandContext(List comamnd, List javaOpts, File taskDir) + public PeonCommandContext(List comamnd, List javaOpts, File taskDir, int CpuMicroCore) { - this(comamnd, javaOpts, taskDir, false); + this(comamnd, javaOpts, taskDir, CpuMicroCore, false); } - public PeonCommandContext(List comamnd, List javaOpts, File taskDir, boolean enableTls) + public PeonCommandContext( + List comamnd, + List javaOpts, + File taskDir, + int CpuMicroCore, + boolean enableTls + ) { this.comamnd = comamnd; this.javaOpts = javaOpts; this.taskDir = taskDir; + this.CpuMicroCore = CpuMicroCore; this.enableTls = enableTls; } @@ -66,6 +74,11 @@ public File getTaskDir() return taskDir; } + public int getCpuMicroCore() + { + return CpuMicroCore; + } + public boolean isEnableTls() { return enableTls; diff --git 
a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java index 862b176b1159..4dfb66ba706c 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java @@ -125,6 +125,7 @@ public Job fromTask(Task task) throws IOException generateCommand(task), javaOpts(task), taskConfig.getBaseTaskDir(), + taskRunnerConfig.getCpuCoreInMicro(), node.isEnableTlsPort() ); PodSpec podSpec = pod.getSpec(); @@ -216,7 +217,7 @@ static long getContainerMemory(PeonCommandContext context) { List javaOpts = context.getJavaOpts(); Optional optionalXmx = getJavaOptValueBytes("-Xmx", javaOpts); - long heapSize = HumanReadableBytes.parse("1g"); + long heapSize = HumanReadableBytes.parse(DruidK8sConstants.DEFAULT_JAVA_HEAP_SIZE); if (optionalXmx.isPresent()) { heapSize = optionalXmx.get(); } @@ -319,7 +320,8 @@ protected Container setupMainContainer( mainContainer.setName("main"); ResourceRequirements requirements = getResourceRequirements( mainContainer.getResources(), - containerSize + containerSize, + context.getCpuMicroCore() ); mainContainer.setResources(requirements); return mainContainer; @@ -457,10 +459,13 @@ private List generateCommand(Task task) } @VisibleForTesting - static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize) + static ResourceRequirements getResourceRequirements(ResourceRequirements requirements, long containerSize, int cpuMicroCore) { Map resourceMap = new HashMap<>(); - resourceMap.put("cpu", new Quantity("1000", "m")); + resourceMap.put( + "cpu", + new Quantity(String.valueOf(cpuMicroCore > 0 ? 
cpuMicroCore : DruidK8sConstants.DEFAULT_CPU_MILLICORES), "m") + ); resourceMap.put("memory", new Quantity(String.valueOf(containerSize))); ResourceRequirementsBuilder result = new ResourceRequirementsBuilder(); if (requirements != null) { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java index 8bcdf15cb69b..241b4d9fc68f 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java @@ -125,9 +125,12 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception null ); String taskBasePath = "/home/taskDir"; - PeonCommandContext context = new PeonCommandContext(Collections.singletonList( - "sleep 10; for i in `seq 1 1000`; do echo $i; done; exit 0" - ), new ArrayList<>(), new File(taskBasePath)); + PeonCommandContext context = new PeonCommandContext( + Collections.singletonList("sleep 10; for i in `seq 1 1000`; do echo $i; done; exit 0"), + new ArrayList<>(), + new File(taskBasePath), + config.getCpuCoreInMicro() + ); Job job = adapter.createJobFromPodSpec(podSpec, task, context); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java index e28854959383..102565efc35a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java +++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java @@ -190,7 +190,7 @@ public void serializingAndDeserializingATask() throws IOException Job jobFromSpec = adapter.createJobFromPodSpec( K8sTestUtils.getDummyPodSpec(), task, - new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/")) + new PeonCommandContext(new ArrayList<>(), new ArrayList<>(), new File("/tmp/"), config.getCpuCoreInMicro()) ); client.batch().v1().jobs().inNamespace("test").create(jobFromSpec); JobList jobList = client.batch().v1().jobs().inNamespace("test").list(); @@ -391,7 +391,8 @@ void testGettingContainerSize() PeonCommandContext context = new PeonCommandContext( new ArrayList<>(), new ArrayList<>(), - new File("/tmp") + new File("/tmp"), + 0 ); assertEquals(expected, K8sTaskAdapter.getContainerMemory(context)); @@ -399,7 +400,8 @@ void testGettingContainerSize() new ArrayList<>(), Collections.singletonList( "-server -Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Djava.io.tmpdir=/druid/data -XX:+ExitOnOutOfMemoryError -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager"), - new File("/tmp") + new File("/tmp"), + 0 ); expected = (long) ((HumanReadableBytes.parse("512m") + HumanReadableBytes.parse("1g")) * 1.2); assertEquals(expected, K8sTaskAdapter.getContainerMemory(context)); @@ -452,7 +454,8 @@ void testAddingMonitors() throws IOException PeonCommandContext context = new PeonCommandContext( new ArrayList<>(), new ArrayList<>(), - new File("/tmp/") + new File("/tmp/"), + 0 ); KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() .withNamespace("test") @@ -548,7 +551,8 @@ void testEphemeralStorageIsRespected() throws IOException new PeonCommandContext( Collections.singletonList("foo && bar"), new ArrayList<>(), - new File("/tmp") + new File("/tmp"), + config.getCpuCoreInMicro() ) ); Job expected = 
K8sTestUtils.fileToResource("expectedEphemeralOutput.yaml", Job.class); @@ -572,6 +576,63 @@ void testEphemeralStorageIsRespected() throws IOException Assertions.assertEquals(expected, actual); } + @Test + void testCPUResourceIsEspected() throws IOException + { + TestKubernetesClient testClient = new TestKubernetesClient(client); + Pod pod = K8sTestUtils.fileToResource("ephemeralPodSpec.yaml", Pod.class); + + List javaOpts = new ArrayList<>(); + javaOpts.add("-Xms1G -Xmx2G -XX:MaxDirectMemorySize=3G"); + KubernetesTaskRunnerConfig config = KubernetesTaskRunnerConfig.builder() + .withNamespace("test") + .withJavaOptsArray(javaOpts) + .withCpuCore(2000) + .build(); + + SingleContainerTaskAdapter adapter = new SingleContainerTaskAdapter( + testClient, + config, + taskConfig, + startupLoggingConfig, + node, + jsonMapper, + taskLogs + ); + NoopTask task = K8sTestUtils.createTask("id", 1); + Job actual = adapter.createJobFromPodSpec( + pod.getSpec(), + task, + new PeonCommandContext( + Collections.singletonList("foo && bar"), + javaOpts, + new File("/tmp"), + config.getCpuCoreInMicro() + ) + ); + Job expected = K8sTestUtils.fileToResource("expectedCPUResourceOutput.yaml", Job.class); + // something is up with jdk 17, where if you compress with jdk < 17 and try and decompress you get different results, + // this would never happen in real life, but for the jdk 17 tests this is a problem + // could be related to: https://bugs.openjdk.org/browse/JDK-8081450 + actual.getSpec() + .getTemplate() + .getSpec() + .getContainers() + .get(0) + .getEnv() + .removeIf(x -> x.getName().equals("TASK_JSON")); + expected.getSpec() + .getTemplate() + .getSpec() + .getContainers() + .get(0) + .getEnv() + .removeIf(x -> x.getName().equals("TASK_JSON")); + Assertions.assertEquals(expected, actual); + + } + + @Test void testEphemeralStorage() { @@ -579,7 +640,8 @@ void testEphemeralStorage() Container container = new ContainerBuilder().build(); ResourceRequirements result = 
K8sTaskAdapter.getResourceRequirements( container.getResources(), - 100 + 100, + 1000 ); // requests and limits will only have 2 items, cpu / memory assertEquals(2, result.getLimits().size()); @@ -591,7 +653,8 @@ void testEphemeralStorage() container.setResources(new ResourceRequirementsBuilder().withRequests(requestMap).withLimits(limitMap).build()); ResourceRequirements ephemeralResult = K8sTaskAdapter.getResourceRequirements( container.getResources(), - 100 + 100, + 1000 ); // you will have ephemeral storage as well. assertEquals(3, ephemeralResult.getLimits().size()); @@ -609,7 +672,8 @@ void testEphemeralStorage() container.getResources().setAdditionalProperty("additional", "some-value"); ResourceRequirements additionalProperties = K8sTaskAdapter.getResourceRequirements( container.getResources(), - 100 + 100, + 1000 ); assertEquals(1, additionalProperties.getAdditionalProperties().size()); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java index 45ea08733768..58993b9a6a00 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java @@ -108,9 +108,12 @@ public void testMultiContainerSupport() throws IOException Job actual = adapter.createJobFromPodSpec( pod.getSpec(), task, - new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"), - new ArrayList<>(), - new File("/tmp") + new PeonCommandContext( + Collections.singletonList( + "/peon.sh 
/druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"), + new ArrayList<>(), + new File("/tmp"), + config.getCpuCoreInMicro() ) ); Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutput.yaml", Job.class); @@ -159,9 +162,12 @@ public void testMultiContainerSupportWithNamedContainer() throws IOException Job actual = adapter.createJobFromPodSpec( spec, task, - new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"), - new ArrayList<>(), - new File("/tmp") + new PeonCommandContext( + Collections.singletonList( + "/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"), + new ArrayList<>(), + new File("/tmp"), + config.getCpuCoreInMicro() ) ); Job expected = K8sTestUtils.fileToResource("expectedMultiContainerOutputOrder.yaml", Job.class); @@ -212,9 +218,12 @@ public void testOverridingPeonMonitors() throws IOException Job actual = adapter.createJobFromPodSpec( spec, task, - new PeonCommandContext(Collections.singletonList("/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"), - new ArrayList<>(), - new File("/tmp") + new PeonCommandContext( + Collections.singletonList( + "/peon.sh /druid/data/baseTaskDir/noop_2022-09-26T22:08:00.582Z_352988d2-5ff7-4b70-977c-3de96f9bfca6 1"), + new ArrayList<>(), + new File("/tmp"), + config.getCpuCoreInMicro() ) ); Job expected = K8sTestUtils.fileToResource("expectedPodSpec.yaml", Job.class); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java index 43a40daedc1d..afc5299927fa 100644 --- 
a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java @@ -110,7 +110,8 @@ public void testSingleContainerSupport() throws IOException new PeonCommandContext( Collections.singletonList("foo && bar"), new ArrayList<>(), - new File("/tmp") + new File("/tmp"), + config.getCpuCoreInMicro() ) ); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedCPUResourceOutput.yaml b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedCPUResourceOutput.yaml new file mode 100644 index 000000000000..f9fec4eac31b --- /dev/null +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/resources/expectedCPUResourceOutput.yaml @@ -0,0 +1,65 @@ +apiVersion: "batch/v1" +kind: "Job" +metadata: + annotations: + task.id: "id" + tls.enabled: "false" + labels: + druid.k8s.peons: "true" + name: "id-3e70afe5cd823dfc7dd308eea616426b" +spec: + activeDeadlineSeconds: 14400 + backoffLimit: 0 + template: + metadata: + annotations: + task.id: "id" + tls.enabled: "false" + labels: + druid.k8s.peons: "true" + spec: + containers: + - args: + - "foo && bar" + command: + - "sh" + - "-c" + env: + - name: "druid_monitoring_monitors" + value: "[\"org.apache.druid.java.util.metrics.JvmMonitor\", \"org.apache.druid.server.metrics.TaskCountStatsMonitor\"\ + ]" + - name: "TASK_DIR" + value: "/tmp" + - name: "TASK_JSON" + value: "H4sIAAAAAAAAAEVOOw7CMAy9i+cOBYmlK0KItWVhNI0BSyEOToKoqt4doxZYLPv9/EbIQyRoIIhEqICd7TYquKqUePidDjN2UrSfxYEM0xKOfDdgvalr86aW0A0z9L9bSsVnc512nZkurHSTZJJQvK+gl5DpZfwIUVmU8wDNarJ0Ssu/EfCJ7PHM3tj9p9i3ltKjWKDbYsR+sU5vP86oMNUAAAA=" + - name: "JAVA_OPTS" + value: "-Xms1G -Xmx2G -XX:MaxDirectMemorySize=3G" + - name: "druid_host" + valueFrom: + fieldRef: + fieldPath: "status.podIP" + - name: "HOSTNAME" 
+ valueFrom: + fieldRef: + fieldPath: "metadata.name" + image: "one" + name: "main" + ports: + - containerPort: 8091 + name: "druid-tls-port" + protocol: "TCP" + - containerPort: 8100 + name: "druid-port" + protocol: "TCP" + resources: + limits: + memory: "6000000000" + cpu: "2000m" + ephemeral-storage: 10Gi + requests: + memory: "6000000000" + cpu: "2000m" + ephemeral-storage: 1Gi + hostname: "id-3e70afe5cd823dfc7dd308eea616426b" + restartPolicy: "Never" + ttlSecondsAfterFinished: 172800 \ No newline at end of file