Skip to content
Merged
1 change: 0 additions & 1 deletion docs/content/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ The indexing service also uses its own set of paths. These configs can be includ
|`druid.zk.paths.indexer.announcementsPath`|Middle managers announce themselves here.|`${druid.zk.paths.indexer.base}/announcements`|
|`druid.zk.paths.indexer.tasksPath`|Used to assign tasks to middle managers.|`${druid.zk.paths.indexer.base}/tasks`|
|`druid.zk.paths.indexer.statusPath`|Parent path for announcement of task statuses.|`${druid.zk.paths.indexer.base}/status`|
|`druid.zk.paths.indexer.leaderLatchPath`|Used for Overlord leader election.|`${druid.zk.paths.indexer.base}/leaderLatchPath`|

If `druid.zk.paths.base` and `druid.zk.paths.indexer.base` are both set, and none of the other `druid.zk.paths.*` or `druid.zk.paths.indexer.*` values are set, then the other properties will be evaluated relative to their respective `base`.
For example, if `druid.zk.paths.base` is set to `/druid1` and `druid.zk.paths.indexer.base` is set to `/druid2`, then `druid.zk.paths.announcementsPath` will default to `/druid1/announcements` while `druid.zk.paths.indexer.announcementsPath` will default to `/druid2/announcements`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.indexing.IndexingService;
import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.discovery.DruidLeaderSelector;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
Expand All @@ -38,31 +40,24 @@
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.server.DruidNode;
import io.druid.server.coordinator.CoordinatorOverlordServiceConfig;
import io.druid.server.initialization.IndexerZkConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.framework.state.ConnectionState;

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
* Encapsulates the indexer leadership lifecycle.
*/
public class TaskMaster
{
private final LeaderSelector leaderSelector;
private final DruidLeaderSelector overlordLeaderSelector;
private final DruidLeaderSelector.Listener leadershipListener;

private final ReentrantLock giant = new ReentrantLock(true);
private final Condition mayBeStopped = giant.newCondition();
private final TaskActionClientFactory taskActionClientFactory;
private final SupervisorManager supervisorManager;

private final AtomicReference<Lifecycle> leaderLifecycleRef = new AtomicReference<>(null);

private volatile boolean leading = false;
private volatile TaskRunner taskRunner;
private volatile TaskQueue taskQueue;

Expand All @@ -75,115 +70,99 @@ public TaskMaster(
final TaskStorage taskStorage,
final TaskActionClientFactory taskActionClientFactory,
@Self final DruidNode selfNode,
final IndexerZkConfig zkPaths,
final TaskRunnerFactory runnerFactory,
final CuratorFramework curator,
final ServiceAnnouncer serviceAnnouncer,
final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig,
final ServiceEmitter emitter,
final SupervisorManager supervisorManager,
final OverlordHelperManager overlordHelperManager
)
final OverlordHelperManager overlordHelperManager,
@IndexingService final DruidLeaderSelector overlordLeaderSelector
)
{
this.supervisorManager = supervisorManager;
this.taskActionClientFactory = taskActionClientFactory;

this.overlordLeaderSelector = overlordLeaderSelector;

final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode :
selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService());

this.leaderSelector = new LeaderSelector(
curator,
zkPaths.getLeaderLatchPath(),
new LeaderSelectorListener()
{
@Override
public void takeLeadership(CuratorFramework client) throws Exception
{
giant.lock();

try {
// Make sure the previous leadership cycle is really, really over.
stopLeading();

// I AM THE MASTER OF THE UNIVERSE.
log.info("By the power of Grayskull, I have the power!");
taskLockbox.syncFromStorage();
taskRunner = runnerFactory.build();
taskQueue = new TaskQueue(
taskQueueConfig,
taskStorage,
taskRunner,
taskActionClientFactory,
taskLockbox,
emitter
);

// Sensible order to start stuff:
final Lifecycle leaderLifecycle = new Lifecycle();
if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) {
log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition")
.emit();
}

leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(supervisorManager);
leaderLifecycle.addManagedInstance(overlordHelperManager);
this.leadershipListener = new DruidLeaderSelector.Listener()
{
@Override
public void becomeLeader()
{
giant.lock();

// I AM THE MASTER OF THE UNIVERSE.
log.info("By the power of Grayskull, I have the power!");

try {
taskLockbox.syncFromStorage();
taskRunner = runnerFactory.build();
taskQueue = new TaskQueue(
taskQueueConfig,
taskStorage,
taskRunner,
taskActionClientFactory,
taskLockbox,
emitter
);

// Sensible order to start stuff:
final Lifecycle leaderLifecycle = new Lifecycle();
if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) {
log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition")
.emit();
}

leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
serviceAnnouncer.announce(node);
}
leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(supervisorManager);
leaderLifecycle.addManagedInstance(overlordHelperManager);

leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
serviceAnnouncer.announce(node);
}

@Override
public void stop()
{
serviceAnnouncer.unannounce(node);
}
}
);
try {
leaderLifecycle.start();
leading = true;
while (leading && !Thread.currentThread().isInterrupted()) {
mayBeStopped.await();
@Override
public void stop()
{
serviceAnnouncer.unannounce(node);
}
}
catch (InterruptedException e) {
log.debug("Interrupted while waiting");
// Suppress so we can bow out gracefully
}
finally {
log.info("Bowing out!");
stopLeading();
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to lead").emit();
throw Throwables.propagate(e);
}
finally {
giant.unlock();
}
}
);

