diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java index ef02716921247..bf3d293e417fa 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java @@ -280,7 +280,8 @@ WorkerState state() { void transitionToRunning() { state = State.RUNNING; timeoutFuture = scheduler.schedule(stateChangeExecutor, - new StopWorker(workerId, false), spec.durationMs()); + new StopWorker(workerId, false), + Math.max(0, spec.endMs() - time.milliseconds())); } void transitionToStopping() { @@ -316,6 +317,12 @@ public void createWorker(long workerId, String taskId, TaskSpec spec) throws Thr "a worker with that id.", nodeName, workerId); return; } + if (worker.spec.endMs() <= time.milliseconds()) { + log.info("{}: Will not run worker {} as it has expired.", nodeName, worker); + stateChangeExecutor.submit(new HandleWorkerHalting(worker, + "worker expired", true)); + return; + } KafkaFutureImpl haltFuture = new KafkaFutureImpl<>(); haltFuture.thenApply((KafkaFuture.BaseFunction) errorString -> { if (errorString == null) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java index 3f0075e598a02..97ad4ae1506f3 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java @@ -49,10 +49,12 @@ import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.CreateWorkerRequest; import org.apache.kafka.trogdor.rest.StopWorkerRequest; +import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerReceiving; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.rest.WorkerStarting; import org.apache.kafka.trogdor.rest.WorkerState; +import org.apache.kafka.trogdor.rest.WorkerStopping; import org.apache.kafka.trogdor.task.TaskSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -202,46 +204,58 @@ public void run() { if (log.isTraceEnabled()) { log.trace("{}: got heartbeat status {}", node.name(), agentStatus); } - // Identify workers which we think should be running, but which do not appear - // in the agent's response. We need to send startWorker requests for these. - for (Map.Entry entry : workers.entrySet()) { - Long workerId = entry.getKey(); - if (!agentStatus.workers().containsKey(workerId)) { - ManagedWorker worker = entry.getValue(); - if (worker.shouldRun) { - worker.tryCreate(); - } + handleMissingWorkers(agentStatus); + handlePresentWorkers(agentStatus); + } catch (Throwable e) { + log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e); + } + } + + /** + * Identify workers which we think should be running but do not appear in the agent's response. + * We need to send startWorker requests for those + */ + private void handleMissingWorkers(AgentStatusResponse agentStatus) { + for (Map.Entry entry : workers.entrySet()) { + Long workerId = entry.getKey(); + if (!agentStatus.workers().containsKey(workerId)) { + ManagedWorker worker = entry.getValue(); + if (worker.shouldRun) { + worker.tryCreate(); } } - for (Map.Entry entry : agentStatus.workers().entrySet()) { - long workerId = entry.getKey(); - WorkerState state = entry.getValue(); - ManagedWorker worker = workers.get(workerId); - if (worker == null) { - // Identify tasks which are running, but which we don't know about. - // Add these to the NodeManager as tasks that should not be running. - log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId); - workers.put(workerId, new ManagedWorker(workerId, state.taskId(), - state.spec(), false, state)); - } else { - // Handle workers which need to be stopped. - if (state instanceof WorkerStarting || state instanceof WorkerRunning) { - if (!worker.shouldRun) { - worker.tryStop(); - } - } - // Notify the TaskManager if the worker state has changed. - if (worker.state.equals(state)) { - log.debug("{}: worker state is still {}", node.name(), worker.state); - } else { - log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state); - worker.state = state; - taskManager.updateWorkerState(node.name(), worker.workerId, state); + } + } + + private void handlePresentWorkers(AgentStatusResponse agentStatus) { + for (Map.Entry entry : agentStatus.workers().entrySet()) { + long workerId = entry.getKey(); + WorkerState state = entry.getValue(); + ManagedWorker worker = workers.get(workerId); + if (worker == null) { + // Identify tasks which are running, but which we don't know about. + // Add these to the NodeManager as tasks that should not be running. + log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId); + workers.put(workerId, new ManagedWorker(workerId, state.taskId(), + state.spec(), false, state)); + } else { + // Handle workers which need to be stopped. + if (state instanceof WorkerStarting || state instanceof WorkerRunning) { + if (!worker.shouldRun) { + worker.tryStop(); } } + // Notify the TaskManager if the worker state has changed. + if (worker.state.equals(state)) { + log.debug("{}: worker state is still {}", node.name(), worker.state); + } else { + log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state); + if (state instanceof WorkerDone || state instanceof WorkerStopping) + worker.shouldRun = false; + worker.state = state; + taskManager.updateWorkerState(node.name(), worker.workerId, state); + } } - } catch (Throwable e) { - log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e); } } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index 18ff9cb727504..941656e36186e 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -17,8 +17,10 @@ package org.apache.kafka.trogdor.coordinator; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.LongNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.InvalidRequestException; @@ -150,7 +152,13 @@ class ManagedTask { final private String id; /** - * The task specification. + * The original task specification as submitted when the task was created. + */ + final private TaskSpec originalSpec; + + /** + * The effective task specification. + * The start time will be adjusted to reflect the time when the task was submitted. */ final private TaskSpec spec; @@ -195,8 +203,10 @@ class ManagedTask { */ private String error = ""; - ManagedTask(String id, TaskSpec spec, TaskController controller, TaskStateType state) { + ManagedTask(String id, TaskSpec originalSpec, TaskSpec spec, + TaskController controller, TaskStateType state) { this.id = id; + this.originalSpec = originalSpec; this.spec = spec; this.controller = controller; this.state = state; @@ -297,7 +307,7 @@ public void createTask(final String id, TaskSpec spec) throws Throwable { try { executor.submit(new CreateTask(id, spec)).get(); - } catch (ExecutionException e) { + } catch (ExecutionException | JsonProcessingException e) { log.info("createTask(id={}, spec={}) error", id, spec, e); throw e.getCause(); } @@ -308,11 +318,15 @@ public void createTask(final String id, TaskSpec spec) */ class CreateTask implements Callable { private final String id; + private final TaskSpec originalSpec; private final TaskSpec spec; - CreateTask(String id, TaskSpec spec) { + CreateTask(String id, TaskSpec spec) throws JsonProcessingException { this.id = id; - this.spec = spec; + this.originalSpec = spec; + ObjectNode node = JsonUtil.JSON_SERDE.valueToTree(originalSpec); + node.set("startMs", new LongNode(Math.max(time.milliseconds(), originalSpec.startMs()))); + this.spec = JsonUtil.JSON_SERDE.treeToValue(node, TaskSpec.class); } @Override @@ -322,11 +336,11 @@ public Void call() throws Exception { } ManagedTask task = tasks.get(id); if (task != null) { - if (!task.spec.equals(spec)) { + if (!task.originalSpec.equals(originalSpec)) { throw new RequestConflictException("Task ID " + id + " already " + - "exists, and has a different spec " + task.spec); + "exists, and has a different spec " + task.originalSpec); } - log.info("Task {} already exists with spec {}", id, spec); + log.info("Task {} already exists with spec {}", id, originalSpec); return null; } TaskController controller = null; @@ -339,13 +353,13 @@ public Void call() throws Exception { if (failure != null) { log.info("Failed to create a new task {} with spec {}: {}", id, spec, failure); - task = new ManagedTask(id, spec, null, TaskStateType.DONE); + task = new ManagedTask(id, originalSpec, spec, null, TaskStateType.DONE); task.doneMs = time.milliseconds(); task.maybeSetError(failure); tasks.put(id, task); return null; } - task = new ManagedTask(id, spec, controller, TaskStateType.PENDING); + task = new ManagedTask(id, originalSpec, spec, controller, TaskStateType.PENDING); tasks.put(id, task); long delayMs = task.startDelayMs(time.milliseconds()); task.startFuture = scheduler.schedule(executor, new RunTask(task), delayMs); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java index af7a76f854119..acb19f6587913 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java @@ -66,6 +66,13 @@ public final long startMs() { return startMs; } + /** + * Get the deadline time of this task in ms + */ + public final long endMs() { + return startMs + durationMs; + } + /** * Get the duration of this task in ms. */ diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java index f0ea47535818f..6c200838111c7 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.RequestConflictException; import org.apache.kafka.trogdor.rest.StopWorkerRequest; +import org.apache.kafka.trogdor.rest.TaskDone; import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.task.NoOpTaskSpec; @@ -109,6 +110,43 @@ public void testAgentGetStatus() throws Exception { agent.waitForShutdown(); } + @Test + public void testCreateExpiredWorkerIsNotScheduled() throws Exception { + long initialTimeMs = 100; + long tickMs = 15; + final boolean[] toSleep = {true}; + MockTime time = new MockTime(tickMs, initialTimeMs, 0) { + /** + * Modify sleep() to call super.sleep() every second call + * in order to avoid the endless loop in the tick() calls to the MockScheduler listener + */ + @Override + public void sleep(long ms) { + toSleep[0] = !toSleep[0]; + if (toSleep[0]) + super.sleep(ms); + } + }; + MockScheduler scheduler = new MockScheduler(time); + Agent agent = createAgent(scheduler); + AgentClient client = new AgentClient.Builder(). + maxTries(10).target("localhost", agent.port()).build(); + AgentStatusResponse status = client.status(); + assertEquals(Collections.emptyMap(), status.workers()); + new ExpectedTasks().waitFor(client); + + final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 10); + client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec)); + long actualStartTimeMs = initialTimeMs + tickMs; + long doneMs = actualStartTimeMs + 2 * tickMs; + new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo"). + workerState(new WorkerDone("foo", fooSpec, actualStartTimeMs, + doneMs, null, "worker expired")). + taskState(new TaskDone(fooSpec, actualStartTimeMs, doneMs, "worker expired", false, null)). + build()). + waitFor(client); + } + @Test public void testAgentCreateWorkers() throws Exception { MockTime time = new MockTime(0, 0, 0); @@ -171,53 +209,58 @@ public void testAgentCreateWorkers() throws Exception { @Test public void testAgentFinishesTasks() throws Exception { - MockTime time = new MockTime(0, 0, 0); + long startTimeMs = 2000; + MockTime time = new MockTime(0, startTimeMs, 0); MockScheduler scheduler = new MockScheduler(time); Agent agent = createAgent(scheduler); AgentClient client = new AgentClient.Builder(). maxTries(10).target("localhost", agent.port()).build(); new ExpectedTasks().waitFor(client); - final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 2); + final NoOpTaskSpec fooSpec = new NoOpTaskSpec(startTimeMs, 2); + long fooSpecStartTimeMs = startTimeMs; client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))). + workerState(new WorkerRunning("foo", fooSpec, startTimeMs, new TextNode("active"))). build()). waitFor(client); time.sleep(1); - final NoOpTaskSpec barSpec = new NoOpTaskSpec(2000, 900000); - client.createWorker(new CreateWorkerRequest(1, "bar", barSpec)); + long barSpecWorkerId = 1; + long barSpecStartTimeMs = startTimeMs + 1; + final NoOpTaskSpec barSpec = new NoOpTaskSpec(startTimeMs, 900000); + client.createWorker(new CreateWorkerRequest(barSpecWorkerId, "bar", barSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))). + workerState(new WorkerRunning("foo", fooSpec, fooSpecStartTimeMs, new TextNode("active"))). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerRunning("bar", barSpec, 1, new TextNode("active"))). + workerState(new WorkerRunning("bar", barSpec, barSpecStartTimeMs, new TextNode("active"))). build()). waitFor(client); time.sleep(1); + // foo task expired new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerDone("foo", fooSpec, 0, 2, new TextNode("done"), "")). + workerState(new WorkerDone("foo", fooSpec, fooSpecStartTimeMs, fooSpecStartTimeMs + 2, new TextNode("done"), "")). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerRunning("bar", barSpec, 1, new TextNode("active"))). + workerState(new WorkerRunning("bar", barSpec, barSpecStartTimeMs, new TextNode("active"))). build()). waitFor(client); time.sleep(5); - client.stopWorker(new StopWorkerRequest(1)); + client.stopWorker(new StopWorkerRequest(barSpecWorkerId)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerDone("foo", fooSpec, 0, 2, new TextNode("done"), "")). + workerState(new WorkerDone("foo", fooSpec, fooSpecStartTimeMs, fooSpecStartTimeMs + 2, new TextNode("done"), "")). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerDone("bar", barSpec, 1, 7, new TextNode("done"), "")). + workerState(new WorkerDone("bar", barSpec, barSpecStartTimeMs, startTimeMs + 7, new TextNode("done"), "")). build()). waitFor(client); @@ -348,7 +391,7 @@ public void testDestroyWorkers() throws Exception { maxTries(10).target("localhost", agent.port()).build(); new ExpectedTasks().waitFor(client); - final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 5); + final NoOpTaskSpec fooSpec = new NoOpTaskSpec(0, 5); client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). @@ -363,7 +406,7 @@ public void testDestroyWorkers() throws Exception { new ExpectedTasks().waitFor(client); time.sleep(1); - final NoOpTaskSpec fooSpec2 = new NoOpTaskSpec(100, 1); + final NoOpTaskSpec fooSpec2 = new NoOpTaskSpec(2, 1); client.createWorker(new CreateWorkerRequest(1, "foo", fooSpec2)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java index 9edffaa757d27..0c9aba2cc9394 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java @@ -210,17 +210,25 @@ public MiniTrogdorCluster build() throws Exception { coordinator = node.coordinator; } } - return new MiniTrogdorCluster(agents, coordinator); + return new MiniTrogdorCluster(scheduler, agents, nodes, coordinator); } } private final TreeMap agents; + private final TreeMap nodesByAgent; + private final Coordinator coordinator; - private MiniTrogdorCluster(TreeMap agents, + private final Scheduler scheduler; + + private MiniTrogdorCluster(Scheduler scheduler, + TreeMap agents, + TreeMap nodesByAgent, Coordinator coordinator) { + this.scheduler = scheduler; this.agents = agents; + this.nodesByAgent = nodesByAgent; this.coordinator = coordinator; } @@ -242,6 +250,17 @@ public CoordinatorClient coordinatorClient() { build(); } + /** + * Mimic a restart of a Trogdor agent, essentially cleaning out all of its active workers + */ + public void restartAgent(String nodeName) { + if (!agents.containsKey(nodeName)) { + throw new RuntimeException("There is no agent on node " + nodeName); + } + Builder.NodeData node = nodesByAgent.get(nodeName); + agents.put(nodeName, new Agent(node.platform, scheduler, node.agentRestServer, node.agentRestResource)); + } + public AgentClient agentClient(String nodeName) { Agent agent = agents.get(nodeName); if (agent == null) { diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java index db1afac3ea484..02479511c52cc 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java @@ -154,7 +154,7 @@ public void testTaskDistribution() throws Exception { waitFor(agentClient1). waitFor(agentClient2); - NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2); + NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 7); coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo").taskState( @@ -176,15 +176,15 @@ public void testTaskDistribution() throws Exception { waitFor(agentClient1). waitFor(agentClient2); - time.sleep(2); + time.sleep(7); ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance); status2.set("node01", new TextNode("done")); status2.set("node02", new TextNode("done")); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - taskState(new TaskDone(fooSpec, 11, 13, + taskState(new TaskDone(fooSpec, 11, 18, "", false, status2)). - workerState(new WorkerDone("foo", fooSpec, 11, 13, new TextNode("done"), "")). + workerState(new WorkerDone("foo", fooSpec, 11, 18, new TextNode("done"), "")). build()). waitFor(coordinatorClient). waitFor(agentClient1). @@ -211,7 +211,7 @@ public void testTaskCancellation() throws Exception { waitFor(agentClient1). waitFor(agentClient2); - NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2); + NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 7); coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build()). @@ -236,13 +236,13 @@ public void testTaskCancellation() throws Exception { ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance); status2.set("node01", new TextNode("done")); status2.set("node02", new TextNode("done")); - time.sleep(1); + time.sleep(7); coordinatorClient.stopTask(new StopTaskRequest("foo")); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - taskState(new TaskDone(fooSpec, 11, 12, "", + taskState(new TaskDone(fooSpec, 11, 18, "", true, status2)). - workerState(new WorkerDone("foo", fooSpec, 11, 12, new TextNode("done"), "")). + workerState(new WorkerDone("foo", fooSpec, 11, 18, new TextNode("done"), "")). build()). waitFor(coordinatorClient). waitFor(agentClient1). @@ -275,7 +275,7 @@ public void testTaskDestruction() throws Exception { waitFor(agentClient1). waitFor(agentClient2); - NoOpTaskSpec fooSpec = new NoOpTaskSpec(2, 2); + NoOpTaskSpec fooSpec = new NoOpTaskSpec(2, 12); coordinatorClient.destroyTask(new DestroyTaskRequest("foo")); coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec)); NoOpTaskSpec barSpec = new NoOpTaskSpec(20, 20); @@ -363,12 +363,15 @@ private static List> createPartitionLists(String[][] array) { @Test public void testNetworkPartitionFault() throws Exception { CapturingCommandRunner runner = new CapturingCommandRunner(); + MockTime time = new MockTime(0, 0, 0); + Scheduler scheduler = new MockScheduler(time); try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). addCoordinator("node01"). addAgent("node01"). addAgent("node02"). addAgent("node03"). commandRunner(runner). + scheduler(scheduler). build()) { CoordinatorClient coordinatorClient = cluster.coordinatorClient(); NetworkPartitionFaultSpec spec = new NetworkPartitionFaultSpec(0, Long.MAX_VALUE, @@ -496,6 +499,110 @@ public void testTasksRequest() throws Exception { } } + /** + * If an agent fails in the middle of a task and comes back up when the task is considered expired, + * we want the task to be marked as DONE and not re-sent should a second failure happen. + */ + @Test + public void testAgentFailureAndTaskExpiry() throws Exception { + MockTime time = new MockTime(0, 0, 0); + Scheduler scheduler = new MockScheduler(time); + try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). + addCoordinator("node01"). + addAgent("node02"). + scheduler(scheduler). + build()) { + CoordinatorClient coordinatorClient = cluster.coordinatorClient(); + + NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 500); + coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec)); + TaskState expectedState = new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build().taskState(); + + TaskState resp = coordinatorClient.task(new TaskRequest("foo")); + assertEquals(expectedState, resp); + + + time.sleep(2); + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))). + workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))). + build()). + waitFor(coordinatorClient). + waitFor(cluster.agentClient("node02")); + + cluster.restartAgent("node02"); + time.sleep(550); + // coordinator heartbeat sees that the agent is back up, re-schedules the task but the agent expires it + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + taskState(new TaskDone(fooSpec, 2, 552, "worker expired", false, null)). + workerState(new WorkerDone("foo", fooSpec, 552, 552, null, "worker expired")). + build()). + waitFor(coordinatorClient). + waitFor(cluster.agentClient("node02")); + + cluster.restartAgent("node02"); + // coordinator heartbeat sees that the agent is back up but does not re-schedule the task as it is DONE + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + taskState(new TaskDone(fooSpec, 2, 552, "worker expired", false, null)). + // no worker states + build()). + waitFor(coordinatorClient). + waitFor(cluster.agentClient("node02")); + } + } + + @Test + public void testTaskRequestWithOldStartMsGetsUpdated() throws Exception { + MockTime time = new MockTime(0, 0, 0); + Scheduler scheduler = new MockScheduler(time); + try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). + addCoordinator("node01"). + addAgent("node02"). + scheduler(scheduler). + build()) { + + NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 500); + time.sleep(552); + + CoordinatorClient coordinatorClient = cluster.coordinatorClient(); + NoOpTaskSpec updatedSpec = new NoOpTaskSpec(552, 500); + coordinatorClient.createTask(new CreateTaskRequest("fooSpec", fooSpec)); + TaskState expectedState = new ExpectedTaskBuilder("fooSpec").taskState( + new TaskRunning(updatedSpec, 552, new TextNode("receiving")) + ).build().taskState(); + + TaskState resp = coordinatorClient.task(new TaskRequest("fooSpec")); + assertEquals(expectedState, resp); + } + } + + @Test + public void testTaskRequestWithFutureStartMsDoesNotGetRun() throws Exception { + MockTime time = new MockTime(0, 0, 0); + Scheduler scheduler = new MockScheduler(time); + try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). + addCoordinator("node01"). + addAgent("node02"). + scheduler(scheduler). + build()) { + + NoOpTaskSpec fooSpec = new NoOpTaskSpec(1000, 500); + time.sleep(999); + + CoordinatorClient coordinatorClient = cluster.coordinatorClient(); + coordinatorClient.createTask(new CreateTaskRequest("fooSpec", fooSpec)); + TaskState expectedState = new ExpectedTaskBuilder("fooSpec").taskState( + new TaskPending(fooSpec) + ).build().taskState(); + + TaskState resp = coordinatorClient.task(new TaskRequest("fooSpec")); + assertEquals(expectedState, resp); + } + } + @Test public void testTaskRequest() throws Exception { MockTime time = new MockTime(0, 0, 0);