From c6c4cfc8d1e9caaa9225816f0adaee6dcb2c1243 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Tue, 8 Jan 2019 12:34:23 +0200 Subject: [PATCH 1/5] KAFKA-7790: Expire Trogdor tasks This commit changes a Trogdor agent/coordinator's behavior to not run tasks that have expired. We define an expired task as one whose sum of `startedMs` and `durationMs` is less than the current time in milliseconds. --- .../kafka/trogdor/agent/WorkerManager.java | 10 +++ .../trogdor/coordinator/NodeManager.java | 32 +++++-- .../trogdor/coordinator/TaskManager.java | 5 +- .../apache/kafka/trogdor/task/TaskSpec.java | 13 +++ .../apache/kafka/trogdor/agent/AgentTest.java | 31 +++++++ .../trogdor/common/MiniTrogdorCluster.java | 23 ++++- .../trogdor/coordinator/CoordinatorTest.java | 87 +++++++++++++++++-- 7 files changed, 183 insertions(+), 18 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java index ef02716921247..486f71c7cbea2 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java @@ -250,6 +250,10 @@ class Worker { this.reference = shutdownManager.takeReference(); } + boolean hasExpired() { + return spec.hasExpired(time, startedMs); + } + long workerId() { return workerId; } @@ -316,6 +320,12 @@ public void createWorker(long workerId, String taskId, TaskSpec spec) throws Thr "a worker with that id.", nodeName, workerId); return; } + if (worker.hasExpired()) { + log.info("{}: Will not run worker {} as it has expired.", nodeName, worker); + stateChangeExecutor.submit(new HandleWorkerHalting(worker, + "worker expired", true)); + return; + } KafkaFutureImpl haltFuture = new KafkaFutureImpl<>(); haltFuture.thenApply((KafkaFuture.BaseFunction) errorString -> { if (errorString == null) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java index 3f0075e598a02..6f4b7c81e1a26 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java @@ -43,12 +43,14 @@ package org.apache.kafka.trogdor.coordinator; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.trogdor.agent.AgentClient; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.CreateWorkerRequest; import org.apache.kafka.trogdor.rest.StopWorkerRequest; +import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerReceiving; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.rest.WorkerStarting; @@ -82,6 +84,7 @@ class ManagedWorker { private final long workerId; private final String taskId; private final TaskSpec spec; + private long startedMs = -1; private boolean shouldRun; private WorkerState state; @@ -94,9 +97,15 @@ class ManagedWorker { this.state = state; } + boolean hasExpired() { + return spec.hasExpired(time, startedMs); + } + void tryCreate() { try { client.createWorker(new CreateWorkerRequest(workerId, taskId, spec)); + if (startedMs == -1) + startedMs = time.milliseconds(); } catch (Throwable e) { log.error("{}: error creating worker {}.", node.name(), this, e); } @@ -146,14 +155,17 @@ public String toString() { */ private final NodeHeartbeat heartbeat; + private final Time time; + /** * A future which can be used to cancel the periodic hearbeat task. */ private ScheduledFuture heartbeatFuture; - NodeManager(Node node, TaskManager taskManager) { + NodeManager(Node node, TaskManager taskManager, Time time) { this.node = node; this.taskManager = taskManager; + this.time = time; this.client = new AgentClient.Builder(). maxTries(1). target(node.hostname(), Node.Util.getTrogdorAgentPort(node)). @@ -206,11 +218,19 @@ public void run() { // in the agent's response. We need to send startWorker requests for these. for (Map.Entry entry : workers.entrySet()) { Long workerId = entry.getKey(); - if (!agentStatus.workers().containsKey(workerId)) { - ManagedWorker worker = entry.getValue(); - if (worker.shouldRun) { - worker.tryCreate(); - } + if (agentStatus.workers().containsKey(workerId)) + continue; + ManagedWorker worker = entry.getValue(); + if (!worker.shouldRun) + continue; + + if (!worker.hasExpired()) { + worker.tryCreate(); + } else { + log.info("{}: Will not create worker state {} as it has expired. ", node.name(), worker.state); + worker.shouldRun = false; + taskManager.updateWorkerState(node.name(), worker.workerId, + new WorkerDone(worker.taskId, worker.spec, worker.startedMs, -1, null, "worker expired")); } } for (Map.Entry entry : agentStatus.workers().entrySet()) { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index 18ff9cb727504..2f8a97d5fe93c 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -136,7 +136,7 @@ public final class TaskManager { this.nextWorkerId = firstWorkerId; for (Node node : platform.topology().nodes().values()) { if (Node.Util.getTrogdorAgentPort(node) > 0) { - this.nodeManagers.put(node.name(), new NodeManager(node, this)); + this.nodeManagers.put(node.name(), new NodeManager(node, this, this.time)); } } log.info("Created TaskManager for agent(s) on: {}", @@ -336,6 +336,9 @@ public Void call() throws Exception { } catch (Throwable t) { failure = "Failed to create TaskController: " + t.getMessage(); } + if (spec.hasExpired(time, -1)) + failure = "worker expired"; + if (failure != null) { log.info("Failed to create a new task {} with spec {}: {}", id, spec, failure); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java index af7a76f854119..405d9485b4418 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.trogdor.common.JsonUtil; import java.util.Collections; @@ -74,6 +75,18 @@ public final long durationMs() { return durationMs; } + /** + * Whether the task is considered expired + * @param startedMs the actual time that this task started on + */ + public boolean hasExpired(Time time, long startedMs) { + long startMs = this.startMs > 0 ? this.startMs : startedMs; + if (startMs <= 0) // task doesn't have a start time yet + return false; + + return startMs + durationMs < time.milliseconds(); + } + /** * Hydrate this task on the coordinator. * diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java index f0ea47535818f..1fffa6d659907 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java @@ -109,6 +109,37 @@ public void testAgentGetStatus() throws Exception { agent.waitForShutdown(); } + @Test + public void testCreateExpiredWorkerIsNotScheduled() throws Exception { + final boolean[] toSleep = {true}; + MockTime time = new MockTime(15, 100, 0) { + /** + * Modify sleep() to call super.sleep() only some times + * in order to avoid the endless loop in the tick() calls to the MockScheduler listener + */ + @Override + public void sleep(long ms) { + toSleep[0] = !toSleep[0]; + if (toSleep[0]) + super.sleep(ms); + } + }; + MockScheduler scheduler = new MockScheduler(time); + Agent agent = createAgent(scheduler); + AgentClient client = new AgentClient.Builder(). + maxTries(10).target("localhost", agent.port()).build(); + AgentStatusResponse status = client.status(); + assertEquals(Collections.emptyMap(), status.workers()); + new ExpectedTasks().waitFor(client); + + final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 10); + client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec)); + new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo"). + workerState(new WorkerDone("foo", fooSpec, 115, 145, null, "worker expired")). + build()). + waitFor(client); + } + @Test public void testAgentCreateWorkers() throws Exception { MockTime time = new MockTime(0, 0, 0); diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java index 9edffaa757d27..0c9aba2cc9394 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java @@ -210,17 +210,25 @@ public MiniTrogdorCluster build() throws Exception { coordinator = node.coordinator; } } - return new MiniTrogdorCluster(agents, coordinator); + return new MiniTrogdorCluster(scheduler, agents, nodes, coordinator); } } private final TreeMap agents; + private final TreeMap nodesByAgent; + private final Coordinator coordinator; - private MiniTrogdorCluster(TreeMap agents, + private final Scheduler scheduler; + + private MiniTrogdorCluster(Scheduler scheduler, + TreeMap agents, + TreeMap nodesByAgent, Coordinator coordinator) { + this.scheduler = scheduler; this.agents = agents; + this.nodesByAgent = nodesByAgent; this.coordinator = coordinator; } @@ -242,6 +250,17 @@ public CoordinatorClient coordinatorClient() { build(); } + /** + * Mimic a restart of a Trogdor agent, essentially cleaning out all of its active workers + */ + public void restartAgent(String nodeName) { + if (!agents.containsKey(nodeName)) { + throw new RuntimeException("There is no agent on node " + nodeName); + } + Builder.NodeData node = nodesByAgent.get(nodeName); + agents.put(nodeName, new Agent(node.platform, scheduler, node.agentRestServer, node.agentRestResource)); + } + public AgentClient agentClient(String nodeName) { Agent agent = agents.get(nodeName); if (agent == null) { diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java index db1afac3ea484..6b9f46398b8a4 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java @@ -154,7 +154,7 @@ public void testTaskDistribution() throws Exception { waitFor(agentClient1). waitFor(agentClient2); - NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2); + NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 7); coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo").taskState( @@ -176,15 +176,15 @@ public void testTaskDistribution() throws Exception { waitFor(agentClient1). waitFor(agentClient2); - time.sleep(2); + time.sleep(7); ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance); status2.set("node01", new TextNode("done")); status2.set("node02", new TextNode("done")); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - taskState(new TaskDone(fooSpec, 11, 13, + taskState(new TaskDone(fooSpec, 11, 18, "", false, status2)). - workerState(new WorkerDone("foo", fooSpec, 11, 13, new TextNode("done"), "")). + workerState(new WorkerDone("foo", fooSpec, 11, 18, new TextNode("done"), "")). build()). waitFor(coordinatorClient). waitFor(agentClient1). @@ -211,7 +211,7 @@ public void testTaskCancellation() throws Exception { waitFor(agentClient1). waitFor(agentClient2); - NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2); + NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 7); coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build()). @@ -236,13 +236,13 @@ public void testTaskCancellation() throws Exception { ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance); status2.set("node01", new TextNode("done")); status2.set("node02", new TextNode("done")); - time.sleep(1); + time.sleep(7); coordinatorClient.stopTask(new StopTaskRequest("foo")); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - taskState(new TaskDone(fooSpec, 11, 12, "", + taskState(new TaskDone(fooSpec, 11, 18, "", true, status2)). - workerState(new WorkerDone("foo", fooSpec, 11, 12, new TextNode("done"), "")). + workerState(new WorkerDone("foo", fooSpec, 11, 18, new TextNode("done"), "")). build()). waitFor(coordinatorClient). waitFor(agentClient1). @@ -275,7 +275,7 @@ public void testTaskDestruction() throws Exception { waitFor(agentClient1). waitFor(agentClient2); - NoOpTaskSpec fooSpec = new NoOpTaskSpec(2, 2); + NoOpTaskSpec fooSpec = new NoOpTaskSpec(2, 12); coordinatorClient.destroyTask(new DestroyTaskRequest("foo")); coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec)); NoOpTaskSpec barSpec = new NoOpTaskSpec(20, 20); @@ -496,6 +496,75 @@ public void testTasksRequest() throws Exception { } } + /** + * If an agent fails in the middle of a task and comes back up when the task is considered expired, + * we want the coordinator to mark it as DONE and not schedule it on the agent + */ + @Test + public void testCoordinatorDoesNotReRunExpiredTask() throws Exception { + MockTime time = new MockTime(0, 0, 0); + Scheduler scheduler = new MockScheduler(time); + try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). + addCoordinator("node01"). + addAgent("node02"). + scheduler(scheduler). + build()) { + CoordinatorClient coordinatorClient = cluster.coordinatorClient(); + + NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 500); + coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec)); + TaskState expectedState = new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build().taskState(); + + TaskState resp = coordinatorClient.task(new TaskRequest("foo")); + assertEquals(expectedState, resp); + + + time.sleep(2); + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))). + workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))). + build()). + waitFor(coordinatorClient). + waitFor(cluster.agentClient("node02")); + + cluster.restartAgent("node02"); + time.sleep(550); + // coordinator heartbeat sees that the agent is back up but does not re-schedule the task as it is expired + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + taskState(new TaskDone(fooSpec, 2, 552, "worker expired", false, null)). + build()). + waitFor(coordinatorClient). + waitFor(cluster.agentClient("node02")); + } + } + + @Test + public void testTaskRequestExpiredTaskDoesNotGetRun() throws Exception { + MockTime time = new MockTime(0, 0, 0); + Scheduler scheduler = new MockScheduler(time); + try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). + addCoordinator("node01"). + addAgent("node02"). + scheduler(scheduler). + build()) { + + NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 500); + time.sleep(552); + + CoordinatorClient coordinatorClient = cluster.coordinatorClient(); + NoOpTaskSpec alreadyExpired = new NoOpTaskSpec(1, 500); + coordinatorClient.createTask(new CreateTaskRequest("alreadyExpired", alreadyExpired)); + TaskState expectedState = new ExpectedTaskBuilder("alreadyExpired").taskState( + new TaskDone(fooSpec, -1, 552, "worker expired", false, null) + ).build().taskState(); + + TaskState resp = coordinatorClient.task(new TaskRequest("alreadyExpired")); + assertEquals(expectedState, resp); + } + } + @Test public void testTaskRequest() throws Exception { MockTime time = new MockTime(0, 0, 0); From 2918d711a2201601a438327022a95c60043cbe24 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Tue, 8 Jan 2019 12:44:57 +0200 Subject: [PATCH 2/5] Separate methods in NodeManager in order to reduce checkstyle complaint NPathComplexity --- .../trogdor/coordinator/NodeManager.java | 98 ++++++++++--------- 1 file changed, 54 insertions(+), 44 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java index 6f4b7c81e1a26..00431cd9aae4d 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java @@ -214,54 +214,64 @@ public void run() { if (log.isTraceEnabled()) { log.trace("{}: got heartbeat status {}", node.name(), agentStatus); } - // Identify workers which we think should be running, but which do not appear - // in the agent's response. We need to send startWorker requests for these. - for (Map.Entry entry : workers.entrySet()) { - Long workerId = entry.getKey(); - if (agentStatus.workers().containsKey(workerId)) - continue; - ManagedWorker worker = entry.getValue(); - if (!worker.shouldRun) - continue; - - if (!worker.hasExpired()) { - worker.tryCreate(); - } else { - log.info("{}: Will not create worker state {} as it has expired. ", node.name(), worker.state); - worker.shouldRun = false; - taskManager.updateWorkerState(node.name(), worker.workerId, - new WorkerDone(worker.taskId, worker.spec, worker.startedMs, -1, null, "worker expired")); - } + handleMissingWorkers(agentStatus); + handlePresentWorkers(agentStatus); + } catch (Throwable e) { + log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e); + } + } + + /** + * Identify workers which we think should be running, but which do not appear + * in the agent's response. We need to send startWorker requests for these. + */ + private void handleMissingWorkers(AgentStatusResponse agentStatus) { + for (Map.Entry entry : workers.entrySet()) { + Long workerId = entry.getKey(); + if (agentStatus.workers().containsKey(workerId)) + continue; + ManagedWorker worker = entry.getValue(); + if (!worker.shouldRun) + continue; + + if (!worker.hasExpired()) { + worker.tryCreate(); + } else { + log.info("{}: Will not create worker state {} as it has expired. ", node.name(), worker.state); + worker.shouldRun = false; + taskManager.updateWorkerState(node.name(), worker.workerId, + new WorkerDone(worker.taskId, worker.spec, worker.startedMs, -1, null, "worker expired")); } - for (Map.Entry entry : agentStatus.workers().entrySet()) { - long workerId = entry.getKey(); - WorkerState state = entry.getValue(); - ManagedWorker worker = workers.get(workerId); - if (worker == null) { - // Identify tasks which are running, but which we don't know about. - // Add these to the NodeManager as tasks that should not be running. - log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId); - workers.put(workerId, new ManagedWorker(workerId, state.taskId(), - state.spec(), false, state)); - } else { - // Handle workers which need to be stopped. - if (state instanceof WorkerStarting || state instanceof WorkerRunning) { - if (!worker.shouldRun) { - worker.tryStop(); - } - } - // Notify the TaskManager if the worker state has changed. - if (worker.state.equals(state)) { - log.debug("{}: worker state is still {}", node.name(), worker.state); - } else { - log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state); - worker.state = state; - taskManager.updateWorkerState(node.name(), worker.workerId, state); + } + } + + private void handlePresentWorkers(AgentStatusResponse agentStatus) { + for (Map.Entry entry : agentStatus.workers().entrySet()) { + long workerId = entry.getKey(); + WorkerState state = entry.getValue(); + ManagedWorker worker = workers.get(workerId); + if (worker == null) { + // Identify tasks which are running, but which we don't know about. + // Add these to the NodeManager as tasks that should not be running. + log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId); + workers.put(workerId, new ManagedWorker(workerId, state.taskId(), + state.spec(), false, state)); + } else { + // Handle workers which need to be stopped. + if (state instanceof WorkerStarting || state instanceof WorkerRunning) { + if (!worker.shouldRun) { + worker.tryStop(); } } + // Notify the TaskManager if the worker state has changed. + if (worker.state.equals(state)) { + log.debug("{}: worker state is still {}", node.name(), worker.state); + } else { + log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state); + worker.state = state; + taskManager.updateWorkerState(node.name(), worker.workerId, state); + } } - } catch (Throwable e) { - log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e); } } } From 2376fb9c96a2eb8ff4fd7bd02ff0ad5f84cca4ee Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 10 Jan 2019 12:51:33 +0200 Subject: [PATCH 3/5] Modify task startMs in task spec and do task expiry in the Agent only Also reverts back changes to NodeManager --- .../kafka/trogdor/agent/WorkerManager.java | 9 +- .../trogdor/coordinator/NodeManager.java | 102 +++++++----------- .../trogdor/coordinator/TaskManager.java | 39 ++++--- .../apache/kafka/trogdor/task/TaskSpec.java | 20 ++-- .../apache/kafka/trogdor/agent/AgentTest.java | 46 +++++--- .../trogdor/coordinator/CoordinatorTest.java | 39 +++++-- 6 files changed, 133 insertions(+), 122 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java index 486f71c7cbea2..bf3d293e417fa 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java @@ -250,10 +250,6 @@ class Worker { this.reference = shutdownManager.takeReference(); } - boolean hasExpired() { - return spec.hasExpired(time, startedMs); - } - long workerId() { return workerId; } @@ -284,7 +280,8 @@ WorkerState state() { void transitionToRunning() { state = State.RUNNING; timeoutFuture = scheduler.schedule(stateChangeExecutor, - new StopWorker(workerId, false), spec.durationMs()); + new StopWorker(workerId, false), + Math.max(0, spec.endMs() - time.milliseconds())); } void transitionToStopping() { @@ -320,7 +317,7 @@ public void createWorker(long workerId, String taskId, TaskSpec spec) throws Thr "a worker with that id.", nodeName, workerId); return; } - if (worker.hasExpired()) { + if (worker.spec.endMs() <= time.milliseconds()) { log.info("{}: Will not run worker {} as it has expired.", nodeName, worker); stateChangeExecutor.submit(new HandleWorkerHalting(worker, "worker expired", true)); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java index 00431cd9aae4d..3f0075e598a02 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java @@ -43,14 +43,12 @@ package org.apache.kafka.trogdor.coordinator; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.trogdor.agent.AgentClient; import org.apache.kafka.trogdor.common.Node; import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.CreateWorkerRequest; import org.apache.kafka.trogdor.rest.StopWorkerRequest; -import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerReceiving; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.rest.WorkerStarting; @@ -84,7 +82,6 @@ class ManagedWorker { private final long workerId; private final String taskId; private final TaskSpec spec; - private long startedMs = -1; private boolean shouldRun; private WorkerState state; @@ -97,15 +94,9 @@ class ManagedWorker { this.state = state; } - boolean hasExpired() { - return spec.hasExpired(time, startedMs); - } - void tryCreate() { try { client.createWorker(new CreateWorkerRequest(workerId, taskId, spec)); - if (startedMs == -1) - startedMs = time.milliseconds(); } catch (Throwable e) { log.error("{}: error creating worker {}.", node.name(), this, e); } @@ -155,17 +146,14 @@ public String toString() { */ private final NodeHeartbeat heartbeat; - private final Time time; - /** * A future which can be used to cancel the periodic hearbeat task. */ private ScheduledFuture heartbeatFuture; - NodeManager(Node node, TaskManager taskManager, Time time) { + NodeManager(Node node, TaskManager taskManager) { this.node = node; this.taskManager = taskManager; - this.time = time; this.client = new AgentClient.Builder(). maxTries(1). target(node.hostname(), Node.Util.getTrogdorAgentPort(node)). @@ -214,64 +202,46 @@ public void run() { if (log.isTraceEnabled()) { log.trace("{}: got heartbeat status {}", node.name(), agentStatus); } - handleMissingWorkers(agentStatus); - handlePresentWorkers(agentStatus); - } catch (Throwable e) { - log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e); - } - } - - /** - * Identify workers which we think should be running, but which do not appear - * in the agent's response. We need to send startWorker requests for these. - */ - private void handleMissingWorkers(AgentStatusResponse agentStatus) { - for (Map.Entry entry : workers.entrySet()) { - Long workerId = entry.getKey(); - if (agentStatus.workers().containsKey(workerId)) - continue; - ManagedWorker worker = entry.getValue(); - if (!worker.shouldRun) - continue; - - if (!worker.hasExpired()) { - worker.tryCreate(); - } else { - log.info("{}: Will not create worker state {} as it has expired. ", node.name(), worker.state); - worker.shouldRun = false; - taskManager.updateWorkerState(node.name(), worker.workerId, - new WorkerDone(worker.taskId, worker.spec, worker.startedMs, -1, null, "worker expired")); - } - } - } - - private void handlePresentWorkers(AgentStatusResponse agentStatus) { - for (Map.Entry entry : agentStatus.workers().entrySet()) { - long workerId = entry.getKey(); - WorkerState state = entry.getValue(); - ManagedWorker worker = workers.get(workerId); - if (worker == null) { - // Identify tasks which are running, but which we don't know about. - // Add these to the NodeManager as tasks that should not be running. - log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId); - workers.put(workerId, new ManagedWorker(workerId, state.taskId(), - state.spec(), false, state)); - } else { - // Handle workers which need to be stopped. - if (state instanceof WorkerStarting || state instanceof WorkerRunning) { - if (!worker.shouldRun) { - worker.tryStop(); + // Identify workers which we think should be running, but which do not appear + // in the agent's response. We need to send startWorker requests for these. + for (Map.Entry entry : workers.entrySet()) { + Long workerId = entry.getKey(); + if (!agentStatus.workers().containsKey(workerId)) { + ManagedWorker worker = entry.getValue(); + if (worker.shouldRun) { + worker.tryCreate(); } } - // Notify the TaskManager if the worker state has changed. - if (worker.state.equals(state)) { - log.debug("{}: worker state is still {}", node.name(), worker.state); + } + for (Map.Entry entry : agentStatus.workers().entrySet()) { + long workerId = entry.getKey(); + WorkerState state = entry.getValue(); + ManagedWorker worker = workers.get(workerId); + if (worker == null) { + // Identify tasks which are running, but which we don't know about. + // Add these to the NodeManager as tasks that should not be running. + log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId); + workers.put(workerId, new ManagedWorker(workerId, state.taskId(), + state.spec(), false, state)); } else { - log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state); - worker.state = state; - taskManager.updateWorkerState(node.name(), worker.workerId, state); + // Handle workers which need to be stopped. + if (state instanceof WorkerStarting || state instanceof WorkerRunning) { + if (!worker.shouldRun) { + worker.tryStop(); + } + } + // Notify the TaskManager if the worker state has changed. + if (worker.state.equals(state)) { + log.debug("{}: worker state is still {}", node.name(), worker.state); + } else { + log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state); + worker.state = state; + taskManager.updateWorkerState(node.name(), worker.workerId, state); + } } } + } catch (Throwable e) { + log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e); } } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index 2f8a97d5fe93c..941656e36186e 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -17,8 +17,10 @@ package org.apache.kafka.trogdor.coordinator; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.LongNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.InvalidRequestException; @@ -136,7 +138,7 @@ public final class TaskManager { this.nextWorkerId = firstWorkerId; for (Node node : platform.topology().nodes().values()) { if (Node.Util.getTrogdorAgentPort(node) > 0) { - this.nodeManagers.put(node.name(), new NodeManager(node, this, this.time)); + this.nodeManagers.put(node.name(), new NodeManager(node, this)); } } log.info("Created TaskManager for agent(s) on: {}", @@ -150,7 +152,13 @@ class ManagedTask { final private String id; /** - * The task specification. + * The original task specification as submitted when the task was created. + */ + final private TaskSpec originalSpec; + + /** + * The effective task specification. + * The start time will be adjusted to reflect the time when the task was submitted. */ final private TaskSpec spec; @@ -195,8 +203,10 @@ class ManagedTask { */ private String error = ""; - ManagedTask(String id, TaskSpec spec, TaskController controller, TaskStateType state) { + ManagedTask(String id, TaskSpec originalSpec, TaskSpec spec, + TaskController controller, TaskStateType state) { this.id = id; + this.originalSpec = originalSpec; this.spec = spec; this.controller = controller; this.state = state; @@ -297,7 +307,7 @@ public void createTask(final String id, TaskSpec spec) throws Throwable { try { executor.submit(new CreateTask(id, spec)).get(); - } catch (ExecutionException e) { + } catch (ExecutionException | JsonProcessingException e) { log.info("createTask(id={}, spec={}) error", id, spec, e); throw e.getCause(); } @@ -308,11 +318,15 @@ public void createTask(final String id, TaskSpec spec) */ class CreateTask implements Callable { private final String id; + private final TaskSpec originalSpec; private final TaskSpec spec; - CreateTask(String id, TaskSpec spec) { + CreateTask(String id, TaskSpec spec) throws JsonProcessingException { this.id = id; - this.spec = spec; + this.originalSpec = spec; + ObjectNode node = JsonUtil.JSON_SERDE.valueToTree(originalSpec); + node.set("startMs", new LongNode(Math.max(time.milliseconds(), originalSpec.startMs()))); + this.spec = JsonUtil.JSON_SERDE.treeToValue(node, TaskSpec.class); } @Override @@ -322,11 +336,11 @@ public Void call() throws Exception { } ManagedTask task = tasks.get(id); if (task != null) { - if (!task.spec.equals(spec)) { + if (!task.originalSpec.equals(originalSpec)) { throw new RequestConflictException("Task ID " + id + " already " + - "exists, and has a different spec " + task.spec); + "exists, and has a different spec " + task.originalSpec); } - log.info("Task {} already exists with spec {}", id, spec); + log.info("Task {} already exists with spec {}", id, originalSpec); return null; } TaskController controller = null; @@ -336,19 +350,16 @@ public Void call() throws Exception { } catch (Throwable t) { failure = "Failed to create TaskController: " + t.getMessage(); } - if (spec.hasExpired(time, -1)) - failure = "worker expired"; - if (failure != null) { log.info("Failed to create a new task {} with spec {}: {}", id, spec, failure); - task = new ManagedTask(id, spec, null, TaskStateType.DONE); + task = new ManagedTask(id, originalSpec, spec, null, TaskStateType.DONE); task.doneMs = time.milliseconds(); task.maybeSetError(failure); tasks.put(id, task); return null; } - task = new ManagedTask(id, spec, controller, TaskStateType.PENDING); + task = new ManagedTask(id, originalSpec, spec, controller, TaskStateType.PENDING); tasks.put(id, task); long delayMs = task.startDelayMs(time.milliseconds()); task.startFuture = scheduler.schedule(executor, new RunTask(task), delayMs); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java index 405d9485b4418..acb19f6587913 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskSpec.java @@ -19,7 +19,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.trogdor.common.JsonUtil; import java.util.Collections; @@ -68,23 +67,18 @@ public final long startMs() { } /** - * Get the duration of this task in ms. + * Get the deadline time of this task in ms */ - @JsonProperty - public final long durationMs() { - return durationMs; + public final long endMs() { + return startMs + durationMs; } /** - * Whether the task is considered expired - * @param startedMs the actual time that this task started on + * Get the duration of this task in ms. */ - public boolean hasExpired(Time time, long startedMs) { - long startMs = this.startMs > 0 ? this.startMs : startedMs; - if (startMs <= 0) // task doesn't have a start time yet - return false; - - return startMs + durationMs < time.milliseconds(); + @JsonProperty + public final long durationMs() { + return durationMs; } /** diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java index 1fffa6d659907..6c200838111c7 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.trogdor.rest.JsonRestServer; import org.apache.kafka.trogdor.rest.RequestConflictException; import org.apache.kafka.trogdor.rest.StopWorkerRequest; +import org.apache.kafka.trogdor.rest.TaskDone; import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.task.NoOpTaskSpec; @@ -111,10 +112,12 @@ public void testAgentGetStatus() throws Exception { @Test public void testCreateExpiredWorkerIsNotScheduled() throws Exception { + long initialTimeMs = 100; + long tickMs = 15; final boolean[] toSleep = {true}; - MockTime time = new MockTime(15, 100, 0) { + MockTime time = new MockTime(tickMs, initialTimeMs, 0) { /** - * Modify sleep() to call super.sleep() only some times + * Modify sleep() to call super.sleep() every second call * in order to avoid the endless loop in the tick() calls to the MockScheduler listener */ @Override @@ -134,8 +137,12 @@ public void sleep(long ms) { final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 10); client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec)); + long actualStartTimeMs = initialTimeMs + tickMs; + long doneMs = actualStartTimeMs + 2 * tickMs; new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerDone("foo", fooSpec, 115, 145, null, "worker expired")). + workerState(new WorkerDone("foo", fooSpec, actualStartTimeMs, + doneMs, null, "worker expired")). + taskState(new TaskDone(fooSpec, actualStartTimeMs, doneMs, "worker expired", false, null)). build()). waitFor(client); } @@ -202,53 +209,58 @@ public void testAgentCreateWorkers() throws Exception { @Test public void testAgentFinishesTasks() throws Exception { - MockTime time = new MockTime(0, 0, 0); + long startTimeMs = 2000; + MockTime time = new MockTime(0, startTimeMs, 0); MockScheduler scheduler = new MockScheduler(time); Agent agent = createAgent(scheduler); AgentClient client = new AgentClient.Builder(). maxTries(10).target("localhost", agent.port()).build(); new ExpectedTasks().waitFor(client); - final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 2); + final NoOpTaskSpec fooSpec = new NoOpTaskSpec(startTimeMs, 2); + long fooSpecStartTimeMs = startTimeMs; client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))). + workerState(new WorkerRunning("foo", fooSpec, startTimeMs, new TextNode("active"))). build()). waitFor(client); time.sleep(1); - final NoOpTaskSpec barSpec = new NoOpTaskSpec(2000, 900000); - client.createWorker(new CreateWorkerRequest(1, "bar", barSpec)); + long barSpecWorkerId = 1; + long barSpecStartTimeMs = startTimeMs + 1; + final NoOpTaskSpec barSpec = new NoOpTaskSpec(startTimeMs, 900000); + client.createWorker(new CreateWorkerRequest(barSpecWorkerId, "bar", barSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerRunning("foo", fooSpec, 0, new TextNode("active"))). + workerState(new WorkerRunning("foo", fooSpec, fooSpecStartTimeMs, new TextNode("active"))). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerRunning("bar", barSpec, 1, new TextNode("active"))). + workerState(new WorkerRunning("bar", barSpec, barSpecStartTimeMs, new TextNode("active"))). build()). waitFor(client); time.sleep(1); + // foo task expired new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerDone("foo", fooSpec, 0, 2, new TextNode("done"), "")). + workerState(new WorkerDone("foo", fooSpec, fooSpecStartTimeMs, fooSpecStartTimeMs + 2, new TextNode("done"), "")). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerRunning("bar", barSpec, 1, new TextNode("active"))). + workerState(new WorkerRunning("bar", barSpec, barSpecStartTimeMs, new TextNode("active"))). build()). waitFor(client); time.sleep(5); - client.stopWorker(new StopWorkerRequest(1)); + client.stopWorker(new StopWorkerRequest(barSpecWorkerId)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). - workerState(new WorkerDone("foo", fooSpec, 0, 2, new TextNode("done"), "")). + workerState(new WorkerDone("foo", fooSpec, fooSpecStartTimeMs, fooSpecStartTimeMs + 2, new TextNode("done"), "")). build()). addTask(new ExpectedTaskBuilder("bar"). - workerState(new WorkerDone("bar", barSpec, 1, 7, new TextNode("done"), "")). + workerState(new WorkerDone("bar", barSpec, barSpecStartTimeMs, startTimeMs + 7, new TextNode("done"), "")). build()). waitFor(client); @@ -379,7 +391,7 @@ public void testDestroyWorkers() throws Exception { maxTries(10).target("localhost", agent.port()).build(); new ExpectedTasks().waitFor(client); - final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 5); + final NoOpTaskSpec fooSpec = new NoOpTaskSpec(0, 5); client.createWorker(new CreateWorkerRequest(0, "foo", fooSpec)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). @@ -394,7 +406,7 @@ public void testDestroyWorkers() throws Exception { new ExpectedTasks().waitFor(client); time.sleep(1); - final NoOpTaskSpec fooSpec2 = new NoOpTaskSpec(100, 1); + final NoOpTaskSpec fooSpec2 = new NoOpTaskSpec(2, 1); client.createWorker(new CreateWorkerRequest(1, "foo", fooSpec2)); new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java index 6b9f46398b8a4..48b612b8d7c87 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java @@ -363,12 +363,15 @@ private static List> createPartitionLists(String[][] array) { @Test public void testNetworkPartitionFault() throws Exception { CapturingCommandRunner runner = new CapturingCommandRunner(); + MockTime time = new MockTime(0, 0, 0); + Scheduler scheduler = new MockScheduler(time); try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). addCoordinator("node01"). addAgent("node01"). addAgent("node02"). addAgent("node03"). commandRunner(runner). + scheduler(scheduler). build()) { CoordinatorClient coordinatorClient = cluster.coordinatorClient(); NetworkPartitionFaultSpec spec = new NetworkPartitionFaultSpec(0, Long.MAX_VALUE, @@ -541,7 +544,7 @@ public void testCoordinatorDoesNotReRunExpiredTask() throws Exception { } @Test - public void testTaskRequestExpiredTaskDoesNotGetRun() throws Exception { + public void testTaskRequestWithOldStartMsGetsUpdated() throws Exception { MockTime time = new MockTime(0, 0, 0); Scheduler scheduler = new MockScheduler(time); try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). @@ -554,13 +557,37 @@ public void testTaskRequestExpiredTaskDoesNotGetRun() throws Exception { time.sleep(552); CoordinatorClient coordinatorClient = cluster.coordinatorClient(); - NoOpTaskSpec alreadyExpired = new NoOpTaskSpec(1, 500); - coordinatorClient.createTask(new CreateTaskRequest("alreadyExpired", alreadyExpired)); - TaskState expectedState = new ExpectedTaskBuilder("alreadyExpired").taskState( - new TaskDone(fooSpec, -1, 552, "worker expired", false, null) + NoOpTaskSpec updatedSpec = new NoOpTaskSpec(552, 500); + coordinatorClient.createTask(new CreateTaskRequest("fooSpec", fooSpec)); + TaskState expectedState = new ExpectedTaskBuilder("fooSpec").taskState( + new TaskRunning(updatedSpec, 552, new TextNode("receiving")) + ).build().taskState(); + + TaskState resp = coordinatorClient.task(new TaskRequest("fooSpec")); + assertEquals(expectedState, resp); + } + } + + @Test + public void testTaskRequestWithFutureStartMsDoesNotGetRun() throws Exception { + MockTime time = new MockTime(0, 0, 0); + Scheduler scheduler = new MockScheduler(time); + try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). + addCoordinator("node01"). + addAgent("node02"). + scheduler(scheduler). + build()) { + + NoOpTaskSpec fooSpec = new NoOpTaskSpec(1000, 500); + time.sleep(999); + + CoordinatorClient coordinatorClient = cluster.coordinatorClient(); + coordinatorClient.createTask(new CreateTaskRequest("fooSpec", fooSpec)); + TaskState expectedState = new ExpectedTaskBuilder("fooSpec").taskState( + new TaskPending(fooSpec) ).build().taskState(); - TaskState resp = coordinatorClient.task(new TaskRequest("alreadyExpired")); + TaskState resp = coordinatorClient.task(new TaskRequest("fooSpec")); assertEquals(expectedState, resp); } } From b430ebca60d9ce641e3f44d28c7314e29879d0f4 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 10 Jan 2019 13:20:10 +0200 Subject: [PATCH 4/5] Do not re-schedule DONE/STOPPED tasks in the Coordinator --- .../kafka/trogdor/coordinator/NodeManager.java | 4 ++++ .../trogdor/coordinator/CoordinatorTest.java | 17 ++++++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java index 3f0075e598a02..0da48321f30c1 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java @@ -49,10 +49,12 @@ import org.apache.kafka.trogdor.rest.AgentStatusResponse; import org.apache.kafka.trogdor.rest.CreateWorkerRequest; import org.apache.kafka.trogdor.rest.StopWorkerRequest; +import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerReceiving; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.rest.WorkerStarting; import org.apache.kafka.trogdor.rest.WorkerState; +import org.apache.kafka.trogdor.rest.WorkerStopping; import org.apache.kafka.trogdor.task.TaskSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -235,6 +237,8 @@ public void run() { log.debug("{}: worker state is still {}", node.name(), worker.state); } else { log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state); + if (state instanceof WorkerDone || state instanceof WorkerStopping) + worker.shouldRun = false; worker.state = state; taskManager.updateWorkerState(node.name(), worker.workerId, state); } diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java index 48b612b8d7c87..02479511c52cc 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java @@ -501,10 +501,10 @@ public void testTasksRequest() throws Exception { /** * If an agent fails in the middle of a task and comes back up when the task is considered expired, - * we want the coordinator to mark it as DONE and not schedule it on the agent + * we want the task to be marked as DONE and not re-sent should a second failure happen. */ @Test - public void testCoordinatorDoesNotReRunExpiredTask() throws Exception { + public void testAgentFailureAndTaskExpiry() throws Exception { MockTime time = new MockTime(0, 0, 0); Scheduler scheduler = new MockScheduler(time); try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder(). @@ -533,10 +533,21 @@ public void testCoordinatorDoesNotReRunExpiredTask() throws Exception { cluster.restartAgent("node02"); time.sleep(550); - // coordinator heartbeat sees that the agent is back up but does not re-schedule the task as it is expired + // coordinator heartbeat sees that the agent is back up, re-schedules the task but the agent expires it new ExpectedTasks(). addTask(new ExpectedTaskBuilder("foo"). taskState(new TaskDone(fooSpec, 2, 552, "worker expired", false, null)). + workerState(new WorkerDone("foo", fooSpec, 552, 552, null, "worker expired")). + build()). + waitFor(coordinatorClient). + waitFor(cluster.agentClient("node02")); + + cluster.restartAgent("node02"); + // coordinator heartbeat sees that the agent is back up but does not re-schedule the task as it is DONE + new ExpectedTasks(). + addTask(new ExpectedTaskBuilder("foo"). + taskState(new TaskDone(fooSpec, 2, 552, "worker expired", false, null)). + // no worker states build()). waitFor(coordinatorClient). waitFor(cluster.agentClient("node02")); From 53109684cc2eca79f14d6502b76a57c52d1667c7 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 10 Jan 2019 13:25:51 +0200 Subject: [PATCH 5/5] Separate methods in NodeManager in order to avoid checkstyle complaint NPathComplexity --- .../trogdor/coordinator/NodeManager.java | 84 +++++++++++-------- 1 file changed, 47 insertions(+), 37 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java index 0da48321f30c1..97ad4ae1506f3 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java @@ -204,48 +204,58 @@ public void run() { if (log.isTraceEnabled()) { log.trace("{}: got heartbeat status {}", node.name(), agentStatus); } - // Identify workers which we think should be running, but which do not appear - // in the agent's response. We need to send startWorker requests for these. - for (Map.Entry entry : workers.entrySet()) { - Long workerId = entry.getKey(); - if (!agentStatus.workers().containsKey(workerId)) { - ManagedWorker worker = entry.getValue(); - if (worker.shouldRun) { - worker.tryCreate(); - } + handleMissingWorkers(agentStatus); + handlePresentWorkers(agentStatus); + } catch (Throwable e) { + log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e); + } + } + + /** + * Identify workers which we think should be running but do not appear in the agent's response. + * We need to send startWorker requests for those + */ + private void handleMissingWorkers(AgentStatusResponse agentStatus) { + for (Map.Entry entry : workers.entrySet()) { + Long workerId = entry.getKey(); + if (!agentStatus.workers().containsKey(workerId)) { + ManagedWorker worker = entry.getValue(); + if (worker.shouldRun) { + worker.tryCreate(); } } - for (Map.Entry entry : agentStatus.workers().entrySet()) { - long workerId = entry.getKey(); - WorkerState state = entry.getValue(); - ManagedWorker worker = workers.get(workerId); - if (worker == null) { - // Identify tasks which are running, but which we don't know about. - // Add these to the NodeManager as tasks that should not be running. - log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId); - workers.put(workerId, new ManagedWorker(workerId, state.taskId(), - state.spec(), false, state)); - } else { - // Handle workers which need to be stopped. - if (state instanceof WorkerStarting || state instanceof WorkerRunning) { - if (!worker.shouldRun) { - worker.tryStop(); - } - } - // Notify the TaskManager if the worker state has changed. - if (worker.state.equals(state)) { - log.debug("{}: worker state is still {}", node.name(), worker.state); - } else { - log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state); - if (state instanceof WorkerDone || state instanceof WorkerStopping) - worker.shouldRun = false; - worker.state = state; - taskManager.updateWorkerState(node.name(), worker.workerId, state); + } + } + + private void handlePresentWorkers(AgentStatusResponse agentStatus) { + for (Map.Entry entry : agentStatus.workers().entrySet()) { + long workerId = entry.getKey(); + WorkerState state = entry.getValue(); + ManagedWorker worker = workers.get(workerId); + if (worker == null) { + // Identify tasks which are running, but which we don't know about. + // Add these to the NodeManager as tasks that should not be running. + log.warn("{}: scheduling unknown worker with ID {} for stopping.", node.name(), workerId); + workers.put(workerId, new ManagedWorker(workerId, state.taskId(), + state.spec(), false, state)); + } else { + // Handle workers which need to be stopped. + if (state instanceof WorkerStarting || state instanceof WorkerRunning) { + if (!worker.shouldRun) { + worker.tryStop(); } } + // Notify the TaskManager if the worker state has changed. + if (worker.state.equals(state)) { + log.debug("{}: worker state is still {}", node.name(), worker.state); + } else { + log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state); + if (state instanceof WorkerDone || state instanceof WorkerStopping) + worker.shouldRun = false; + worker.state = state; + taskManager.updateWorkerState(node.name(), worker.workerId, state); + } } - } catch (Throwable e) { - log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e); } } }