@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) {
// disconnected from zk. assume leadership is gone
stopLeading();
}
}
leaderLifecycle.start();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
giant.unlock();
}
);
}

leaderSelector.setId(node.getHostAndPortToUse());
leaderSelector.autoRequeue();
@Override
public void stopBeingLeader()
{
giant.lock();
try {
final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null);
if (leaderLifecycle != null) {
leaderLifecycle.stop();
}
}
finally {
giant.unlock();
}
}
};
}

/**
Expand All @@ -195,7 +174,7 @@ public void start()
giant.lock();

try {
leaderSelector.start();
overlordLeaderSelector.registerListener(leadershipListener);
}
finally {
giant.unlock();
Expand All @@ -212,30 +191,7 @@ public void stop()
giant.lock();

try {
leaderSelector.close();
stopLeading();
}
finally {
giant.unlock();
}
}

/**
* Relinquish leadership. May be called multiple times, even when not currently the leader.
*/
private void stopLeading()
{
giant.lock();

try {
if (leading) {
leading = false;
mayBeStopped.signalAll();
final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null);
if (leaderLifecycle != null) {
leaderLifecycle.stop();
}
}
overlordLeaderSelector.unregisterListener();
}
finally {
giant.unlock();
Expand All @@ -244,27 +200,17 @@ private void stopLeading()

/**
* Reports whether this node currently holds leadership.
*
* NOTE(review): this span comes from a rendered diff without +/- markers, so it shows two consecutive
* return statements. Presumably {@code return leading;} is the pre-change line and the delegation to
* {@code overlordLeaderSelector.isLeader()} is its replacement -- confirm against the merged file.
*
* @return the value of the {@code leading} flag / the result of {@code overlordLeaderSelector.isLeader()}
*/
public boolean isLeader()
{
return leading;
return overlordLeaderSelector.isLeader();
}

/**
* Returns the identifier of the current leader.
*
* NOTE(review): this span comes from a rendered diff without +/- markers. It appears to contain both the
* pre-change lookup through {@code leaderSelector} (the try/catch below) and the post-change delegation to
* {@code overlordLeaderSelector.getCurrentLeader()} -- confirm against the merged file which one is live.
*
* @return in the try/catch path, the id of the {@code Participant} obtained from {@code leaderSelector}, or
*         {@code null} when that participant is {@code null} or does not report itself as leader; in the
*         final statement, whatever {@code overlordLeaderSelector.getCurrentLeader()} returns
*/
public String getCurrentLeader()
{
try {
// Ask the selector for the participant it considers to be the leader.
final Participant leader = leaderSelector.getLeader();
// Only hand out the id when the participant itself confirms leadership.
if (leader != null && leader.isLeader()) {
return leader.getId();
} else {
return null;
}
}
catch (Exception e) {
// Any failure during the lookup is rethrown through Throwables.propagate.
throw Throwables.propagate(e);
}
return overlordLeaderSelector.getCurrentLeader();
}

public Optional<TaskRunner> getTaskRunner()
{
if (leading) {
if (overlordLeaderSelector.isLeader()) {
return Optional.of(taskRunner);
} else {
return Optional.absent();
Expand All @@ -273,7 +219,7 @@ public Optional<TaskRunner> getTaskRunner()

public Optional<TaskQueue> getTaskQueue()
{
if (leading) {
if (overlordLeaderSelector.isLeader()) {
return Optional.of(taskQueue);
} else {
return Optional.absent();
Expand All @@ -282,7 +228,7 @@ public Optional<TaskQueue> getTaskQueue()

public Optional<TaskActionClient> getTaskActionClient(Task task)
{
if (leading) {
if (overlordLeaderSelector.isLeader()) {
return Optional.of(taskActionClientFactory.create(task));
} else {
return Optional.absent();
Expand All @@ -291,7 +237,7 @@ public Optional<TaskActionClient> getTaskActionClient(Task task)

public Optional<ScalingStats> getScalingStats()
{
if (leading) {
if (overlordLeaderSelector.isLeader()) {
return taskRunner.getScalingStats();
} else {
return Optional.absent();
Expand All @@ -300,7 +246,7 @@ public Optional<ScalingStats> getScalingStats()

public Optional<SupervisorManager> getSupervisorManager()
{
if (leading) {
if (overlordLeaderSelector.isLeader()) {
return Optional.of(supervisorManager);
} else {
return Optional.absent();
Expand Down
Loading