diff --git a/docs/development/extensions-core/k8s-jobs.md b/docs/development/extensions-core/k8s-jobs.md index 6dc7bdd70ea9..a9c32370d609 100644 --- a/docs/development/extensions-core/k8s-jobs.md +++ b/docs/development/extensions-core/k8s-jobs.md @@ -33,6 +33,31 @@ Consider this an [EXPERIMENTAL](../experimental.md) feature mostly because it ha The K8s extension builds a pod spec for each task using the specified pod adapter. All jobs are natively restorable, they are decoupled from the Druid deployment, thus restarting pods or doing upgrades has no effect on tasks in flight. They will continue to run and when the overlord comes back up it will start tracking them again. +## Kubernetes Client Mode + +### "Direct" K8s API Interaction per task *(Default)* + +Task lifecycle code in Druid talks directly to the Kubernetes API server for all operations that require interaction with the Kubernetes cluster. + +### `SharedInformer` "Caching" *(Experimental)* + +Enabled by setting `druid.indexer.runner.useK8sSharedInformers=true`, this mode uses `Fabric8` `SharedInformer` objects for monitoring state changes in the remote K8s cluster, reducing the number of direct API calls to the Kubernetes API server. This can greatly reduce load on the API server, especially in environments with a high volume of tasks. + +This mode is experimental and should be used with caution in production until it has been vetted more thoroughly by the community. + +The core idea is to use two `SharedInformers`, one for jobs and one for pods, to watch for changes in the remote K8s cluster. These informers maintain a local cache of jobs and pods that tasks can query. The informers can also notify listeners when changes occur, allowing tasks to react to state changes without polling the API server or creating per-task watches on the K8s cluster. + +#### Architecture: Direct vs. 
Caching Mode + +**Key Differences:** + +- `DirectKubernetesPeonClient` (Default): Every read operation makes a direct HTTP call to the K8s API server. With 100 concurrent tasks, this results in 100+ active API connections with continuous polling. + +- `CachingKubernetesPeonClient` (Experimental): All read operations query an in-memory cache maintained by `SharedInformers`. With 100 concurrent tasks, only 2 persistent watch connections are used (one for Jobs, one for Pods), achieving a large reduction in API calls. + +**Shared Operations**: + +Both implementations share the same write (job creation, deletion) and log read operations code, which always use direct API calls. ## Configuration @@ -798,7 +823,8 @@ Should you require the needed permissions for interacting across Kubernetes name | `druid.indexer.runner.capacity` | `Integer` | Number of concurrent jobs that can be sent to Kubernetes. Value will be overridden if a dynamic config value has been set. | `2147483647` | No | | `druid.indexer.runner.cpuCoreInMicro` | `Integer` | Number of CPU micro core for the task. | `1000` | No | | `druid.indexer.runner.logSaveTimeout` | `Duration` | The peon executing the ingestion task makes a best effort to persist the pod logs from `k8s` to persistent task log storage. The timeout ensures that `k8s` connection issues do not cause the pod to hang indefinitely thereby blocking Overlord operations. If the timeout occurs before the logs are saved, those logs will not be available in Druid. | `PT300S` | NO | - +| `druid.indexer.runner.useK8sSharedInformers` | `boolean` | Whether to use shared informers to watch for pod/job changes. This is more efficient on the Kubernetes API server, but may use more memory in the Overlord. | `false` | No | +| `druid.indexer.runner.k8sSharedInformerResyncPeriod` | `Duration` | When using shared informers, controls how frequently the informers resync with the Kubernetes API server. 
This prevents change events from being missed, keeping the informer cache clean and accurate. | `PT300S` | No | ### Metrics added diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java index 13245227d2aa..cb0e2052d318 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java @@ -20,13 +20,10 @@ package org.apache.druid.testing.embedded.indexing; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableList; import org.apache.commons.io.IOUtils; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.TimestampSpec; -import org.apache.druid.indexer.TaskState; -import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.task.CompactionTask; import org.apache.druid.indexing.common.task.IndexTask; import org.apache.druid.indexing.common.task.NoopTask; @@ -40,7 +37,6 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.http.SqlTaskStatus; @@ -243,7 +239,7 @@ public void test_runIndexParallelTask_andCompactData() .dynamicPartitionWithMaxRows(5000) .withId(compactTaskId); cluster.callApi().onLeaderOverlord(o -> o.runTask(compactTaskId, compactionTask)); - cluster.callApi().waitForTaskToSucceed(taskId, eventCollector.latchableEmitter()); + cluster.callApi().waitForTaskToSucceed(compactTaskId, 
eventCollector.latchableEmitter()); // Verify the compacted data final int numCompactedSegments = 5; @@ -308,13 +304,10 @@ public void test_runKafkaSupervisor() Assertions.assertEquals("RUNNING", supervisorStatus.getState()); Assertions.assertEquals(topic, supervisorStatus.getSource()); - // Get the task statuses - List taskStatuses = ImmutableList.copyOf( - (CloseableIterator) - cluster.callApi().onLeaderOverlord(o -> o.taskStatuses(null, dataSource, 1)) - ); - Assertions.assertFalse(taskStatuses.isEmpty()); - Assertions.assertEquals(TaskState.RUNNING, taskStatuses.get(0).getStatusCode()); + // Confirm tasks are being created and running + int runningTasks = cluster.callApi().getTaskCount("running", dataSource); + int completedTasks = cluster.callApi().getTaskCount("complete", dataSource); + Assertions.assertTrue(runningTasks + completedTasks > 0); // Suspend the supervisor and verify the state cluster.callApi().onLeaderOverlord( diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/BaseKubernetesTaskRunnerDockerTest.java similarity index 83% rename from embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java rename to embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/BaseKubernetesTaskRunnerDockerTest.java index 014186cdd2fc..75ae9a8e6159 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDockerTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/BaseKubernetesTaskRunnerDockerTest.java @@ -27,12 +27,17 @@ import org.junit.jupiter.api.BeforeEach; /** - * Runs some basic ingestion tests against latest image Druid containers running - * on a K3s cluster with druid-operator and using {@code k8s} task runner type. + * Base class for Kubernetes task runner tests. 
Subclasses configure whether to use + * SharedInformers for caching. */ -public class KubernetesTaskRunnerDockerTest extends IngestionSmokeTest implements LatestImageDockerTest +abstract class BaseKubernetesTaskRunnerDockerTest extends IngestionSmokeTest implements LatestImageDockerTest { - private static final String MANIFEST_TEMPLATE = "manifests/druid-service-with-operator.yaml"; + protected static final String MANIFEST_TEMPLATE = "manifests/druid-service-with-operator.yaml"; + + /** + * Subclasses override to enable/disable SharedInformer caching. + */ + protected abstract boolean useSharedInformers(); @Override protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster) @@ -45,6 +50,8 @@ protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster) .addProperty("druid.indexer.runner.type", "k8s") .addProperty("druid.indexer.runner.namespace", "druid") .addProperty("druid.indexer.runner.capacity", "4") + .addProperty("druid.indexer.runner.useK8sSharedInformers", String.valueOf(useSharedInformers())) + .addProperty("druid.indexer.runner.k8sSharedInformerResyncPeriod", "PT1s") .usingPort(30090); final K3sClusterResource k3sCluster = new K3sClusterWithOperatorResource() diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerCachingModeDockerTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerCachingModeDockerTest.java new file mode 100644 index 000000000000..a23b89172948 --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerCachingModeDockerTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.k8s; + +/** + * Runs ingestion tests using SharedInformer caching mode. + * Uses Fabric8 SharedInformers to maintain a local cache of Jobs and Pods, + * reducing load on the Kubernetes API server. + */ +public class KubernetesTaskRunnerCachingModeDockerTest extends BaseKubernetesTaskRunnerDockerTest +{ + @Override + protected boolean useSharedInformers() + { + return true; + } +} diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDirectModeDockerTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDirectModeDockerTest.java new file mode 100644 index 000000000000..7142f35bbb3f --- /dev/null +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/k8s/KubernetesTaskRunnerDirectModeDockerTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testing.embedded.k8s; + +/** + * Runs ingestion tests using direct K8s API interaction (default mode). + * Each task makes direct API calls to the Kubernetes API server. + */ +public class KubernetesTaskRunnerDirectModeDockerTest extends BaseKubernetesTaskRunnerDockerTest +{ + @Override + protected boolean useSharedInformers() + { + return false; + } +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java index 936e86c888ce..b6ba2fdd6773 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesOverlordModule.java @@ -53,6 +53,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient; import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; import org.apache.druid.k8s.overlord.common.httpclient.DruidKubernetesHttpClientFactory; import org.apache.druid.k8s.overlord.common.httpclient.jdk.DruidKubernetesJdkHttpClientConfig; @@ -73,6 +74,7 @@ import org.apache.druid.server.log.StartupLoggingConfig; import org.apache.druid.tasklogs.TaskLogs; 
+import javax.annotation.Nullable; import java.util.Locale; import java.util.Properties; @@ -160,6 +162,10 @@ public KubernetesTaskRunnerEffectiveConfig provideEffectiveConfig( return new KubernetesTaskRunnerEffectiveConfig(staticConfig, dynamicConfigSupplier); } + /** + * Provides the base Kubernetes client for direct API operations. + * This is always created regardless of caching configuration. + */ @Provides @LazySingleton public DruidKubernetesClient makeKubernetesClient( @@ -168,7 +174,6 @@ public DruidKubernetesClient makeKubernetesClient( Lifecycle lifecycle ) { - final DruidKubernetesClient client; final Config config = new ConfigBuilder().build(); if (kubernetesTaskRunnerConfig.isDisableClientProxy()) { @@ -176,7 +181,9 @@ public DruidKubernetesClient makeKubernetesClient( config.setHttpProxy(null); } - client = new DruidKubernetesClient(httpClientFactory, config); + config.setNamespace(kubernetesTaskRunnerConfig.getNamespace()); + + final DruidKubernetesClient client = new DruidKubernetesClient(httpClientFactory, config); lifecycle.addHandler( new Lifecycle.Handler() @@ -199,6 +206,58 @@ public void stop() return client; } + /** + * Provides the caching Kubernetes client that uses informers for efficient resource watching. + * Only created when caching is enabled via configuration. 
+ */ + @Provides + @LazySingleton + @Nullable + public DruidKubernetesCachingClient makeCachingKubernetesClient( + KubernetesTaskRunnerStaticConfig kubernetesTaskRunnerConfig, + DruidKubernetesClient baseClient, + Lifecycle lifecycle + ) + { + if (!kubernetesTaskRunnerConfig.isUseK8sSharedInformers()) { + log.info("Kubernetes shared informers disabled, caching client will not be created"); + return null; + } + + String namespace = kubernetesTaskRunnerConfig.getNamespace(); + long resyncPeriodMillis = kubernetesTaskRunnerConfig + .getK8sSharedInformerResyncPeriod() + .toStandardDuration() + .getMillis(); + + log.info("Creating Kubernetes caching client with informer resync period: %d ms", resyncPeriodMillis); + final DruidKubernetesCachingClient cachingClient = new DruidKubernetesCachingClient( + baseClient, + namespace, + resyncPeriodMillis + ); + + lifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() + { + + } + + @Override + public void stop() + { + log.info("Stopping Kubernetes caching client"); + cachingClient.stop(); + } + } + ); + + return cachingClient; + } + /** * Provides a TaskRunnerFactory instance suitable for environments without Zookeeper. * In such environments, the standard RemoteTaskRunnerFactory may not be operational. 
diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index e7b9eb2b8043..0e96e6ab2c3f 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -72,6 +72,25 @@ public interface KubernetesTaskRunnerConfig Integer getCapacity(); + /** + * Whether to use caching for Kubernetes resources tied to indexing tasks. + *

+ * Enabling shared informers can significantly reduce the number of API calls made to the Kubernetes API server, + * improving performance and reducing load on the server. However, it also increases memory usage as informers + * maintain local caches of resources. + *

+ */ + boolean isUseK8sSharedInformers(); + + /** + * The resync period for the Kubernetes shared informers, if enabled. + *

+ * Periodic resyncs ensure that the informer's local cache is kept up to date with the remote Kubernetes API server + * state. This helps handle missed events or transient errors. + *

+ */ + Period getK8sSharedInformerResyncPeriod(); + static Builder builder() { return new Builder(); @@ -100,6 +119,8 @@ public static class Builder private Integer capacity; private Period taskJoinTimeout; private Period logSaveTimeout; + private boolean useK8sSharedInformers; + private Period k8sSharedInformerResyncPeriod; public Builder() { @@ -232,6 +253,18 @@ public Builder withLogSaveTimeout(Period logSaveTimeout) return this; } + public Builder withUseK8sSharedInformers(boolean useK8sSharedInformers) + { + this.useK8sSharedInformers = useK8sSharedInformers; + return this; + } + + public Builder withK8sSharedInformerResyncPeriod(Period k8sSharedInformerResyncPeriod) + { + this.k8sSharedInformerResyncPeriod = k8sSharedInformerResyncPeriod; + return this; + } + public KubernetesTaskRunnerStaticConfig build() { return new KubernetesTaskRunnerStaticConfig( @@ -255,7 +288,9 @@ public KubernetesTaskRunnerStaticConfig build() this.labels, this.annotations, this.capacity, - this.taskJoinTimeout + this.taskJoinTimeout, + this.useK8sSharedInformers, + this.k8sSharedInformerResyncPeriod ); } } diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java index c1593c3577db..8343ebe15879 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerEffectiveConfig.java @@ -177,6 +177,18 @@ public Integer getCapacity() return dynamicConfigSupplier.get().getCapacity(); } + @Override + public boolean isUseK8sSharedInformers() + { + return staticConfig.isUseK8sSharedInformers(); + } + + @Override + public Period getK8sSharedInformerResyncPeriod() + { + return 
staticConfig.getK8sSharedInformerResyncPeriod(); + } + public PodTemplateSelectStrategy getPodTemplateSelectStrategy() { if (dynamicConfigSupplier == null || dynamicConfigSupplier.get() == null || dynamicConfigSupplier.get().getPodTemplateSelectStrategy() == null) { diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java index 516d229c8917..325f16eed729 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactory.java @@ -27,12 +27,15 @@ import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.k8s.overlord.common.CachingKubernetesPeonClient; +import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient; import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; import org.apache.druid.k8s.overlord.common.KubernetesPeonClient; import org.apache.druid.k8s.overlord.taskadapter.PodTemplateTaskAdapter; import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.apache.druid.tasklogs.TaskLogs; +import javax.annotation.Nullable; import java.util.Set; public class KubernetesTaskRunnerFactory implements TaskRunnerFactory @@ -43,6 +46,7 @@ public class KubernetesTaskRunnerFactory implements TaskRunnerFactory labels, Map annotations, Integer capacity, - Period taskJoinTimeout + Period taskJoinTimeout, + boolean useK8sSharedInformers, + Period k8sSharedInformerResyncPeriod ) { this.namespace = namespace; @@ -247,6 +256,14 @@ public KubernetesTaskRunnerStaticConfig( capacity, this.capacity ); + 
this.useK8sSharedInformers = ObjectUtils.getIfNull( + useK8sSharedInformers, + this.useK8sSharedInformers + ); + this.k8sSharedInformerResyncPeriod = ObjectUtils.getIfNull( + k8sSharedInformerResyncPeriod, + this.k8sSharedInformerResyncPeriod + ); } @Override @@ -376,4 +393,16 @@ public Integer getCapacity() { return capacity; } + + @Override + public boolean isUseK8sSharedInformers() + { + return useK8sSharedInformers; + } + + @Override + public Period getK8sSharedInformerResyncPeriod() + { + return k8sSharedInformerResyncPeriod; + } } diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClient.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClient.java new file mode 100644 index 000000000000..e67cddb5e467 --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClient.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord.common; + +import com.google.common.base.Optional; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.client.informers.cache.Store; +import org.apache.druid.error.DruidException; +import org.apache.druid.java.util.common.Stopwatch; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.joda.time.Duration; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A KubernetesPeonClient implementation that uses shared informers to read Job and Pod state from a local cache. + *

+ * This implementation greatly reduces load on the Kubernetes API server by centralizing watches and allowing + * tasks to query cached resource state instead of making per-task API calls. Mutable operations (job creation, + * deletion) still contact the API server directly. + *

+ */ +public class CachingKubernetesPeonClient extends KubernetesPeonClient +{ + protected static final EmittingLogger log = new EmittingLogger(CachingKubernetesPeonClient.class); + + private final DruidKubernetesCachingClient cachingClient; + + public CachingKubernetesPeonClient( + DruidKubernetesCachingClient cachingClient, + String namespace, + String overlordNamespace, + boolean debugJobs, + ServiceEmitter emitter + ) + { + + super(cachingClient.getBaseClient(), namespace, overlordNamespace == null ? "" : overlordNamespace, debugJobs, emitter); + this.cachingClient = cachingClient; + } + + @Override + public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, TimeUnit unit) + { + final Duration timeout = Duration.millis(unit.toMillis(howLong)); + final Duration jobMustBeSeenWithin = Duration.millis(cachingClient.getInformerResyncPeriodMillis() * 2); + final Stopwatch stopwatch = Stopwatch.createStarted(); + boolean jobSeenInCache = false; + + try { + CompletableFuture jobFuture = null; + while (stopwatch.hasNotElapsed(timeout) && (jobSeenInCache || stopwatch.hasNotElapsed(jobMustBeSeenWithin))) { + if (jobFuture == null || jobFuture.isDone()) { + // Register a future to watch the next change to this job + jobFuture = cachingClient.waitForJobChange(taskId.getK8sJobName()); + } + Optional maybeJob = getPeonJob(taskId.getK8sJobName()); + if (maybeJob.isPresent()) { + jobSeenInCache = true; + Job job = maybeJob.get(); + JobResponse currentResponse = determineJobResponse(job); + if (currentResponse.getPhase() != PeonPhase.RUNNING) { + return currentResponse; + } else { + log.debug("K8s job[%s] found in cache and is still running", taskId.getK8sJobName()); + } + } else if (jobSeenInCache) { + // Job was in cache before, but now it's gone - it was deleted and will never complete. + log.warn("K8s Job[%s] was not found. 
It can happen if the task was canceled", taskId.getK8sJobName()); + return new JobResponse(null, PeonPhase.FAILED); + } else { + log.debug("K8s job[%s] not yet found in cache", taskId.getK8sJobName()); + } + + try { + jobFuture.get(cachingClient.getInformerResyncPeriodMillis(), TimeUnit.MILLISECONDS); + } + catch (ExecutionException | CancellationException e) { + Throwable cause = e.getCause(); + if (cause instanceof CancellationException) { + log.noStackTrace().warn("Job change watch for job[%s] was cancelled", taskId.getK8sJobName()); + } else { + log.noStackTrace().warn(cause, "Exception while waiting for change notification of job[%s]", taskId.getK8sJobName()); + } + } + catch (TimeoutException e) { + // No job change event notified within the timeout time. If there is more time, it will loop back and check the cache again. + log.debug("Timeout waiting for change notification of job[%s].", taskId.getK8sJobName()); + } + catch (InterruptedException e) { + throw DruidException.defensive(e, "Interrupted waiting for job change notification for job[%s]", taskId.getK8sJobName()); + } + } + } + finally { + // Clean up: remove from map and cancel if still pending + cachingClient.cancelJobWatcher(taskId.getK8sJobName()); + } + + log.warn("Timed out waiting for K8s job[%s] to complete", taskId.getK8sJobName()); + return new JobResponse(null, PeonPhase.FAILED); + } + + @Override + public List getPeonJobs() + { + if (overlordNamespace.isEmpty()) { + return cachingClient.readJobCache(Store::list); + } else { + return cachingClient.readJobCache( + indexer -> + indexer.byIndex(DruidKubernetesCachingClient.OVERLORD_NAMESPACE_INDEX, overlordNamespace)); + } + } + + @Override + public Optional getPeonPod(String jobName) + { + return cachingClient.readPodCache(indexer -> { + List pods = indexer.byIndex(DruidKubernetesCachingClient.JOB_NAME_INDEX, jobName); + return pods.isEmpty() ? 
Optional.absent() : Optional.of(pods.get(0)); + }); + } + + public Optional getPeonJob(String jobName) + { + return cachingClient.readJobCache(indexer -> { + List jobs = indexer.byIndex(DruidKubernetesCachingClient.JOB_NAME_INDEX, jobName); + return jobs.isEmpty() ? Optional.absent() : Optional.of(jobs.get(0)); + }); + } + + @Override + @Nullable + protected Pod waitUntilPeonPodCreatedAndReady(String jobName, long howLong, TimeUnit timeUnit) + { + final Duration timeout = Duration.millis(timeUnit.toMillis(howLong)); + final Stopwatch stopwatch = Stopwatch.createStarted(); + + try { + CompletableFuture podFuture = null; + while (stopwatch.hasNotElapsed(timeout)) { + if (podFuture == null || podFuture.isDone()) { + // Register a future to watch the next change to this pod + podFuture = cachingClient.waitForPodChange(jobName); + } + Optional maybePod = getPeonPod(jobName); + if (maybePod.isPresent()) { + Pod pod = maybePod.get(); + String podName = pod.getMetadata() != null && pod.getMetadata().getName() != null + ? pod.getMetadata().getName() + : "unknown"; + if (isPodRunningOrComplete(pod)) { + log.info("Pod[%s] for job[%s] is now in Running/Complete state", podName, jobName); + return pod; + } else { + log.debug("Pod[%s] for job[%s] found in cache but not yet Running/Complete", podName, jobName); + } + } else { + log.debug("Pod for job[%s] not yet found in cache", jobName); + } + + try { + podFuture.get(cachingClient.getInformerResyncPeriodMillis(), TimeUnit.MILLISECONDS); + } + catch (ExecutionException | CancellationException e) { + // This is unusual. 
Log warning but try to continue + Throwable cause = e.getCause(); + if (cause instanceof CancellationException) { + log.noStackTrace().warn("Pod change watch for job[%s] was cancelled", jobName); + } else { + log.noStackTrace().warn(cause, "Unexpected exception while waiting for pod change notification for job[%s]", jobName); + } + } + catch (TimeoutException e) { + // No pod change event notified within the timeout time. If there is more time, it will loop back and check the cache again. + log.debug("Timeout waiting for change notification of pod for job[%s].", jobName); + } + catch (InterruptedException e) { + throw DruidException.defensive(e, "Interrupted waiting for pod change notification for job[%s]", jobName); + } + } + } + finally { + // Clean up: remove from map and cancel if still pending + cachingClient.cancelPodWatcher(jobName); + } + log.warn("Timed out waiting for pod for job[%s] to be created and ready", jobName); + return null; + } + + /** + * Check if the pod is in Running, Succeeded or Failed phase. + */ + private boolean isPodRunningOrComplete(Pod pod) + { + // I could not find constants for Pod phases in fabric8, so hardcoding them here. + // They are documented here: https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase + List matchingPhases = List.of("Running", "Succeeded", "Failed"); + return pod.getStatus() != null && pod.getStatus().getPhase() != null && + matchingPhases.contains(pod.getStatus().getPhase()); + } + + /** + * Determine the JobResponse based on the current state of the Job. 
+ */ + private JobResponse determineJobResponse(Job job) + { + if (job.getStatus() != null) { + Integer active = job.getStatus().getActive(); + Integer succeeded = job.getStatus().getSucceeded(); + Integer failed = job.getStatus().getFailed(); + + if ((active == null || active == 0) && (succeeded != null || failed != null)) { + if (succeeded != null && succeeded > 0) { + log.info("K8s job[%s] completed successfully", job.getMetadata().getName()); + return new JobResponse(job, PeonPhase.SUCCEEDED); + } else { + log.warn("K8s job[%s] failed with status %s", job.getMetadata().getName(), job.getStatus()); + return new JobResponse(job, PeonPhase.FAILED); + } + } + } + + log.debug("K8s job[%s] is still active.", job.getMetadata().getName()); + return new JobResponse(job, PeonPhase.RUNNING); + } +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java new file mode 100644 index 000000000000..b6128177377b --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/DruidKubernetesCachingClient.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +public class DruidKubernetesCachingClient +{ + private static final EmittingLogger log = new EmittingLogger(DruidKubernetesCachingClient.class); + + public static final String JOB_NAME_INDEX = "byJobName"; + public static final String OVERLORD_NAMESPACE_INDEX = "byOverlordNamespace"; + + private final SharedIndexInformer podInformer; + private final SharedIndexInformer jobInformer; + private final KubernetesResourceEventNotifier eventNotifier; + private final KubernetesClientApi baseClient; + private final long informerResyncPeriodMillis; + + public DruidKubernetesCachingClient( + KubernetesClientApi baseClient, + String namespace, + long informerResyncPeriodMillis + ) + { + this.baseClient = baseClient; + this.informerResyncPeriodMillis = informerResyncPeriodMillis; + this.eventNotifier = new KubernetesResourceEventNotifier(); + + this.podInformer = setupPodInformer(namespace); + this.jobInformer = setupJobInformer(namespace); + } + + /** + * Stops the fabric8 informers and cancels all pending futures in the event notifier. 
+ */ + public void stop() + { + if (podInformer != null) { + podInformer.stop(); + } + if (jobInformer != null) { + jobInformer.stop(); + } + // Cancel all pending futures in the event notifier + eventNotifier.cancelAll(); + } + + public KubernetesClientApi getBaseClient() + { + return baseClient; + } + + public KubernetesClient getClient() + { + return baseClient.getClient(); + } + + /** + * Reads from the Pod Informer's {@link io.fabric8.kubernetes.client.informers.cache.Indexer} using the provided executor. + */ + public T readPodCache(SharedInformerCacheReader executor) + { + return executor.readFromCache(podInformer.getIndexer()); + } + + /** + * Reads from the Job Informer's {@link io.fabric8.kubernetes.client.informers.cache.Indexer} using the provided executor. + */ + public T readJobCache(SharedInformerCacheReader executor) + { + return executor.readFromCache(jobInformer.getIndexer()); + } + + /** + * Sets up a shared informer to watch and cache Pod resources in the specified namespace. + *

+ * Registers event handlers for pod add/update/delete events and creates a custom index by job-name + * for efficient pod lookup by job. + *

+ */ + private SharedIndexInformer setupPodInformer(String namespace) + { + SharedIndexInformer podInformer = + baseClient.getClient().pods() + .inNamespace(namespace) + .withLabel(DruidK8sConstants.LABEL_KEY) + .inform( + new InformerEventHandler<>( + (pod, eventType) -> { + log.debug("Pod[%s] got %s", pod.getMetadata().getName(), eventType.name()); + notifyPodChange(pod); + } + ), informerResyncPeriodMillis + ); + + Function> jobNameIndexer = pod -> { + if (pod.getMetadata() != null && pod.getMetadata().getLabels() != null) { + String jobName = pod.getMetadata().getLabels().get("job-name"); + if (jobName != null) { + return Collections.singletonList(jobName); + } + } + return Collections.emptyList(); + }; + + Map>> customPodIndexers = new HashMap<>(); + customPodIndexers.put(JOB_NAME_INDEX, jobNameIndexer); + + podInformer.addIndexers(customPodIndexers); + return podInformer; + } + + /** + * Sets up a shared informer to watch and cache Job resources in the specified namespace. + *

+ * Registers event handlers for job add/update/delete events and creates custom indexes by job-name + * and overlord-namespace for efficient job lookup and filtering. + *

+ */ + private SharedIndexInformer setupJobInformer(String namespace) + { + SharedIndexInformer jobInformer = + baseClient.getClient().batch() + .v1() + .jobs() + .inNamespace(namespace) + .withLabel(DruidK8sConstants.LABEL_KEY) + .inform( + new InformerEventHandler<>( + (job, eventType) -> { + log.debug("Job[%s] got %s", job.getMetadata().getName(), eventType.name()); + eventNotifier.notifyJobChange(job.getMetadata().getName(), job); + } + ), informerResyncPeriodMillis + ); + + Function> overlordNamespaceIndexer = job -> { + if (job.getMetadata() != null && job.getMetadata().getLabels() != null) { + String overlordNamespace = job.getMetadata().getLabels().get(DruidK8sConstants.OVERLORD_NAMESPACE_KEY); + if (overlordNamespace != null) { + return Collections.singletonList(overlordNamespace); + } + } + return Collections.emptyList(); + }; + + Function> jobNameIndexer = job -> { + if (job.getMetadata() != null && job.getMetadata().getName() != null) { + return Collections.singletonList(job.getMetadata().getName()); + } + return Collections.emptyList(); + }; + + Map>> customJobIndexers = new HashMap<>(); + customJobIndexers.put(OVERLORD_NAMESPACE_INDEX, overlordNamespaceIndexer); + customJobIndexers.put(JOB_NAME_INDEX, jobNameIndexer); + + jobInformer.addIndexers(customJobIndexers); + + return jobInformer; + } + + /** + * Utility method to only notify pod changes for pods that are part of indexing jobs. 
+ */ + private void notifyPodChange(Pod pod) + { + if (pod.getMetadata() != null && pod.getMetadata().getLabels() != null) { + String jobName = pod.getMetadata().getLabels().get("job-name"); + if (jobName != null) { + eventNotifier.notifyPodChange(jobName, pod); + } + } + } + + public CompletableFuture waitForJobChange(String jobName) + { + return eventNotifier.waitForJobChange(jobName); + } + + public CompletableFuture waitForPodChange(String jobName) + { + return eventNotifier.waitForPodChange(jobName); + } + + public void cancelJobWatcher(String jobName) + { + eventNotifier.cancelJobWatcher(jobName); + } + + public void cancelPodWatcher(String jobName) + { + eventNotifier.cancelPodWatcher(jobName); + } + + public long getInformerResyncPeriodMillis() + { + return informerResyncPeriodMillis; + } +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/InformerEventHandler.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/InformerEventHandler.java new file mode 100644 index 000000000000..0e553e27bd96 --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/InformerEventHandler.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; + +import java.util.function.BiConsumer; + +/** + * Implementation of ResourceEventHandler that simplifies event handling + * by accepting a single lambda BiConsumer for all event types (add, update, delete). + * + * @param The Kubernetes resource type (e.g., Pod, Job) + */ +public class InformerEventHandler implements ResourceEventHandler +{ + private final BiConsumer eventConsumer; + + public InformerEventHandler(BiConsumer eventConsumer) + { + this.eventConsumer = eventConsumer; + } + + @Override + public void onAdd(T resource) + { + eventConsumer.accept(resource, InformerEventType.ADD); + } + + @Override + public void onUpdate(T oldResource, T newResource) + { + eventConsumer.accept(newResource, InformerEventType.UPDATE); + } + + @Override + public void onDelete(T resource, boolean deletedFinalStateUnknown) + { + eventConsumer.accept(resource, InformerEventType.DELETE); + } +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/InformerEventType.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/InformerEventType.java new file mode 100644 index 000000000000..001b534670d5 --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/InformerEventType.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +/** + * Event types for Kubernetes informer resource events. + */ +public enum InformerEventType +{ + ADD, + UPDATE, + DELETE +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java index b06e8efb824b..6f30e384b244 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClient.java @@ -46,14 +46,22 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +/** + * A KubernetesPeonClient implementation that directly queries the Kubernetes API server for all read and write + * operations on a per-task basis. + *

+ * This implementation does not use caching and may put more load on the Kubernetes API server compared to + * {@link CachingKubernetesPeonClient}, especially when many tasks are running concurrently. + *

+ */ public class KubernetesPeonClient { private static final EmittingLogger log = new EmittingLogger(KubernetesPeonClient.class); - private final KubernetesClientApi clientApi; - private final String namespace; - private final String overlordNamespace; - private final boolean debugJobs; + protected final KubernetesClientApi clientApi; + protected final String namespace; + protected final String overlordNamespace; + protected final boolean debugJobs; private final ServiceEmitter emitter; public KubernetesPeonClient( @@ -66,21 +74,11 @@ public KubernetesPeonClient( { this.clientApi = clientApi; this.namespace = namespace; - this.overlordNamespace = overlordNamespace; + this.overlordNamespace = overlordNamespace == null ? "" : overlordNamespace; this.debugJobs = debugJobs; this.emitter = emitter; } - public KubernetesPeonClient( - KubernetesClientApi clientApi, - String namespace, - boolean debugJobs, - ServiceEmitter emitter - ) - { - this(clientApi, namespace, "", debugJobs, emitter); - } - public Pod launchPeonJobAndWaitForStart(Job job, Task task, long howLong, TimeUnit timeUnit) throws IllegalStateException { long start = System.currentTimeMillis(); @@ -92,23 +90,41 @@ public Pod launchPeonJobAndWaitForStart(Job job, Task task, long howLong, TimeUn createK8sJobWithRetries(job); log.info("Submitted job[%s] for task[%s]. Waiting for POD to launch.", jobName, task.getId()); - // Wait for the pod to be available - Pod mainPod = getPeonPodWithRetries(jobName); - log.info("Pod for job[%s] launched for task[%s]. Waiting for pod to be in running state.", jobName, task.getId()); - - // Wait for the pod to be in state running, completed, or failed. - Pod result = waitForPodResultWithRetries(mainPod, howLong, timeUnit); + Pod result = waitUntilPeonPodCreatedAndReady(jobName, howLong, timeUnit); if (result == null) { - throw new ISE("K8s pod for the task [%s] appeared and disappeared. 
It can happen if the task was canceled", task.getId()); + throw new ISE("K8s pod for the task[%s] appeared and disappeared. It can happen if the task was canceled", task.getId()); } - log.info("Pod for job[%s] is in state [%s] for task[%s].", jobName, result.getStatus().getPhase(), task.getId()); + log.info("Pod for job[%s] is in state[%s] for task[%s].", jobName, result.getStatus().getPhase(), task.getId()); long duration = System.currentTimeMillis() - start; emitK8sPodMetrics(task, "k8s/peon/startup/time", duration); return result; }); } + /** + * Waits until a pod for the given job is created and ready to be monitored. + *

+ * A pod can appear and disappear in some cases, such as the task being canceled. In this case, null is returned and + the caller should handle accordingly. + *

+ * + * @param jobName the name of the job whose pod we're waiting for + * @param howLong the maximum time to wait + * @param timeUnit the time unit for the timeout + * @return the {@link Pod} which was waited for or null if the pod appeared and dissapeared + * @throws DruidException if the pod never appears within the timeout period + */ + protected Pod waitUntilPeonPodCreatedAndReady(String jobName, long howLong, TimeUnit timeUnit) + { + // Wait for the pod to be available + Pod mainPod = getPeonPodWithRetries(jobName); + log.info("Pod for job[%s] launched. Waiting for pod to be in running state.", jobName); + + // Wait for the pod to be in state running, completed, or failed. + return waitForPodResultWithRetries(mainPod, howLong, timeUnit); + } + public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, TimeUnit unit) { return clientApi.executeRequest(client -> { @@ -119,18 +135,18 @@ public JobResponse waitForPeonJobCompletion(K8sTaskId taskId, long howLong, Time .withName(taskId.getK8sJobName()) .waitUntilCondition( x -> (x == null) || (x.getStatus() != null && x.getStatus().getActive() == null - && (x.getStatus().getFailed() != null || x.getStatus().getSucceeded() != null)), + && (x.getStatus().getFailed() != null || x.getStatus().getSucceeded() != null)), howLong, unit ); if (job == null) { - log.info("K8s job for the task [%s] was not found. It can happen if the task was canceled", taskId); + log.info("K8s job for the task[%s] was not found. 
It can happen if the task was canceled", taskId); return new JobResponse(null, PeonPhase.FAILED); } if (job.getStatus().getSucceeded() != null) { return new JobResponse(job, PeonPhase.SUCCEEDED); } - log.warn("Task %s failed with status %s", taskId, job.getStatus()); + log.warn("Task[%s] failed with status[%s]", taskId, job.getStatus()); return new JobResponse(job, PeonPhase.FAILED); }); } @@ -145,57 +161,84 @@ public boolean deletePeonJob(K8sTaskId taskId) .withName(taskId.getK8sJobName()) .delete().isEmpty()); if (result) { - log.info("Cleaned up k8s job: %s", taskId); + log.info("Cleaned up k8s job[%s]", taskId); } else { - log.info("K8s job does not exist: %s", taskId); + log.info("K8s job[%s] does not exist", taskId); } return result; } else { - log.info("Not cleaning up job %s due to flag: debugJobs=true", taskId); + log.info("Not cleaning up job[%s] due to flag: debugJobs=true", taskId); return true; } } + /** + * Get a LogWatch for the peon pod associated with the given taskId. Create it if it does not already exist. + *

+ * Any issues creating the LogWatch will be logged and an absent Optional will be returned. + *

+ * + * @return an Optional containing the {@link LogWatch} if it exists or was created. + */ public Optional getPeonLogWatcher(K8sTaskId taskId) { + Optional maybePod = getPeonPod(taskId.getK8sJobName()); + if (!maybePod.isPresent()) { + log.debug("Pod for job[%s] not found in cache, cannot watch logs", taskId.getK8sJobName()); + return Optional.absent(); + } + + Pod pod = maybePod.get(); + String podName = pod.getMetadata().getName(); + KubernetesClient k8sClient = clientApi.getClient(); try { - LogWatch logWatch = k8sClient.batch() - .v1() - .jobs() - .inNamespace(namespace) - .withName(taskId.getK8sJobName()) - .inContainer("main") - .watchLog(); + LogWatch logWatch = k8sClient.pods() + .inNamespace(namespace) + .withName(podName) + .inContainer("main") + .watchLog(); if (logWatch == null) { return Optional.absent(); } return Optional.of(logWatch); } catch (Exception e) { - log.error(e, "Error watching logs from task: %s", taskId); + log.error(e, "Error watching logs from task[%s], pod[%s].", taskId, podName); return Optional.absent(); } } + /** + * Get an InputStream for the logs of the peon pod associated with the given taskId. + * + * @return an Optional containing the {@link InputStream} for the logs of the pod, if it exists and logs could be streamed, or absent otherwise. 
+ */ public Optional getPeonLogs(K8sTaskId taskId) { + Optional maybePod = getPeonPod(taskId.getK8sJobName()); + if (!maybePod.isPresent()) { + log.debug("Pod for job[%s] not found in cache, cannot stream logs", taskId.getK8sJobName()); + return Optional.absent(); + } + + Pod pod = maybePod.get(); + String podName = pod.getMetadata().getName(); + KubernetesClient k8sClient = clientApi.getClient(); try { - InputStream logStream = k8sClient.batch() - .v1() - .jobs() - .inNamespace(namespace) - .withName(taskId.getK8sJobName()) - .inContainer("main") - .getLogInputStream(); + InputStream logStream = k8sClient.pods() + .inNamespace(namespace) + .resource(pod) + .inContainer("main") + .getLogInputStream(); if (logStream == null) { return Optional.absent(); } return Optional.of(logStream); } catch (Exception e) { - log.error(e, "Error streaming logs from task: %s", taskId); + log.error(e, "Error streaming logs for pod[%s] associated with task[%s]", podName, taskId.getOriginalTaskId()); return Optional.absent(); } } @@ -244,7 +287,7 @@ public int deleteCompletedPeonJobsOlderThan(long howFarBack, TimeUnit timeUnit) .delete().isEmpty()) { numDeleted.incrementAndGet(); } else { - log.error("Failed to delete job %s", x.getMetadata().getName()); + log.error("Failed to delete job[%s]", x.getMetadata().getName()); } }); return numDeleted.get(); diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifier.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifier.java new file mode 100644 index 000000000000..4d5f366d7826 --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifier.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Manages event notifications for Kubernetes resources (Jobs and Pods). + *

+ * Allows tasks to wait for specific resource changes without polling, improving efficiency and responsiveness. + * Critical component of {@link CachingKubernetesPeonClient} functionality. + *

+ *

+ * This implementation assumes only one waiter per job/pod at a time. If a new waiter is registered for a job that + * already has one, the previous waiter will be cancelled. + *

+ */ +public class KubernetesResourceEventNotifier +{ + private static final EmittingLogger log = new EmittingLogger(KubernetesResourceEventNotifier.class); + + private final ConcurrentHashMap> jobWatchers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> podWatchers = new ConcurrentHashMap<>(); + + /** + * Register to be notified when a job with the given name changes. + *

+ * IMPORTANT: Callers must call {@link #cancelJobWatcher(String)} when done waiting to avoid resource leaks. + * + * @param jobName The name of the job to watch + * @return A future that completes when the job changes + */ + public CompletableFuture waitForJobChange(String jobName) + { + CompletableFuture future = new CompletableFuture<>(); + CompletableFuture previous = jobWatchers.put(jobName, future); + + if (previous != null && !previous.isDone()) { + log.warn("Replacing active watcher for job[%s] - multiple waiters detected", jobName); + previous.cancel(true); + } + + log.debug("Registered watcher for job[%s]", jobName); + return future; + } + + /** + * Register to be notified when a pod for the given job name changes. + *

+ * IMPORTANT: Callers must call {@link #cancelPodWatcher(String)} when done waiting to avoid resource leaks. + * + * @param jobName The job-name label value to watch for + * @return A future that completes when a matching pod changes + */ + public CompletableFuture waitForPodChange(String jobName) + { + CompletableFuture future = new CompletableFuture<>(); + CompletableFuture previous = podWatchers.put(jobName, future); + + if (previous != null && !previous.isDone()) { + log.warn("Replacing active watcher for pod with job-name [%s] - multiple waiters detected", jobName); + previous.cancel(true); + } + + log.debug("Registered watcher for pod with job-name [%s]", jobName); + return future; + } + + /** + * Cancel and remove a job watcher. Safe to call even if the future has already completed. + * + * @param jobName The name of the job to stop watching + */ + public void cancelJobWatcher(String jobName) + { + CompletableFuture future = jobWatchers.remove(jobName); + if (future != null && !future.isDone()) { + log.debug("Cancelling watcher for job[%s]", jobName); + future.cancel(true); + } + } + + /** + * Cancel and remove a pod watcher. Safe to call even if the future has already completed. + * + * @param jobName The job-name label value to stop watching + */ + public void cancelPodWatcher(String jobName) + { + CompletableFuture future = podWatchers.remove(jobName); + if (future != null && !future.isDone()) { + log.debug("Cancelling watcher for pod with job-name [%s]", jobName); + future.cancel(true); + } + } + + /** + * Notify the waiter that a job with the given name has changed. + * Completes the future and removes it from the map. 
+ * + * @param jobName The name of the job that changed + * @param job The job that changed + */ + public void notifyJobChange(String jobName, Job job) + { + CompletableFuture future = jobWatchers.remove(jobName); + if (future != null) { + log.debug("Notifying watcher of job [%s] change", jobName); + future.complete(job); + } + } + + /** + * Notify the waiter that a pod for the given job name has changed. + * Completes the future and removes it from the map. + * + * @param jobName The job-name label value that changed + * @param pod The pod that changed + */ + public void notifyPodChange(String jobName, Pod pod) + { + CompletableFuture future = podWatchers.remove(jobName); + if (future != null) { + log.debug("Notifying watcher of pod change for job-name[%s]", jobName); + future.complete(pod); + } + } + + /** + * Cancel all pending watchers. Used during shutdown. + */ + public void cancelAll() + { + log.info("Cancelling all pending watchers"); + jobWatchers.values().forEach(f -> f.cancel(true)); + podWatchers.values().forEach(f -> f.cancel(true)); + jobWatchers.clear(); + podWatchers.clear(); + } +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SharedInformerCacheReader.java b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SharedInformerCacheReader.java new file mode 100644 index 000000000000..50f5ee30a8e8 --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/common/SharedInformerCacheReader.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.client.informers.cache.Indexer; + +@FunctionalInterface +public interface SharedInformerCacheReader +{ + T readFromCache(Indexer indexer); +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java index dd8960864e0e..3cf41a4ac067 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesOverlordModuleTest.java @@ -204,7 +204,6 @@ public void test_build_withoutSidecarSupport_returnsKubernetesTaskRunnerWithSing props.setProperty("druid.indexer.runner.namespace", "NAMESPACE"); injector = makeInjectorWithProperties(props, false, true); - TaskAdapter adapter = injector.getInstance(TaskAdapter.class); Assert.assertNotNull(adapter); diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java index df6f81532f7d..b709bd2a02e0 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java +++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycleTest.java @@ -64,7 +64,8 @@ public class KubernetesPeonLifecycleTest extends EasyMockSupport private static final Period LOG_SAVE_TIMEOUT = new Period("PT300S"); private static final Period SHORT_LOG_SAVE_TIMEOUT = new Period("PT1S"); - @Mock KubernetesPeonClient kubernetesClient; + @Mock + KubernetesPeonClient kubernetesClient; @Mock TaskLogs taskLogs; @Mock LogWatch logWatch; diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java index a67ab70a0a8a..96aec79b68fb 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerFactoryTest.java @@ -21,11 +21,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; import org.apache.druid.common.config.ConfigManager; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.k8s.overlord.common.DruidKubernetesCachingClient; import org.apache.druid.k8s.overlord.common.DruidKubernetesClient; import org.apache.druid.k8s.overlord.common.K8sTaskId; import org.apache.druid.k8s.overlord.common.httpclient.vertx.DruidKubernetesVertxHttpClientConfig; @@ -48,6 +50,7 @@ public class KubernetesTaskRunnerFactoryTest private TaskLogs taskLogs; private DruidKubernetesClient druidKubernetesClient; + private DruidKubernetesCachingClient 
druidKubernetesCachingClient; @Mock private ServiceEmitter emitter; private TaskAdapter taskAdapter; @Mock private ConfigManager configManager; @@ -60,8 +63,12 @@ public void setup() .withCapacity(1) .build(); taskLogs = new NoopTaskLogs(); + + Config config = new ConfigBuilder().build(); + druidKubernetesClient = - new DruidKubernetesClient(new DruidKubernetesVertxHttpClientFactory(new DruidKubernetesVertxHttpClientConfig()), new ConfigBuilder().build()); + new DruidKubernetesClient(new DruidKubernetesVertxHttpClientFactory(new DruidKubernetesVertxHttpClientConfig()), config); + druidKubernetesCachingClient = null; taskAdapter = new TestTaskAdapter(); kubernetesTaskRunnerConfig = new KubernetesTaskRunnerEffectiveConfig(kubernetesTaskRunnerStaticConfig, () -> null); configManager = EasyMock.createNiceMock(ConfigManager.class); @@ -79,7 +86,8 @@ public void test_get_returnsSameKuberentesTaskRunner_asBuild() druidKubernetesClient, emitter, taskAdapter, - configManager + configManager, + druidKubernetesCachingClient ); KubernetesTaskRunner expectedRunner = factory.build(); diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClientTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClientTest.java new file mode 100644 index 000000000000..0f1bfb5dd2e4 --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/CachingKubernetesPeonClientTest.java @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +import com.google.common.base.Optional; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import io.fabric8.kubernetes.api.model.batch.v1.JobBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; +import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.net.HttpURLConnection; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +@EnableKubernetesMockClient(crud = true) +public class CachingKubernetesPeonClientTest +{ + private static final String NAMESPACE = "test-namespace"; + private static final String OVERLORD_NAMESPACE = "overlord-test"; + private static final String JOB_NAME = "test-job-abc123"; + private static final String POD_NAME = "test-job-abc123-pod"; + + private KubernetesClient client; + private KubernetesMockServer server; + private CachingKubernetesPeonClient peonClient; + private StubServiceEmitter 
serviceEmitter; + private TestCachingKubernetesClient cachingClient; + + @BeforeEach + public void setup() throws Exception + { + serviceEmitter = new StubServiceEmitter("service", "host"); + + // Set up real informers with the mock client + TestKubernetesClient clientApi = new TestKubernetesClient(client, NAMESPACE); + + cachingClient = new TestCachingKubernetesClient(clientApi, NAMESPACE); + + peonClient = new CachingKubernetesPeonClient(cachingClient, NAMESPACE, "", false, serviceEmitter); + } + + @AfterEach + public void teardown() + { + if (cachingClient != null) { + cachingClient.stop(); + } + } + + @Test + public void test_getPeonPod_withPodInCache_returnsPresentOptional() throws Exception + { + // Create pod in mock server + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .addToLabels("job-name", JOB_NAME) + .endMetadata() + .withNewStatus() + .withPodIP("10.0.0.1") + .endStatus() + .build(); + + client.pods().inNamespace(NAMESPACE).resource(pod).create(); + + // Wait for informer to sync + cachingClient.waitForSync(); + + // Query from cache + Optional result = peonClient.getPeonPod(JOB_NAME); + + Assertions.assertTrue(result.isPresent()); + Assertions.assertEquals(POD_NAME, result.get().getMetadata().getName()); + } + + @Test + public void test_getPeonPod_withoutPodInCache_returnsAbsentOptional() throws Exception + { + // Wait for informer to sync (empty cache) + cachingClient.waitForSync(); + + Optional result = peonClient.getPeonPod(JOB_NAME); + + Assertions.assertFalse(result.isPresent()); + } + + @Test + public void test_getPeonPod_withMultiplePodsForSameJob_returnsFirstOne() throws Exception + { + Pod pod1 = new PodBuilder() + .withNewMetadata() + .withName("pod-1") + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .addToLabels("job-name", JOB_NAME) + .endMetadata() + .build(); + + Pod pod2 = new PodBuilder() + 
.withNewMetadata() + .withName("pod-2") + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .addToLabels("job-name", JOB_NAME) + .endMetadata() + .build(); + + client.pods().inNamespace(NAMESPACE).resource(pod1).create(); + client.pods().inNamespace(NAMESPACE).resource(pod2).create(); + + cachingClient.waitForSync(); + + Optional result = peonClient.getPeonPod(JOB_NAME); + + Assertions.assertTrue(result.isPresent()); + // Should return one of them (order may vary) + String podName = result.get().getMetadata().getName(); + Assertions.assertTrue("pod-1".equals(podName) || "pod-2".equals(podName)); + } + + @Test + public void test_getPeonJob_withJobInCache_returnsPresentOptional() throws Exception + { + Job job = new JobBuilder() + .withNewMetadata() + .withName(JOB_NAME) + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .endMetadata() + .build(); + + client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create(); + + cachingClient.waitForSync(); + + Optional result = peonClient.getPeonJob(JOB_NAME); + + Assertions.assertTrue(result.isPresent()); + Assertions.assertEquals(JOB_NAME, result.get().getMetadata().getName()); + } + + @Test + public void test_getPeonJob_withoutJobInCache_returnsAbsentOptional() throws Exception + { + cachingClient.waitForSync(); + + Optional result = peonClient.getPeonJob(JOB_NAME); + + Assertions.assertFalse(result.isPresent()); + } + + @Test + public void test_getPeonJobs_withoutOverlordNamespace_returnsAllJobsFromCache() throws Exception + { + Job job1 = new JobBuilder() + .withNewMetadata() + .withName("job-1") + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .endMetadata() + .build(); + + Job job2 = new JobBuilder() + .withNewMetadata() + .withName("job-2") + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .endMetadata() + .build(); + + 
client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job1).create(); + client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job2).create(); + + cachingClient.waitForSync(); + + List jobs = peonClient.getPeonJobs(); + + Assertions.assertEquals(2, jobs.size()); + } + + @Test + public void test_getPeonJobs_withOverlordNamespace_returnsFilteredJobs() throws Exception + { + peonClient = new CachingKubernetesPeonClient(cachingClient, NAMESPACE, OVERLORD_NAMESPACE, false, serviceEmitter); + + Job matchingJob = new JobBuilder() + .withNewMetadata() + .withName("matching-job") + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .addToLabels(DruidK8sConstants.OVERLORD_NAMESPACE_KEY, OVERLORD_NAMESPACE) + .endMetadata() + .build(); + + Job nonMatchingJob = new JobBuilder() + .withNewMetadata() + .withName("non-matching-job") + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .addToLabels(DruidK8sConstants.OVERLORD_NAMESPACE_KEY, "other-namespace") + .endMetadata() + .build(); + + client.batch().v1().jobs().inNamespace(NAMESPACE).resource(matchingJob).create(); + client.batch().v1().jobs().inNamespace(NAMESPACE).resource(nonMatchingJob).create(); + + cachingClient.waitForSync(); + + List jobs = peonClient.getPeonJobs(); + + Assertions.assertEquals(1, jobs.size()); + Assertions.assertEquals("matching-job", jobs.get(0).getMetadata().getName()); + } + + @Test + public void test_getPeonJobs_whenCacheEmpty_returnsEmptyList() throws Exception + { + cachingClient.waitForSync(); + + List jobs = peonClient.getPeonJobs(); + + Assertions.assertEquals(0, jobs.size()); + } + + @Test + public void test_waitForPeonJobCompletion_jobSucceeds() throws Exception + { + // Create job in running state + K8sTaskId taskId = new K8sTaskId("", "original-task-id"); + Job job = new JobBuilder() + .withNewMetadata() + .withName(taskId.getK8sJobName()) + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + 
.endMetadata() + .withNewStatus() + .withActive(1) + .endStatus() + .build(); + + client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create(); + cachingClient.waitForSync(); + + + // Start waiting in background + CompletableFuture futureResponse = CompletableFuture.supplyAsync(() -> + peonClient.waitForPeonJobCompletion(taskId, 60, TimeUnit.SECONDS) + ); + + // Give it a moment to start waiting + Thread.sleep(500); + + // Update job to succeeded state + Job succeededJob = new JobBuilder() + .withNewMetadata() + .withName(taskId.getK8sJobName()) + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .endMetadata() + .withNewStatus() + .withSucceeded(1) + .withActive(0) + .endStatus() + .build(); + + client.batch().v1().jobs().inNamespace(NAMESPACE).resource(succeededJob).update(); + + // Wait for response + JobResponse response = futureResponse.get(60, TimeUnit.SECONDS); + + Assertions.assertEquals(PeonPhase.SUCCEEDED, response.getPhase()); + Assertions.assertNotNull(response.getJob()); + } + + @Test + public void test_waitUntilPeonPodCreatedAndReady_podBecomesReady() throws Exception + { + // Create pod without IP (not ready) + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .addToLabels("job-name", JOB_NAME) + .endMetadata() + .withNewStatus() + .endStatus() + .build(); + + client.pods().inNamespace(NAMESPACE).resource(pod).create(); + cachingClient.waitForSync(); + + // Start waiting for pod to be ready in background + CompletableFuture futurePod = CompletableFuture.supplyAsync(() -> + peonClient.waitUntilPeonPodCreatedAndReady(JOB_NAME, 10, TimeUnit.SECONDS) + ); + + // Give it a moment to start waiting + Thread.sleep(500); + + // Update pod with IP (becomes ready) + Pod readyPod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + 
.addToLabels("job-name", JOB_NAME) + .endMetadata() + .withNewStatus() + .withPhase("Running") + .withPodIP("10.0.0.1") + .endStatus() + .build(); + + client.pods().inNamespace(NAMESPACE).resource(readyPod).update(); + + // Wait for result + Pod result = futurePod.get(5, TimeUnit.SECONDS); + + Assertions.assertNotNull(result); + Assertions.assertEquals("10.0.0.1", result.getStatus().getPodIP()); + } + + @Test + public void test_waitUntilPeonPodCreatedAndReady_timeoutWhenPodNotReady() throws Exception + { + // Create pod without IP (never becomes ready) + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .addToLabels("job-name", JOB_NAME) + .endMetadata() + .withNewStatus() + .endStatus() + .build(); + + client.pods().inNamespace(NAMESPACE).resource(pod).create(); + cachingClient.waitForSync(); + + // Wait for pod to be ready with short timeout + Pod result = peonClient.waitUntilPeonPodCreatedAndReady(JOB_NAME, 1, TimeUnit.SECONDS); + + // Should return null on timeout + Assertions.assertNull(result); + } + + @Test + public void test_waitUntilPeonPodCreatedAndReady_returnNullWhenPodNeverCreated() throws Exception + { + cachingClient.waitForSync(); + + Assertions.assertNull(peonClient.waitUntilPeonPodCreatedAndReady(JOB_NAME, 1, TimeUnit.SECONDS)); + } + + @Test + public void test_waitForPeonJobCompletion_timeoutWhenJobNeverCompletes() throws Exception + { + // Create job that stays in running state + K8sTaskId taskId = new K8sTaskId("", "timeout-task-id"); + Job job = new JobBuilder() + .withNewMetadata() + .withName(taskId.getK8sJobName()) + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .endMetadata() + .withNewStatus() + .withActive(1) + .endStatus() + .build(); + + client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create(); + cachingClient.waitForSync(); + + // Wait with short timeout - job never completes + 
JobResponse response = peonClient.waitForPeonJobCompletion(taskId, 500, TimeUnit.MILLISECONDS); + + // Should return FAILED phase on timeout + Assertions.assertEquals(PeonPhase.FAILED, response.getPhase()); + Assertions.assertNull(response.getJob()); + } + + @Test + public void test_waitForPeonJobCompletion_jobFails() throws Exception + { + // Create job in running state + K8sTaskId taskId = new K8sTaskId("", "failing-task-id"); + Job job = new JobBuilder() + .withNewMetadata() + .withName(taskId.getK8sJobName()) + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .endMetadata() + .withNewStatus() + .withActive(1) + .endStatus() + .build(); + + client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create(); + cachingClient.waitForSync(); + + // Start waiting in background + CompletableFuture futureResponse = CompletableFuture.supplyAsync(() -> + peonClient.waitForPeonJobCompletion(taskId, 60, TimeUnit.SECONDS) + ); + + // Give it a moment to start waiting + Thread.sleep(500); + + // Update job to failed state + Job failedJob = new JobBuilder() + .withNewMetadata() + .withName(taskId.getK8sJobName()) + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .endMetadata() + .withNewStatus() + .withFailed(1) + .withActive(0) + .endStatus() + .build(); + + client.batch().v1().jobs().inNamespace(NAMESPACE).resource(failedJob).update(); + + // Wait for response + JobResponse response = futureResponse.get(60, TimeUnit.SECONDS); + + Assertions.assertEquals(PeonPhase.FAILED, response.getPhase()); + Assertions.assertNotNull(response.getJob()); + } + + @Test + public void test_waitForPeonJobCompletion_jobGetsDeleted() throws Exception + { + // Create job in running state + K8sTaskId taskId = new K8sTaskId("", "deleted-task-id"); + Job job = new JobBuilder() + .withNewMetadata() + .withName(taskId.getK8sJobName()) + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .endMetadata() + 
.withNewStatus() + .withActive(1) + .endStatus() + .build(); + + client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create(); + cachingClient.waitForSync(); + + // Start waiting in background + CompletableFuture futureResponse = CompletableFuture.supplyAsync(() -> + peonClient.waitForPeonJobCompletion(taskId, 60, TimeUnit.SECONDS) + ); + + // Give it a moment to start waiting + Thread.sleep(500); + + // Delete the job (simulates task cancellation/shutdown) + client.batch().v1().jobs().inNamespace(NAMESPACE).withName(taskId.getK8sJobName()).delete(); + + // Wait for response + JobResponse response = futureResponse.get(10, TimeUnit.SECONDS); + + // Should detect deletion and return FAILED + Assertions.assertEquals(PeonPhase.FAILED, response.getPhase()); + Assertions.assertNull(response.getJob()); + } + + @Test + @Timeout(value = 5, unit = TimeUnit.SECONDS) + public void test_waitForPeonJobCompletion_jobDeletedBeforeSeenInCache() throws Exception + { + // Create job + K8sTaskId taskId = new K8sTaskId("", "quick-delete-task-id"); + Job job = new JobBuilder() + .withNewMetadata() + .withName(taskId.getK8sJobName()) + .withNamespace(NAMESPACE) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .endMetadata() + .withNewStatus() + .withActive(1) + .endStatus() + .build(); + + client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create(); + + // Delete immediately before informer syncs + client.batch().v1().jobs().inNamespace(NAMESPACE).withName(taskId.getK8sJobName()).delete(); + + cachingClient.waitForSync(); + + JobResponse response = peonClient.waitForPeonJobCompletion(taskId, 10, TimeUnit.SECONDS); + + // Should timeout or detect job was never seen and return FAILED + Assertions.assertEquals(PeonPhase.FAILED, response.getPhase()); + } + + @Test + void test_getPeonLogsWatcher_withJob_returnsWatchLogInOptional() + throws InterruptedException, TimeoutException, ExecutionException + { + K8sTaskId taskId = new K8sTaskId("", "id"); + Pod pod = new 
PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .addToLabels(DruidK8sConstants.LABEL_KEY, "true") + .addToLabels("job-name", taskId.getK8sJobName()) + .endMetadata() + .build(); + + CompletableFuture podFuture = cachingClient.waitForPodChange(taskId.getK8sJobName()); + client.pods().inNamespace(NAMESPACE).resource(pod).create(); + podFuture.get(5, TimeUnit.SECONDS); + + server.expect().get() + .withPath("/api/v1/namespaces/namespace/pods/id/log?pretty=false&container=main") + .andReturn(HttpURLConnection.HTTP_OK, "data") + .once(); + + Optional maybeLogWatch = peonClient.getPeonLogWatcher(taskId); + Assertions.assertTrue(maybeLogWatch.isPresent()); + } +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java index 6bc7b4d283a9..7463c28805d2 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesPeonClientTest.java @@ -63,9 +63,9 @@ public class KubernetesPeonClientTest @BeforeEach public void setup() { - clientApi = new TestKubernetesClient(this.client); + clientApi = new TestKubernetesClient(this.client, NAMESPACE); serviceEmitter = new StubServiceEmitter("service", "host"); - instance = new KubernetesPeonClient(clientApi, NAMESPACE, false, serviceEmitter); + instance = new KubernetesPeonClient(clientApi, NAMESPACE, null, false, serviceEmitter); } @Test @@ -148,7 +148,7 @@ void test_launchPeonJobAndWaitForStart_withPendingPod_throwIllegalStateException () -> instance.launchPeonJobAndWaitForStart(job, NoopTask.create(), 1, TimeUnit.SECONDS) ); } - + @Test void 
test_waitForPeonJobCompletion_withSuccessfulJob_returnsJobResponseWithJobAndSucceededPeonPhase() { @@ -218,10 +218,27 @@ void test_deletePeonJob_withJob_returnsTrue() Job job = new JobBuilder() .withNewMetadata() .withName(KUBERNETES_JOB_NAME) + .withUid("job-uid-123") + .endMetadata() + .build(); + + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .addToLabels("job-name", KUBERNETES_JOB_NAME) + .addNewOwnerReference() + .withApiVersion("batch/v1") + .withKind("Job") + .withName(KUBERNETES_JOB_NAME) + .withUid("job-uid-123") + .withController(true) + .withBlockOwnerDeletion(true) + .endOwnerReference() .endMetadata() .build(); client.batch().v1().jobs().inNamespace(NAMESPACE).resource(job).create(); + client.pods().inNamespace(NAMESPACE).resource(pod).create(); Assertions.assertTrue(instance.deletePeonJob(new K8sTaskId(TASK_NAME_PREFIX, ID))); } @@ -236,8 +253,9 @@ void test_deletePeonJob_withoutJob_returnsFalse() void test_deletePeonJob_withJob_withDebugJobsTrue_skipsDelete() { KubernetesPeonClient instance = new KubernetesPeonClient( - new TestKubernetesClient(this.client), + new TestKubernetesClient(this.client, NAMESPACE), NAMESPACE, + null, true, serviceEmitter ); @@ -261,8 +279,9 @@ void test_deletePeonJob_withJob_withDebugJobsTrue_skipsDelete() void test_deletePeonJob_withoutJob_withDebugJobsTrue_skipsDelete() { KubernetesPeonClient instance = new KubernetesPeonClient( - new TestKubernetesClient(this.client), + new TestKubernetesClient(this.client, NAMESPACE), NAMESPACE, + null, true, serviceEmitter ); @@ -273,44 +292,14 @@ void test_deletePeonJob_withoutJob_withDebugJobsTrue_skipsDelete() @Test void test_getPeonLogs_withJob_returnsInputStreamInOptional() { - server.expect().get() - .withPath("/apis/batch/v1/namespaces/namespace/jobs/" + KUBERNETES_JOB_NAME) - .andReturn(HttpURLConnection.HTTP_OK, new JobBuilder() - .withNewMetadata() - .withName(KUBERNETES_JOB_NAME) - .withUid("uid") - .endMetadata() - .withNewSpec() - 
.withNewTemplate() - .withNewSpec() - .addNewContainer() - .withName("main") - .endContainer() - .endSpec() - .endTemplate() - .endSpec() - .build() - ).once(); + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .addToLabels("job-name", KUBERNETES_JOB_NAME) + .endMetadata() + .build(); - server.expect().get() - .withPath("/api/v1/namespaces/namespace/pods?labelSelector=controller-uid%3Duid") - .andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder() - .addNewItem() - .withNewMetadata() - .withName(POD_NAME) - .addNewOwnerReference() - .withUid("uid") - .withController(true) - .endOwnerReference() - .endMetadata() - .withNewSpec() - .addNewContainer() - .withName("main") - .endContainer() - .endSpec() - .endItem() - .build() - ).once(); + client.pods().inNamespace(NAMESPACE).resource(pod).create(); server.expect().get() .withPath("/api/v1/namespaces/namespace/pods/id/log?pretty=false&container=main") @@ -583,7 +572,7 @@ void test_getPeonPodWithRetries_withPod_returnsPod() .build() ).once(); - Pod pod = instance.getPeonPodWithRetries(new K8sTaskId(TASK_NAME_PREFIX, ID).getK8sJobName()); + Pod pod = instance.getPeonPodWithRetries(clientApi.getClient(), new K8sTaskId(TASK_NAME_PREFIX, ID).getK8sJobName(), 0, 2); Assertions.assertNotNull(pod); } @@ -642,44 +631,14 @@ void test_getPeonPodWithRetries_withoutPod_noRestartForBlacklistedEvent_raisesKu @Test void test_getPeonLogsWatcher_withJob_returnsWatchLogInOptional() { - server.expect().get() - .withPath("/apis/batch/v1/namespaces/namespace/jobs/" + KUBERNETES_JOB_NAME) - .andReturn(HttpURLConnection.HTTP_OK, new JobBuilder() - .withNewMetadata() - .withName(KUBERNETES_JOB_NAME) - .withUid("uid") - .endMetadata() - .withNewSpec() - .withNewTemplate() - .withNewSpec() - .addNewContainer() - .withName("main") - .endContainer() - .endSpec() - .endTemplate() - .endSpec() - .build() - ).once(); + Pod pod = new PodBuilder() + .withNewMetadata() + .withName(POD_NAME) + .addToLabels("job-name", 
KUBERNETES_JOB_NAME) + .endMetadata() + .build(); - server.expect().get() - .withPath("/api/v1/namespaces/namespace/pods?labelSelector=controller-uid%3Duid") - .andReturn(HttpURLConnection.HTTP_OK, new PodListBuilder() - .addNewItem() - .withNewMetadata() - .withName(POD_NAME) - .addNewOwnerReference() - .withUid("uid") - .withController(true) - .endOwnerReference() - .endMetadata() - .withNewSpec() - .addNewContainer() - .withName("main") - .endContainer() - .endSpec() - .endItem() - .build() - ).once(); + client.pods().inNamespace(NAMESPACE).resource(pod).create(); server.expect().get() .withPath("/api/v1/namespaces/namespace/pods/id/log?pretty=false&container=main") @@ -751,7 +710,7 @@ void test_createK8sJobWithRetries_withNonRetryableException_failsImmediately() // Should fail immediately without retries DruidException e = Assertions.assertThrows( DruidException.class, - () -> instance.createK8sJobWithRetries(clientApi.getClient(), job, 0, 5) + () -> instance.createK8sJobWithRetries(job) ); // Verify the error message contains our job name @@ -777,7 +736,7 @@ void test_createK8sJobWithRetries_withJobAlreadyExists_succeedsGracefully() // Should succeed gracefully without throwing exception Assertions.assertDoesNotThrow( - () -> instance.createK8sJobWithRetries(clientApi.getClient(), job, 0, 5) + () -> instance.createK8sJobWithRetries(job) ); } @@ -798,11 +757,11 @@ void test_waitForPodResultWithRetries_withSuccessfulPodReady_returnsPod() // Should return the pod successfully Pod result = instance.waitForPodResultWithRetries( - clientApi.getClient(), - pod, - 1, - TimeUnit.SECONDS, - 0, + clientApi.getClient(), + pod, + 1, + TimeUnit.SECONDS, + 0, 3 ); @@ -823,23 +782,18 @@ void test_waitForPodResultWithRetries_withNonRetryableFailure_throwsDruidExcepti .endStatus() .build(); - String podPath = "/api/v1/namespaces/" + NAMESPACE + "/pods/" + POD_NAME; - - // Mock server to return the pod without IP, causing timeout - server.expect().get() - .withPath(podPath + 
"?watch=true") - .andReturn(HttpURLConnection.HTTP_INTERNAL_ERROR, "Internal server error") - .once(); + // Create the pod in the mock client without IP - it will remain unready + client.pods().inNamespace(NAMESPACE).resource(pod).create(); // Should throw DruidException after failure DruidException e = Assertions.assertThrows( DruidException.class, () -> instance.waitForPodResultWithRetries( - clientApi.getClient(), - pod, - 1, + clientApi.getClient(), + pod, + 1, TimeUnit.MILLISECONDS, // Very short timeout to force failure - 0, + 0, 1 ) ); diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifierTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifierTest.java new file mode 100644 index 000000000000..46dea8b2b60e --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesResourceEventNotifierTest.java @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.k8s.overlord.common; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class KubernetesResourceEventNotifierTest +{ + private KubernetesResourceEventNotifier notifier; + + @BeforeEach + public void setUp() + { + notifier = new KubernetesResourceEventNotifier(); + } + + @AfterEach + public void tearDown() + { + notifier.cancelAll(); + } + + @Test + public void testWaitForJobChange_CompletesOnNotification() throws Exception + { + String jobName = "test-job"; + Job mockJob = createMockJob(jobName); + + CompletableFuture future = notifier.waitForJobChange(jobName); + assertFalse(future.isDone()); + + notifier.notifyJobChange(jobName, mockJob); + + Job result = future.get(1, TimeUnit.SECONDS); + assertSame(mockJob, result); + assertTrue(future.isDone()); + } + + @Test + public void testWaitForPodChange_CompletesOnNotification() throws Exception + { + String jobName = "test-job"; + Pod mockPod = createMockPod(jobName); + + CompletableFuture future = notifier.waitForPodChange(jobName); + assertFalse(future.isDone()); + + notifier.notifyPodChange(jobName, mockPod); + + Pod result = future.get(1, TimeUnit.SECONDS); + assertSame(mockPod, result); + assertTrue(future.isDone()); + } + + @Test + public void testNotifyWithoutWatchers_NoException() + { + String jobName = 
"test-job"; + Job mockJob = createMockJob(jobName); + + // Should not throw exception + notifier.notifyJobChange(jobName, mockJob); + notifier.notifyPodChange(jobName, createMockPod(jobName)); + } + + @Test + public void testDifferentJobNames_IndependentNotifications() throws Exception + { + String jobName1 = "job-1"; + String jobName2 = "job-2"; + Job mockJob1 = createMockJob(jobName1); + Job mockJob2 = createMockJob(jobName2); + + CompletableFuture future1 = notifier.waitForJobChange(jobName1); + CompletableFuture future2 = notifier.waitForJobChange(jobName2); + + notifier.notifyJobChange(jobName1, mockJob1); + + Job result1 = future1.get(1, TimeUnit.SECONDS); + assertSame(mockJob1, result1); + assertFalse(future2.isDone()); + + notifier.notifyJobChange(jobName2, mockJob2); + + Job result2 = future2.get(1, TimeUnit.SECONDS); + assertSame(mockJob2, result2); + } + + @Test + public void testDifferentPodJobNames_IndependentNotifications() throws Exception + { + String jobName1 = "job-1"; + String jobName2 = "job-2"; + Pod mockPod1 = createMockPod(jobName1); + Pod mockPod2 = createMockPod(jobName2); + + CompletableFuture future1 = notifier.waitForPodChange(jobName1); + CompletableFuture future2 = notifier.waitForPodChange(jobName2); + + notifier.notifyPodChange(jobName1, mockPod1); + + Pod result1 = future1.get(1, TimeUnit.SECONDS); + assertSame(mockPod1, result1); + assertFalse(future2.isDone()); + + notifier.notifyPodChange(jobName2, mockPod2); + + Pod result2 = future2.get(1, TimeUnit.SECONDS); + assertSame(mockPod2, result2); + } + + @Test + public void testCancelAll_CancelsAllPendingWatchers() + { + String jobName1 = "job-1"; + String jobName2 = "job-2"; + + CompletableFuture jobFuture1 = notifier.waitForJobChange(jobName1); + CompletableFuture jobFuture2 = notifier.waitForJobChange(jobName2); + CompletableFuture podFuture1 = notifier.waitForPodChange(jobName1); + CompletableFuture podFuture2 = notifier.waitForPodChange(jobName2); + + 
assertFalse(jobFuture1.isDone()); + assertFalse(jobFuture2.isDone()); + assertFalse(podFuture1.isDone()); + assertFalse(podFuture2.isDone()); + + notifier.cancelAll(); + + assertTrue(jobFuture1.isCancelled()); + assertTrue(jobFuture2.isCancelled()); + assertTrue(podFuture1.isCancelled()); + assertTrue(podFuture2.isCancelled()); + } + + @Test + public void testCancelAll_CancelledFuturesThrowException() + { + String jobName = "test-job"; + + CompletableFuture future = notifier.waitForJobChange(jobName); + notifier.cancelAll(); + + assertThrows(CancellationException.class, future::get); + } + + @Test + public void testSequentialNotifications_WatchersAreCleared() throws Exception + { + String jobName = "test-job"; + Job mockJob1 = createMockJob(jobName); + Job mockJob2 = createMockJob(jobName); + + // First notification + CompletableFuture future1 = notifier.waitForJobChange(jobName); + notifier.notifyJobChange(jobName, mockJob1); + Job result1 = future1.get(1, TimeUnit.SECONDS); + assertSame(mockJob1, result1); + + // Second notification - should require new watcher + CompletableFuture future2 = notifier.waitForJobChange(jobName); + assertFalse(future2.isDone()); + notifier.notifyJobChange(jobName, mockJob2); + Job result2 = future2.get(1, TimeUnit.SECONDS); + assertSame(mockJob2, result2); + } + + @Test + public void testJobAndPodWatchers_Independent() throws Exception + { + String jobName = "test-job"; + Job mockJob = createMockJob(jobName); + Pod mockPod = createMockPod(jobName); + + CompletableFuture jobFuture = notifier.waitForJobChange(jobName); + CompletableFuture podFuture = notifier.waitForPodChange(jobName); + + // Notify job change - should not affect pod watcher + notifier.notifyJobChange(jobName, mockJob); + Job jobResult = jobFuture.get(1, TimeUnit.SECONDS); + assertSame(mockJob, jobResult); + assertFalse(podFuture.isDone()); + + // Notify pod change + notifier.notifyPodChange(jobName, mockPod); + Pod podResult = podFuture.get(1, TimeUnit.SECONDS); + 
assertSame(mockPod, podResult); + } + + @Test + public void test_waitForJobChange_multipleWaitsCancelsOldFutureAndCreatesNewOne() + { + String jobName = "test-job"; + + CompletableFuture future1 = notifier.waitForJobChange(jobName); + CompletableFuture future2 = notifier.waitForJobChange(jobName); + + assertTrue(future1.isCancelled()); + assertNotEquals(future1, future2); + } + + @Test + public void test_waitForPodChange_multipleWaitsCancelsOldFutureAndCreatesNewOne() + { + String jobName = "test-job"; + + CompletableFuture future1 = notifier.waitForPodChange(jobName); + CompletableFuture future2 = notifier.waitForPodChange(jobName); + + assertTrue(future1.isCancelled()); + assertNotEquals(future1, future2); + } + + private Job createMockJob(String name) + { + Job job = new Job(); + ObjectMeta metadata = new ObjectMeta(); + metadata.setName(name); + job.setMetadata(metadata); + return job; + } + + private Pod createMockPod(String jobName) + { + Pod pod = new Pod(); + ObjectMeta metadata = new ObjectMeta(); + metadata.setName(jobName + "-pod"); + pod.setMetadata(metadata); + return pod; + } +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestCachingKubernetesClient.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestCachingKubernetesClient.java new file mode 100644 index 000000000000..46f3b05df73a --- /dev/null +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestCachingKubernetesClient.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.k8s.overlord.common; + +public class TestCachingKubernetesClient extends DruidKubernetesCachingClient +{ + private static final long TESTING_RESYNC_PERIOD_MS = 10L; + + public TestCachingKubernetesClient(KubernetesClientApi clientApi, String namespace) + { + super(clientApi, namespace, TESTING_RESYNC_PERIOD_MS); + } + + public void waitForSync() throws InterruptedException + { + // Give informers a bit more time to process + Thread.sleep(50L); + } +} diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java index 57be98251a9a..1f9fdcd7b6c9 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/TestKubernetesClient.java @@ -26,7 +26,7 @@ public class TestKubernetesClient implements KubernetesClientApi private final KubernetesClient client; - public TestKubernetesClient(KubernetesClient client) + public TestKubernetesClient(KubernetesClient client, String namespace) { this.client = client; } diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java index 016f3280a472..68371261e162 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/DruidPeonClientIntegrationTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.base.Optional; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodSpec; import io.fabric8.kubernetes.api.model.batch.v1.Job; @@ -90,7 +91,7 @@ public void setup() new NamedType(IndexTask.IndexTuningConfig.class, "index") ); k8sClient = new DruidKubernetesClient(new DruidKubernetesVertxHttpClientFactory(new DruidKubernetesVertxHttpClientConfig()), new ConfigBuilder().build()); - peonClient = new KubernetesPeonClient(k8sClient, "default", false, new NoopServiceEmitter()); + peonClient = new KubernetesPeonClient(k8sClient, "default", null, false, new NoopServiceEmitter()); druidNode = new DruidNode( "test", null, @@ -163,7 +164,9 @@ public void testDeployingSomethingToKind(@TempDir Path tempDir) throws Exception // now copy the task.json file from the pod and make sure its the same as our task.json we expected Path downloadPath = Paths.get(tempDir.toAbsolutePath().toString(), "task.json"); - Pod mainJobPod = peonClient.getPeonPodWithRetries(taskId.getK8sJobName()); + Optional maybeMainJobPod = peonClient.getPeonPod(taskId.getK8sJobName()); + assertTrue(maybeMainJobPod.isPresent()); + Pod mainJobPod = maybeMainJobPod.get(); k8sClient.executeRequest(client -> { client.pods() .inNamespace("default") diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java index 552d7201fd05..c61202d45849 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapterTest.java @@ -122,7 +122,7 @@ public K8sTaskAdapterTest() void testAddingLabelsAndAnnotations() throws IOException { final PodSpec podSpec = K8sTestUtils.getDummyPodSpec(); - TestKubernetesClient testClient = new TestKubernetesClient(client) + TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace") { @SuppressWarnings("unchecked") @Override @@ -175,7 +175,7 @@ public PodSpec getSpec() public void serializingAndDeserializingATask() throws IOException { // given a task create a k8s job - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "test"); KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() .withNamespace("test") .build(); @@ -213,7 +213,7 @@ public void serializingAndDeserializingATask() throws IOException public void fromTask_dontSetTaskJSON() throws IOException { final PodSpec podSpec = K8sTestUtils.getDummyPodSpec(); - TestKubernetesClient testClient = new TestKubernetesClient(client) + TestKubernetesClient testClient = new TestKubernetesClient(client, "test") { @SuppressWarnings("unchecked") @Override @@ -277,7 +277,7 @@ public PodSpec getSpec() @Test public void toTask_useTaskPayloadManager() throws IOException { - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "test"); 
KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() .withNamespace("test") .build(); @@ -309,7 +309,7 @@ public void toTask_useTaskPayloadManager() throws IOException @Test public void getTaskId() { - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "test"); KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build(); K8sTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, @@ -331,7 +331,7 @@ public void getTaskId() @Test public void getTaskId_noAnnotations() { - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "test"); KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build(); K8sTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, @@ -353,7 +353,7 @@ public void getTaskId_noAnnotations() @Test public void getTaskId_missingTaskIdAnnotation() { - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "test"); KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder().build(); K8sTaskAdapter adapter = new SingleContainerTaskAdapter( testClient, @@ -452,7 +452,7 @@ void testNoPrimaryFound() @Test void testAddingMonitors() throws IOException { - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "test"); PeonCommandContext context = new PeonCommandContext( new ArrayList<>(), new ArrayList<>(), @@ -531,7 +531,7 @@ void testAddingMonitors() throws IOException @Test void testEphemeralStorageIsRespected() throws IOException { - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace"); Pod pod = 
K8sTestUtils.fileToResource("ephemeralPodSpec.yaml", Pod.class); KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() .withNamespace("namespace") @@ -581,7 +581,7 @@ void testEphemeralStorageIsRespected() throws IOException @Test void testProbesRemoved() throws IOException { - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "test"); Pod pod = K8sTestUtils.fileToResource("probesPodSpec.yaml", Pod.class); KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() .withNamespace("test") @@ -631,7 +631,7 @@ void testProbesRemoved() throws IOException @Test void testCPUResourceIsRespected() throws IOException { - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace"); Pod pod = K8sTestUtils.fileToResource("ephemeralPodSpec.yaml", Pod.class); List javaOpts = new ArrayList<>(); diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java index 477758d9ac4e..b4e76b108855 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/MultiContainerTaskAdapterTest.java @@ -86,7 +86,7 @@ public void setup() @Test public void testMultiContainerSupport() throws IOException { - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace"); Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class); 
KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() .withNamespace("namespace") @@ -137,7 +137,7 @@ public void testMultiContainerSupport() throws IOException @Test public void testMultiContainerSupportWithNamedContainer() throws IOException { - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace"); Pod pod = K8sTestUtils.fileToResource("multiContainerPodSpecOrder.yaml", Pod.class); KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() .withNamespace("namespace") @@ -191,7 +191,7 @@ public void testMultiContainerSupportWithNamedContainer() throws IOException @Test public void testOverridingPeonMonitors() throws IOException { - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace"); Pod pod = K8sTestUtils.fileToResource("podSpec.yaml", Pod.class); KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() .withNamespace("namespace") diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java index 832a7292304f..b93ab7cc3334 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/SingleContainerTaskAdapterTest.java @@ -85,7 +85,7 @@ public void setup() @Test public void testSingleContainerSupport() throws IOException { - TestKubernetesClient testClient = new TestKubernetesClient(client); + TestKubernetesClient testClient = new TestKubernetesClient(client, "namespace"); Pod pod = 
K8sTestUtils.fileToResource("multiContainerPodSpec.yaml", Pod.class); KubernetesTaskRunnerStaticConfig config = KubernetesTaskRunnerConfig.builder() .withNamespace("namespace") diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java index 11b538530bdd..d93ae182f4bd 100644 --- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java +++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java @@ -53,6 +53,7 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.Comparator; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -230,6 +231,14 @@ public TaskStatus waitForTaskToFinish(String taskId, LatchableEmitter emitter) return getTaskStatus(taskId); } + /** + * Gets the count of tasks with the given status for the specified datasource. + */ + public int getTaskCount(String status, String dataSource) + { + return ImmutableList.copyOf((Iterator) onLeaderOverlord(o -> o.taskStatuses(status, dataSource, 100))).size(); + } + /** * Retrieves all used segments from the metadata store (or cache if applicable). */ diff --git a/website/.spelling b/website/.spelling index dee6a14beeec..a6e7cf573889 100644 --- a/website/.spelling +++ b/website/.spelling @@ -527,6 +527,7 @@ reindexing reingest reingesting reingestion +resync repo requireSSL rollup