From 2c7f87774a9faa60d29e510681dacb6b894ab6a3 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 27 Sep 2025 11:02:10 +0530 Subject: [PATCH 1/7] Add log lines --- .../druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 62f6b499cb30..0feadbb7f576 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -1206,6 +1206,7 @@ private void pendingTasksExecutionLoop() ImmutableWorkerInfo immutableWorker = null; synchronized (statusLock) { + log.info("Trying to assign [%d] pending tasks: %s", pendingTaskIds.size(), pendingTaskIds); Iterator iter = pendingTaskIds.iterator(); while (iter.hasNext()) { String taskId = iter.next(); @@ -1234,6 +1235,7 @@ private void pendingTasksExecutionLoop() immutableWorker = findWorkerToRunTask(ti.getTask()); if (immutableWorker == null) { + log.info("No eligible worker to run task[%s]", ti.getTask().getId()); continue; } @@ -1720,6 +1722,7 @@ public void taskAddedOrUpdated(final TaskAnnouncement announcement, final Worker public void stateChanged(boolean enabled, WorkerHolder workerHolder) { synchronized (statusLock) { + log.info("Worker[%s] is now enabled[%s]", workerHolder.getWorker().getHost(), enabled); statusLock.notifyAll(); } } From 2b7419c3cb1c618dddc8226361f96b16b0b23e9f Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 27 Sep 2025 15:38:35 +0530 Subject: [PATCH 2/7] Dummy change to trigger CI --- .../druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 0feadbb7f576..83702b575add 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -1235,7 +1235,7 @@ private void pendingTasksExecutionLoop() immutableWorker = findWorkerToRunTask(ti.getTask()); if (immutableWorker == null) { - log.info("No eligible worker to run task[%s]", ti.getTask().getId()); + log.info("No eligible worker to run task[%s].", ti.getTask().getId()); continue; } From f182ca40196007fdc88ab97bab7f46116baab5a3 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 27 Sep 2025 17:53:18 +0530 Subject: [PATCH 3/7] Handle race condition --- .../apache/druid/indexing/overlord/hrtr/WorkerHolder.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index 4444916e69f7..3d0a51f1e466 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -69,6 +69,7 @@ public class WorkerHolder private Worker disabledWorker; protected final AtomicBoolean disabled; + private final AtomicBoolean syncedAtleastOnce = new AtomicBoolean(false); // Known list of tasks running/completed on this worker. protected final AtomicReference> tasksSnapshotRef; @@ -299,9 +300,12 @@ public void waitForInitialization() throws InterruptedException } } + /** + * Whether this worker has been synced successfully at least once. 
+ */ public boolean isInitialized() { - return syncer.isInitialized(); + return syncedAtleastOnce.get(); } public boolean isEnabled() @@ -425,6 +429,7 @@ public void deltaSync(List changes) private void notifyListener(List announcements, boolean isWorkerDisabled) { + syncedAtleastOnce.set(true); for (TaskAnnouncement announcement : announcements) { try { listener.taskAddedOrUpdated(announcement, WorkerHolder.this); From fc143acb339d211a9c8d7da202e69d18d18d9169 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sun, 28 Sep 2025 09:26:36 +0530 Subject: [PATCH 4/7] Another attempt --- .../druid/testing/embedded/docker/IngestionDockerTest.java | 2 +- .../org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java index 40e7a6bc0731..4f3ccc1578ab 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java @@ -52,7 +52,7 @@ protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster) overlord.addProperty("druid.plaintextPort", "7090"); return cluster - .useDefaultTimeoutForLatchableEmitter(60) + .useDefaultTimeoutForLatchableEmitter(120) .useContainerFriendlyHostname() .addResource(containerOverlord) .addResource(containerCoordinator) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index 3d0a51f1e466..929b77aaaf77 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -305,6 
+305,8 @@ public void waitForInitialization() throws InterruptedException */ public boolean isInitialized() { + // Do not use syncer.isInitialized() to avoid delay of task assignment due to + // the HttpRemoteTaskRunner.pendingTaskExecutionLoop() going to sleep return syncedAtleastOnce.get(); } From f53b9c61bc527451fe2d6c024a65db15a614ba07 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sun, 28 Sep 2025 15:48:09 +0530 Subject: [PATCH 5/7] Increase timeout for schema test --- .../schema/CentralizedSchemaMetadataQueryDisabledTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java index 2c701c762138..ad23d94712cd 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java @@ -60,7 +60,8 @@ public class CompactionTask extends CompactionTaskTest @Override protected EmbeddedDruidCluster createCluster() { - return configureCluster(super.createCluster()); + return configureCluster(super.createCluster()) + .useDefaultTimeoutForLatchableEmitter(60); } } From 16295c21239053f9270e7b57508ca6fe68c03e88 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 29 Sep 2025 08:08:22 +0530 Subject: [PATCH 6/7] Remove extra logs --- .../druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 83702b575add..62f6b499cb30 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -1206,7 +1206,6 @@ private void pendingTasksExecutionLoop() ImmutableWorkerInfo immutableWorker = null; synchronized (statusLock) { - log.info("Trying to assign [%d] pending tasks: %s", pendingTaskIds.size(), pendingTaskIds); Iterator iter = pendingTaskIds.iterator(); while (iter.hasNext()) { String taskId = iter.next(); @@ -1235,7 +1234,6 @@ private void pendingTasksExecutionLoop() immutableWorker = findWorkerToRunTask(ti.getTask()); if (immutableWorker == null) { - log.info("No eligible worker to run task[%s].", ti.getTask().getId()); continue; } @@ -1722,7 +1720,6 @@ public void taskAddedOrUpdated(final TaskAnnouncement announcement, final Worker public void stateChanged(boolean enabled, WorkerHolder workerHolder) { synchronized (statusLock) { - log.info("Worker[%s] is now enabled[%s]", workerHolder.getWorker().getHost(), enabled); statusLock.notifyAll(); } } From 9e642c7dc112980ae05534caa35a126a25f1e9c8 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 29 Sep 2025 11:50:40 +0530 Subject: [PATCH 7/7] Add comments --- .../druid/indexing/overlord/hrtr/WorkerHolder.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index 929b77aaaf77..dc219d4f8b7f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -305,8 +305,12 @@ public void waitForInitialization() throws InterruptedException */ public boolean isInitialized() { - // Do not use syncer.isInitialized() to avoid delay of task assignment due to - // the 
HttpRemoteTaskRunner.pendingTaskExecutionLoop() going to sleep + // Do not use syncer.isInitialized() as it becomes true only after the first + // fullSync() or deltaSync() callback has completed. But the callback itself + // wakes up the HttpRemoteTaskRunner.pendingTasksExecutionLoop() which checks + // if this WorkerHolder is initialized before assigning tasks to it. + // If not initialized, execution loop goes to sleep for 1 minute thus delaying + // task assignment. return syncedAtleastOnce.get(); } @@ -431,7 +435,6 @@ public void deltaSync(List changes) private void notifyListener(List announcements, boolean isWorkerDisabled) { - syncedAtleastOnce.set(true); for (TaskAnnouncement announcement : announcements) { try { listener.taskAddedOrUpdated(announcement, WorkerHolder.this); @@ -446,6 +449,7 @@ private void notifyListener(List announcements, boolean isWork } } + syncedAtleastOnce.set(true); if (isWorkerDisabled != disabled.get()) { disabled.set(isWorkerDisabled); log.info("Worker[%s] disabled set to [%s].", worker.getHost(), isWorkerDisabled);