diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java index 40e7a6bc0731..4f3ccc1578ab 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/docker/IngestionDockerTest.java @@ -52,7 +52,7 @@ protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster) overlord.addProperty("druid.plaintextPort", "7090"); return cluster - .useDefaultTimeoutForLatchableEmitter(60) + .useDefaultTimeoutForLatchableEmitter(120) .useContainerFriendlyHostname() .addResource(containerOverlord) .addResource(containerCoordinator) diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java index 2c701c762138..ad23d94712cd 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java @@ -60,7 +60,8 @@ public class CompactionTask extends CompactionTaskTest @Override protected EmbeddedDruidCluster createCluster() { - return configureCluster(super.createCluster()); + return configureCluster(super.createCluster()) + .useDefaultTimeoutForLatchableEmitter(60); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index 4444916e69f7..dc219d4f8b7f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -69,6 +69,7 @@ public class WorkerHolder private Worker disabledWorker; protected final AtomicBoolean disabled; + private final AtomicBoolean syncedAtleastOnce = new AtomicBoolean(false); // Known list of tasks running/completed on this worker. protected final AtomicReference> tasksSnapshotRef; @@ -299,9 +300,18 @@ public void waitForInitialization() throws InterruptedException } } + /** + * Whether this worker has been synced successfully atleast once. + */ public boolean isInitialized() { - return syncer.isInitialized(); + // Do not use syncer.isInitialized() as it becomes true only after the first + // fullSync() or deltaSync() callback has completed. But the callback itself + // wakes up the HttpRemoteTaskRunner.pendingTaskExecutionLoop() which checks + // if this WorkerHolder is initialized before assigning tasks to it. + // If not initialized, execution loop goes to sleep for 1 minute thus delaying + // task assignment. + return syncedAtleastOnce.get(); } public boolean isEnabled() @@ -439,6 +449,7 @@ private void notifyListener(List announcements, boolean isWork } } + syncedAtleastOnce.set(true); if (isWorkerDisabled != disabled.get()) { disabled.set(isWorkerDisabled); log.info("Worker[%s] disabled set to [%s].", worker.getHost(), isWorkerDisabled);