Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected EmbeddedDruidCluster addServers(EmbeddedDruidCluster cluster)
overlord.addProperty("druid.plaintextPort", "7090");

return cluster
.useDefaultTimeoutForLatchableEmitter(60)
.useDefaultTimeoutForLatchableEmitter(120)
.useContainerFriendlyHostname()
.addResource(containerOverlord)
.addResource(containerCoordinator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public class CompactionTask extends CompactionTaskTest
@Override
protected EmbeddedDruidCluster createCluster()
{
return configureCluster(super.createCluster());
return configureCluster(super.createCluster())
.useDefaultTimeoutForLatchableEmitter(60);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class WorkerHolder
private Worker disabledWorker;

protected final AtomicBoolean disabled;
private final AtomicBoolean syncedAtleastOnce = new AtomicBoolean(false);

// Known list of tasks running/completed on this worker.
protected final AtomicReference<Map<String, TaskAnnouncement>> tasksSnapshotRef;
Expand Down Expand Up @@ -299,9 +300,18 @@ public void waitForInitialization() throws InterruptedException
}
}

/**
* Whether this worker has been synced successfully atleast once.
*/
public boolean isInitialized()
{
return syncer.isInitialized();
// Do not use syncer.isInitialized() as it becomes true only after the first
// fullSync() or deltaSync() callback has completed. But the callback itself
// wakes up the HttpRemoteTaskRunner.pendingTaskExecutionLoop() which checks
// if this WorkerHolder is initialized before assigning tasks to it.
// If not initialized, execution loop goes to sleep for 1 minute thus delaying
// task assignment.
return syncedAtleastOnce.get();
}

public boolean isEnabled()
Expand Down Expand Up @@ -439,6 +449,7 @@ private void notifyListener(List<TaskAnnouncement> announcements, boolean isWork
}
}

syncedAtleastOnce.set(true);
if (isWorkerDisabled != disabled.get()) {
disabled.set(isWorkerDisabled);
log.info("Worker[%s] disabled set to [%s].", worker.getHost(), isWorkerDisabled);
Expand Down