From d534d2459eb4231b7f34e41744d3570d40d737b5 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Thu, 24 Feb 2022 16:33:16 +0800 Subject: [PATCH 1/4] fix unexpected task status Change-Id: I9eb0b260611e6b65280969d96b8e5856403cc583 --- .../com/baidu/hugegraph/api/job/TaskAPI.java | 2 +- .../hugegraph/task/StandardTaskScheduler.java | 9 ++++--- .../com/baidu/hugegraph/api/BaseApiTest.java | 9 ++++++- .../com/baidu/hugegraph/api/TaskApiTest.java | 26 ++++++++++++++++++- 4 files changed, 40 insertions(+), 6 deletions(-) diff --git a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/TaskAPI.java b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/TaskAPI.java index f124362cdf..57a64f8905 100644 --- a/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/TaskAPI.java +++ b/hugegraph-api/src/main/java/com/baidu/hugegraph/api/job/TaskAPI.java @@ -176,7 +176,7 @@ public Map update(@Context GraphManager manager, private static TaskStatus parseStatus(String status) { try { - return TaskStatus.valueOf(status); + return TaskStatus.valueOf(status.toUpperCase()); } catch (Exception e) { throw new IllegalArgumentException(String.format( "Status value must be in %s, but got '%s'", diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/StandardTaskScheduler.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/StandardTaskScheduler.java index 0db34f6fdc..b7d6bd60a9 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/StandardTaskScheduler.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/StandardTaskScheduler.java @@ -218,6 +218,7 @@ public Future schedule(HugeTask task) { /* * Due to EphemeralJob won't be serialized and deserialized through * shared storage, submit EphemeralJob immediately on master + * NOTE: don't need to save EphemeralJob task */ task.status(TaskStatus.QUEUED); return this.submitTask(task); @@ -228,8 +229,8 @@ public Future schedule(HugeTask task) { if (this.serverManager().onlySingleNode() && !task.computer()) { /* - * Speed up for single node, submit task immediately - * this can be removed without affecting logic + * Speed up for single node, submit task immediately, + * this code can be removed without affecting logic */ task.status(TaskStatus.QUEUED); task.server(this.serverManager().selfServerId()); @@ -237,7 +238,7 @@ public Future schedule(HugeTask task) { return this.submitTask(task); } else { /* - * Just set SCHEDULING status and save task + * Just set SCHEDULING status and save task, * it will be scheduled by periodic scheduler worker */ task.status(TaskStatus.SCHEDULING); @@ -394,6 +395,7 @@ protected void executeTasksOnWorker(Id server) { } if (taskServer.equals(server)) { task.status(TaskStatus.QUEUED); + this.save(task); this.submitTask(task); } } @@ -467,6 +469,7 @@ protected void remove(HugeTask task) { @Override public void save(HugeTask task) { + LOG.debug("Saving task: {}", task); task.scheduler(this); E.checkArgumentNotNull(task, "Task can't be null"); this.call(() -> { diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/api/BaseApiTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/api/BaseApiTest.java index 4531f2bfee..cb4ddd147b 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/api/BaseApiTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/api/BaseApiTest.java @@ -46,6 +46,7 @@ import org.junit.BeforeClass; import com.baidu.hugegraph.HugeException; +import com.baidu.hugegraph.util.CollectionUtil; import com.baidu.hugegraph.util.JsonUtil; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonNode; @@ -519,6 +520,8 @@ protected static void clearSchema() { List list = readList(content, type, Map.class); List names = list.stream().map(e -> e.get("name")) .collect(Collectors.toList()); + Assert.assertTrue("Expect all names are unique: " + names, + CollectionUtil.allUnique(names)); Set tasks = new HashSet<>(); names.forEach(name -> { Response response = client.delete(path, (String) name); @@ -540,13 +543,17 @@ protected static void clearSchema() { } protected static void waitTaskSuccess(int task) { + waitTaskStatus(task, "success"); + } + + protected static void waitTaskStatus(int task, String expectedStatus) { String status; do { Response r = client.get("/graphs/hugegraph/tasks/", String.valueOf(task)); String content = assertResponseStatus(200, r); status = assertJsonContains(content, "task_status"); - } while (!"success".equals(status)); + } while (!status.equals(expectedStatus)); } protected static String parseId(String content) throws IOException { diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/api/TaskApiTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/api/TaskApiTest.java index 50b2812844..771b923fcd 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/api/TaskApiTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/api/TaskApiTest.java @@ -44,6 +44,7 @@ public void prepareSchema() { @Test public void testList() { + // create a task int taskId = this.rebuild(); Response r = client().get(path, ImmutableMap.of("limit", -1)); @@ -52,6 +53,12 @@ public void testList() { assertArrayContains(tasks, "id", taskId); waitTaskSuccess(taskId); + + r = client().get(path, String.valueOf(taskId)); + content = assertResponseStatus(200, r); + String status = assertJsonContains(content, "task_status"); + Assert.assertEquals("success", status); + /* * FIXME: sometimes may get results of RUNNING tasks after the task * status is SUCCESS, which is stored in DB if there are worker @@ -62,23 +69,36 @@ public void testList() { r = client().get(path, ImmutableMap.of("status", "RUNNING")); content = assertResponseStatus(200, r); tasks = assertJsonContains(content, "tasks"); - Assert.assertTrue(tasks.toString(), tasks.isEmpty()); + String message = String.format("Expect none RUNNING tasks(%d), " + + "but got %s", taskId, tasks); + Assert.assertTrue(message, tasks.isEmpty()); } @Test public void testGet() { + // create a task int taskId = this.rebuild(); Response r = client().get(path, String.valueOf(taskId)); String content = assertResponseStatus(200, r); assertJsonContains(content, "id"); + + waitTaskSuccess(taskId); + + r = client().get(path, String.valueOf(taskId)); + content = assertResponseStatus(200, r); + String status = assertJsonContains(content, "task_status"); + Assert.assertEquals("success", status); } @Test public void testCancel() { + // create a task int taskId = this.gremlinJob(); sleepAWhile(); + + // cancel task Map params = ImmutableMap.of("action", "cancel"); Response r = client().put(path, String.valueOf(taskId), "", params); String content = r.readEntity(String.class); @@ -88,6 +108,7 @@ public void testCancel() { String status = assertJsonContains(content, "task_status"); Assert.assertTrue(status, status.equals("cancelling") || status.equals("cancelled")); + waitTaskStatus(taskId, "cancelled"); } else { assert r.getStatus() == 400; String error = String.format( @@ -103,14 +124,17 @@ public void testCancel() { @Test public void testDelete() { + // create a task int taskId = this.rebuild(); waitTaskSuccess(taskId); + // delete task Response r = client().delete(path, String.valueOf(taskId)); assertResponseStatus(204, r); } private int rebuild() { + // create a rebuild_index task String rebuildPath = "/graphs/hugegraph/jobs/rebuild/indexlabels"; String personByCity = "personByCity"; Map params = ImmutableMap.of(); From 8f53ecc6f118354f4562feeace283a01f717de52 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Fri, 25 Feb 2022 20:15:53 +0800 Subject: [PATCH 2/4] fix task-manager become single-node mode due to server-info is missing Change-Id: Ia42fdbf28ac7428501fb36bc89e5ab4368a0b553 --- .../com/baidu/hugegraph/task/HugeTask.java | 9 +++- .../hugegraph/task/ServerInfoManager.java | 46 ++++++++++++------- .../com/baidu/hugegraph/task/TaskManager.java | 9 ++-- .../com/baidu/hugegraph/api/BaseApiTest.java | 20 ++++++-- .../com/baidu/hugegraph/api/TaskApiTest.java | 7 ++- 5 files changed, 65 insertions(+), 26 deletions(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java index 28086ba399..c981e7550c 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/HugeTask.java @@ -275,7 +275,7 @@ public String toString() { @Override public void run() { if (this.cancelled()) { - // Scheduled task is running after cancelled + // A task is running after cancelled which scheduled/queued before return; } @@ -283,6 +283,11 @@ public void run() { try { assert this.status.code() < TaskStatus.RUNNING.code() : this.status; if (this.checkDependenciesSuccess()) { + /* + * FIXME: worker node may reset status to RUNNING here, and the + * status in DB is CANCELLING that set by master node, + * it will lead to cancel() operation not to take effect. + */ this.status(TaskStatus.RUNNING); super.run(); } @@ -308,7 +313,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { // Callback for saving status to store this.callable.cancelled(); } else { - // Maybe the worker is still running then set status SUCCESS + // Maybe worker node is still running then set status SUCCESS cancelled = false; } } catch (Throwable e) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/ServerInfoManager.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/ServerInfoManager.java index 807673eae2..be4e3d0035 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/ServerInfoManager.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/ServerInfoManager.java @@ -160,11 +160,7 @@ public synchronized void initServerInfo(Id server, NodeRole role) { } while (page != null); } - HugeServerInfo serverInfo = new HugeServerInfo(server, role); - serverInfo.maxLoad(this.calcMaxLoad()); - this.save(serverInfo); - - LOG.info("Init server info: {}", serverInfo); + this.saveServerInfo(this.selfServerId, this.selfServerRole); } public Id selfServerId() { @@ -186,8 +182,9 @@ public boolean onlySingleNode() { public void heartbeat() { HugeServerInfo serverInfo = this.selfServerInfo(); - if (serverInfo == null) { - return; + if (serverInfo == null && this.selfServerId != null) { + serverInfo = this.saveServerInfo(this.selfServerId, + this.selfServerRole); } serverInfo.updateTime(DateUtil.now()); this.save(serverInfo); @@ -239,7 +236,11 @@ protected synchronized HugeServerInfo pickWorkerNode( } } - this.onlySingleNode = !hasWorkerNode; + boolean singleNode = !hasWorkerNode; + if (singleNode != this.onlySingleNode) { + LOG.info("Switch only_single_node to {}", singleNode); + this.onlySingleNode = singleNode; + } // Only schedule to master if there is no workers and master is suitable if (!hasWorkerNode) { @@ -260,26 +261,35 @@ private GraphTransaction tx() { return this.graph.systemTransaction(); } - private Id save(HugeServerInfo server) { + private HugeServerInfo saveServerInfo(Id server, NodeRole role) { + HugeServerInfo serverInfo = new HugeServerInfo(server, role); + serverInfo.maxLoad(this.calcMaxLoad()); + this.save(serverInfo); + + LOG.info("Init server info: {}", serverInfo); + return serverInfo; + } + + private Id save(HugeServerInfo serverInfo) { return this.call(() -> { // Construct vertex from server info HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph); if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) { throw new HugeException("Schema is missing for %s '%s'", - HugeServerInfo.P.SERVER, server); + HugeServerInfo.P.SERVER, serverInfo); } HugeVertex vertex = this.tx().constructVertex(false, - server.asArray()); + serverInfo.asArray()); // Add or update server info in backend store vertex = this.tx().addVertex(vertex); return vertex.id(); }); } - private int save(Collection servers) { + private int save(Collection serverInfos) { return this.call(() -> { - if (servers.isEmpty()) { - return servers.size(); + if (serverInfos.isEmpty()) { + return serverInfos.size(); } HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph); if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) { @@ -289,7 +299,7 @@ private int save(Collection servers) { // Save server info in batch GraphTransaction tx = this.tx(); int updated = 0; - for (HugeServerInfo server : servers) { + for (HugeServerInfo server : serverInfos) { if (!server.updated()) { continue; } @@ -319,7 +329,11 @@ private V call(Callable callable) { } private HugeServerInfo selfServerInfo() { - return this.serverInfo(this.selfServerId); + HugeServerInfo selfServerInfo = this.serverInfo(this.selfServerId); + if (selfServerInfo == null) { + LOG.warn("ServerInfo is missing: {}", this.selfServerId); + } + return selfServerInfo; } private HugeServerInfo serverInfo(Id server) { diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskManager.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskManager.java index 390abe39e5..d99fd2923b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskManager.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/TaskManager.java @@ -50,7 +50,7 @@ public final class TaskManager { "server-info-db-worker-%d"; public static final String TASK_SCHEDULER = "task-scheduler-%d"; - protected static final int SCHEDULE_PERIOD = 1; // Unit second + protected static final long SCHEDULE_PERIOD = 1000L; // unit ms private static final int THREADS = 4; private static final TaskManager MANAGER = new TaskManager(THREADS); @@ -79,10 +79,11 @@ private TaskManager(int pool) { // For schedule task to run, just one thread is ok this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool( 1, TASK_SCHEDULER); - // Start after 10s waiting for HugeGraphServer startup + // Start after 10x period time waiting for HugeGraphServer startup this.schedulerExecutor.scheduleWithFixedDelay(this::scheduleOrExecuteJob, - 10L, SCHEDULE_PERIOD, - TimeUnit.SECONDS); + 10 * SCHEDULE_PERIOD, + SCHEDULE_PERIOD, + TimeUnit.MILLISECONDS); } public void addScheduler(HugeGraphParams graph) { diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/api/BaseApiTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/api/BaseApiTest.java index cb4ddd147b..eb68f251e7 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/api/BaseApiTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/api/BaseApiTest.java @@ -52,6 +52,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; public class BaseApiTest { @@ -543,17 +544,30 @@ protected static void clearSchema() { } protected static void waitTaskSuccess(int task) { - waitTaskStatus(task, "success"); + waitTaskStatus(task, ImmutableSet.of("success")); } - protected static void waitTaskStatus(int task, String expectedStatus) { + protected static void waitTaskCompleted(int task) { + Set completed = ImmutableSet.of("success", + "cancelled", + "failed"); + waitTaskStatus(task, completed); + } + + protected static void waitTaskStatus(int task, Set expectedStatus) { String status; + int times = 0; + int maxTimes = 100000; do { Response r = client.get("/graphs/hugegraph/tasks/", String.valueOf(task)); String content = assertResponseStatus(200, r); status = assertJsonContains(content, "task_status"); - } while (!status.equals(expectedStatus)); + if (times++ > maxTimes) { + Assert.fail(String.format("Failed to wait for task %s " + + "due to timeout", task)); + } + } while (!expectedStatus.contains(status)); } protected static String parseId(String content) throws IOException { diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/api/TaskApiTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/api/TaskApiTest.java index 771b923fcd..da486078d0 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/api/TaskApiTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/api/TaskApiTest.java @@ -108,7 +108,12 @@ public void testCancel() { String status = assertJsonContains(content, "task_status"); Assert.assertTrue(status, status.equals("cancelling") || status.equals("cancelled")); - waitTaskStatus(taskId, "cancelled"); + /* + * NOTE: should be waitTaskStatus(taskId, "cancelled"), but worker + * node may ignore the CANCELLING status due to now we can't atomic + * update task status, and then the task is running to SUCCESS. + */ + waitTaskCompleted(taskId); } else { assert r.getStatus() == 400; String error = String.format( From f90e0120a67e476cf6e2b8ba397ce3cbc5e05a8a Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Mon, 28 Feb 2022 16:13:35 +0800 Subject: [PATCH 3/4] fix don't auto save server-info on master node Change-Id: I36bfd4d556edbdc2e5c1d9863dcf471802682720 --- .../main/java/com/baidu/hugegraph/task/ServerInfoManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/ServerInfoManager.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/ServerInfoManager.java index be4e3d0035..477a148b3b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/task/ServerInfoManager.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/task/ServerInfoManager.java @@ -182,7 +182,8 @@ public boolean onlySingleNode() { public void heartbeat() { HugeServerInfo serverInfo = this.selfServerInfo(); - if (serverInfo == null && this.selfServerId != null) { + if (serverInfo == null && this.selfServerId != null && + this.selfServerRole != NodeRole.MASTER) { serverInfo = this.saveServerInfo(this.selfServerId, this.selfServerRole); } From 13c25675e0b6962bc0ea4050e4d5a2829e2ab1c9 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Fri, 4 Mar 2022 17:32:36 +0800 Subject: [PATCH 4/4] fix: create index label task Change-Id: I5eca4b6663ad85e10a75dfc9c813e2aa35ab2078 --- .../src/main/java/com/baidu/hugegraph/api/BaseApiTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/api/BaseApiTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/api/BaseApiTest.java index eb68f251e7..f232c77bd5 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/api/BaseApiTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/api/BaseApiTest.java @@ -296,7 +296,7 @@ protected static void initEdgeLabel() { + "}"); } - protected static void initIndexLabel() { + protected static int initIndexLabel() { String path = URL_PREFIX + SCHEMA_ILS; Response r = client.post(path, "{\n" @@ -305,11 +305,13 @@ protected static void initIndexLabel() { + "\"base_value\": \"person\",\n" + "\"index_type\": \"SECONDARY\",\n" + "\"check_exist\": false,\n" + + "\"rebuild\": false,\n" + "\"fields\": [\n" + "\"city\"\n" + "]\n" + "}"); - assertResponseStatus(202, r); + String content = assertResponseStatus(202, r); + return assertJsonContains(content, "task_id"); } protected static void initEdge() {