diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index f17f6858a014..8ae9cd7a1fce 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -71,7 +71,6 @@ The indexing service also uses its own set of paths. These configs can be includ |`druid.zk.paths.indexer.announcementsPath`|Middle managers announce themselves here.|`${druid.zk.paths.indexer.base}/announcements`| |`druid.zk.paths.indexer.tasksPath`|Used to assign tasks to middle managers.|`${druid.zk.paths.indexer.base}/tasks`| |`druid.zk.paths.indexer.statusPath`|Parent path for announcement of task statuses.|`${druid.zk.paths.indexer.base}/status`| -|`druid.zk.paths.indexer.leaderLatchPath`|Used for Overlord leader election.|`${druid.zk.paths.indexer.base}/leaderLatchPath`| If `druid.zk.paths.base` and `druid.zk.paths.indexer.base` are both set, and none of the other `druid.zk.paths.*` or `druid.zk.paths.indexer.*` values are set, then the other properties will be evaluated relative to their respective `base`. For example, if `druid.zk.paths.base` is set to `/druid1` and `druid.zk.paths.indexer.base` is set to `/druid2` then `druid.zk.paths.announcementsPath` will default to `/druid1/announcements` while `druid.zk.paths.indexer.announcementsPath` will default to `/druid2/announcements`. diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java index f46b627ebb6e..989b97b0a2ac 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java @@ -24,7 +24,9 @@ import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.indexing.IndexingService; import io.druid.curator.discovery.ServiceAnnouncer; +import io.druid.discovery.DruidLeaderSelector; import io.druid.guice.annotations.Self; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; @@ -38,15 +40,8 @@ import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.server.DruidNode; import io.druid.server.coordinator.CoordinatorOverlordServiceConfig; -import io.druid.server.initialization.IndexerZkConfig; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.leader.LeaderSelector; -import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; -import org.apache.curator.framework.recipes.leader.Participant; -import org.apache.curator.framework.state.ConnectionState; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** @@ -54,15 +49,15 @@ */ public class TaskMaster { - private final LeaderSelector leaderSelector; + private final DruidLeaderSelector overlordLeaderSelector; + private final DruidLeaderSelector.Listener leadershipListener; + private final ReentrantLock giant = new ReentrantLock(true); - private final Condition mayBeStopped = giant.newCondition(); private final TaskActionClientFactory taskActionClientFactory; private final SupervisorManager supervisorManager; private final AtomicReference leaderLifecycleRef = new AtomicReference<>(null); - private volatile boolean leading = false; private volatile TaskRunner taskRunner; private volatile TaskQueue taskQueue; @@ -75,115 +70,99 @@ public TaskMaster( final TaskStorage taskStorage, final TaskActionClientFactory taskActionClientFactory, @Self final DruidNode selfNode, - final IndexerZkConfig zkPaths, final TaskRunnerFactory runnerFactory, - final CuratorFramework curator, final ServiceAnnouncer serviceAnnouncer, final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig, final ServiceEmitter emitter, final SupervisorManager supervisorManager, - final OverlordHelperManager overlordHelperManager - ) + final OverlordHelperManager overlordHelperManager, + @IndexingService final DruidLeaderSelector overlordLeaderSelector + ) { this.supervisorManager = supervisorManager; this.taskActionClientFactory = taskActionClientFactory; + this.overlordLeaderSelector = overlordLeaderSelector; + final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode : selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService()); - this.leaderSelector = new LeaderSelector( - curator, - zkPaths.getLeaderLatchPath(), - new LeaderSelectorListener() - { - @Override - public void takeLeadership(CuratorFramework client) throws Exception - { - giant.lock(); - - try { - // Make sure the previous leadership cycle is really, really over. - stopLeading(); - - // I AM THE MASTER OF THE UNIVERSE. - log.info("By the power of Grayskull, I have the power!"); - taskLockbox.syncFromStorage(); - taskRunner = runnerFactory.build(); - taskQueue = new TaskQueue( - taskQueueConfig, - taskStorage, - taskRunner, - taskActionClientFactory, - taskLockbox, - emitter - ); - - // Sensible order to start stuff: - final Lifecycle leaderLifecycle = new Lifecycle(); - if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) { - log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition") - .emit(); - } - - leaderLifecycle.addManagedInstance(taskRunner); - leaderLifecycle.addManagedInstance(taskQueue); - leaderLifecycle.addManagedInstance(supervisorManager); - leaderLifecycle.addManagedInstance(overlordHelperManager); + this.leadershipListener = new DruidLeaderSelector.Listener() + { + @Override + public void becomeLeader() + { + giant.lock(); + + // I AM THE MASTER OF THE UNIVERSE. + log.info("By the power of Grayskull, I have the power!"); + + try { + taskLockbox.syncFromStorage(); + taskRunner = runnerFactory.build(); + taskQueue = new TaskQueue( + taskQueueConfig, + taskStorage, + taskRunner, + taskActionClientFactory, + taskLockbox, + emitter + ); + + // Sensible order to start stuff: + final Lifecycle leaderLifecycle = new Lifecycle(); + if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) { + log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition") + .emit(); + } - leaderLifecycle.addHandler( - new Lifecycle.Handler() - { - @Override - public void start() throws Exception - { - serviceAnnouncer.announce(node); - } + leaderLifecycle.addManagedInstance(taskRunner); + leaderLifecycle.addManagedInstance(taskQueue); + leaderLifecycle.addManagedInstance(supervisorManager); + leaderLifecycle.addManagedInstance(overlordHelperManager); + + leaderLifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() throws Exception + { + serviceAnnouncer.announce(node); + } - @Override - public void stop() - { - serviceAnnouncer.unannounce(node); - } - } - ); - try { - leaderLifecycle.start(); - leading = true; - while (leading && !Thread.currentThread().isInterrupted()) { - mayBeStopped.await(); + @Override + public void stop() + { + serviceAnnouncer.unannounce(node); } } - catch (InterruptedException e) { - log.debug("Interrupted while waiting"); - // Suppress so we can bow out gracefully - } - finally { - log.info("Bowing out!"); - stopLeading(); - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to lead").emit(); - throw Throwables.propagate(e); - } - finally { - giant.unlock(); - } - } + ); - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) - { - if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) { - // disconnected from zk. assume leadership is gone - stopLeading(); - } - } + leaderLifecycle.start(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + finally { + giant.unlock(); } - ); + } - leaderSelector.setId(node.getHostAndPortToUse()); - leaderSelector.autoRequeue(); + @Override + public void stopBeingLeader() + { + giant.lock(); + try { + final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); + if (leaderLifecycle != null) { + leaderLifecycle.stop(); + } + } + finally { + giant.unlock(); + } + } + }; } /** @@ -195,7 +174,7 @@ public void start() giant.lock(); try { - leaderSelector.start(); + overlordLeaderSelector.registerListener(leadershipListener); } finally { giant.unlock(); @@ -212,30 +191,7 @@ public void stop() giant.lock(); try { - leaderSelector.close(); - stopLeading(); - } - finally { - giant.unlock(); - } - } - - /** - * Relinquish leadership. May be called multiple times, even when not currently the leader. - */ - private void stopLeading() - { - giant.lock(); - - try { - if (leading) { - leading = false; - mayBeStopped.signalAll(); - final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); - if (leaderLifecycle != null) { - leaderLifecycle.stop(); - } - } + overlordLeaderSelector.unregisterListener(); } finally { giant.unlock(); @@ -244,27 +200,17 @@ private void stopLeading() public boolean isLeader() { - return leading; + return overlordLeaderSelector.isLeader(); } public String getCurrentLeader() { - try { - final Participant leader = leaderSelector.getLeader(); - if (leader != null && leader.isLeader()) { - return leader.getId(); - } else { - return null; - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } + return overlordLeaderSelector.getCurrentLeader(); } public Optional getTaskRunner() { - if (leading) { + if (overlordLeaderSelector.isLeader()) { return Optional.of(taskRunner); } else { return Optional.absent(); @@ -273,7 +219,7 @@ public Optional getTaskRunner() public Optional getTaskQueue() { - if (leading) { + if (overlordLeaderSelector.isLeader()) { return Optional.of(taskQueue); } else { return Optional.absent(); @@ -282,7 +228,7 @@ public Optional getTaskQueue() public Optional getTaskActionClient(Task task) { - if (leading) { + if (overlordLeaderSelector.isLeader()) { return Optional.of(taskActionClientFactory.create(task)); } else { return Optional.absent(); @@ -291,7 +237,7 @@ public Optional getTaskActionClient(Task task) public Optional getScalingStats() { - if (leading) { + if (overlordLeaderSelector.isLeader()) { return taskRunner.getScalingStats(); } else { return Optional.absent(); @@ -300,7 +246,7 @@ public Optional getScalingStats() public Optional getSupervisorManager() { - if (leading) { + if (overlordLeaderSelector.isLeader()) { return Optional.of(supervisorManager); } else { return Optional.absent(); diff --git a/indexing-service/src/main/java/io/druid/server/initialization/IndexerZkConfig.java b/indexing-service/src/main/java/io/druid/server/initialization/IndexerZkConfig.java index 50c45f443f61..eee927116d04 100644 --- a/indexing-service/src/main/java/io/druid/server/initialization/IndexerZkConfig.java +++ b/indexing-service/src/main/java/io/druid/server/initialization/IndexerZkConfig.java @@ -35,8 +35,7 @@ public IndexerZkConfig( @JsonProperty("base") String base, @JsonProperty("announcementsPath") String announcementsPath, @JsonProperty("tasksPath") String tasksPath, - @JsonProperty("statusPath") String statusPath, - @JsonProperty("leaderLatchPath") String leaderLatchPath + @JsonProperty("statusPath") String statusPath ) { this.zkPathsConfig = zkPathsConfig; @@ -44,7 +43,6 @@ public IndexerZkConfig( this.announcementsPath = announcementsPath; this.tasksPath = tasksPath; this.statusPath = statusPath; - this.leaderLatchPath = leaderLatchPath; } @JacksonInject @@ -62,9 +60,6 @@ public IndexerZkConfig( @JsonProperty private final String statusPath; - @JsonProperty - private final String leaderLatchPath; - private String defaultIndexerPath(final String subPath) { return ZKPaths.makePath(getBase(), subPath); @@ -90,11 +85,6 @@ public String getStatusPath() return statusPath == null ? defaultIndexerPath("status") : statusPath; } - public String getLeaderLatchPath() - { - return leaderLatchPath == null ? defaultIndexerPath("leaderLatchPath") : leaderLatchPath; - } - public ZkPathsConfig getZkPathsConfig() { return zkPathsConfig; @@ -120,9 +110,6 @@ public boolean equals(Object o) if (base != null ? !base.equals(that.base) : that.base != null) { return false; } - if (leaderLatchPath != null ? !leaderLatchPath.equals(that.leaderLatchPath) : that.leaderLatchPath != null) { - return false; - } if (statusPath != null ? !statusPath.equals(that.statusPath) : that.statusPath != null) { return false; } @@ -144,7 +131,6 @@ public int hashCode() result = 31 * result + (announcementsPath != null ? announcementsPath.hashCode() : 0); result = 31 * result + (tasksPath != null ? tasksPath.hashCode() : 0); result = 31 * result + (statusPath != null ? statusPath.hashCode() : 0); - result = 31 * result + (leaderLatchPath != null ? leaderLatchPath.hashCode() : 0); return result; } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java index aeff81a1a17a..3b6f68c4fa5c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java @@ -125,7 +125,7 @@ public String getBase() { return basePath; } - }, null, null, null, null, null + }, null, null, null, null ), cf, new PathChildrenCacheFactory.Builder(), diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index 2f2a79b09908..94b2fb1f2830 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -31,6 +31,7 @@ import io.druid.concurrent.Execs; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.discovery.NoopServiceAnnouncer; +import io.druid.discovery.DruidLeaderSelector; import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskActionClientFactory; @@ -154,11 +155,10 @@ public void setUp() throws Exception taskCompletionCountDownLatches[0] = new CountDownLatch(1); taskCompletionCountDownLatches[1] = new CountDownLatch(1); announcementLatch = new CountDownLatch(1); - IndexerZkConfig indexerZkConfig = new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null, null); + IndexerZkConfig indexerZkConfig = new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null); setupServerAndCurator(); curator.start(); curator.blockUntilConnected(); - curator.create().creatingParentsIfNeeded().forPath(indexerZkConfig.getLeaderLatchPath()); druidNode = new DruidNode("hey", "what", 1234, null, new ServerConfig()); ServiceEmitter serviceEmitter = new NoopServiceEmitter(); taskMaster = new TaskMaster( @@ -167,7 +167,6 @@ public void setUp() throws Exception taskStorage, taskActionClientFactory, druidNode, - indexerZkConfig, new TaskRunnerFactory() { @Override @@ -176,7 +175,6 @@ public MockTaskRunner build() return new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches); } }, - curator, new NoopServiceAnnouncer() { @Override @@ -188,7 +186,8 @@ public void announce(DruidNode node) new CoordinatorOverlordServiceConfig(null, null), serviceEmitter, supervisorManager, - EasyMock.createNiceMock(OverlordHelperManager.class) + EasyMock.createNiceMock(OverlordHelperManager.class), + new TestDruidLeaderSelector() ); EmittingLogger.registerEmitter(serviceEmitter); } @@ -426,4 +425,44 @@ public void start() //Do nothing } } + + private static class TestDruidLeaderSelector implements DruidLeaderSelector + { + private volatile Listener listener; + private volatile String leader; + + @Override + public String getCurrentLeader() + { + return leader; + } + + @Override + public boolean isLeader() + { + return leader != null; + } + + @Override + public int localTerm() + { + return 0; + } + + @Override + public void registerListener(Listener listener) + { + this.listener = listener; + + leader = "what:1234"; + listener.becomeLeader(); + } + + @Override + public void unregisterListener() + { + leader = null; + listener.stopBeingLeader(); + } + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 82cacf88dcde..2e8cee71a460 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -126,7 +126,7 @@ public String getBase() { return basePath; } - }, null, null, null, null, null + }, null, null, null, null ), new TestRemoteTaskRunnerConfig(new Period("PT1S")), cf, diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java index 44e83fc8d529..8e8b672636d9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java @@ -87,7 +87,7 @@ public String getBase() { return basePath; } - }, null, null, null, null, null), + }, null, null, null, null), new RemoteTaskRunnerConfig(), cf, worker diff --git a/indexing-service/src/test/java/io/druid/server/initialization/IndexerZkConfigTest.java b/indexing-service/src/test/java/io/druid/server/initialization/IndexerZkConfigTest.java index df45c1b99e09..ac682383abd2 100644 --- a/indexing-service/src/test/java/io/druid/server/initialization/IndexerZkConfigTest.java +++ b/indexing-service/src/test/java/io/druid/server/initialization/IndexerZkConfigTest.java @@ -156,7 +156,7 @@ public void testNullConfig() ); indexerZkConfig.inject(propertyValues, configurator); - Assert.assertEquals("/druid/indexer/leaderLatchPath", indexerZkConfig.get().get().getLeaderLatchPath()); + Assert.assertEquals("/druid/indexer/tasks", indexerZkConfig.get().get().getTasksPath()); } @Test @@ -245,7 +245,7 @@ public void testExactConfig() ZkPathsConfig zkPathsConfig1 = zkPathsConfig.get().get(); - IndexerZkConfig indexerZkConfig = new IndexerZkConfig(zkPathsConfig1, null, null, null, null, null); + IndexerZkConfig indexerZkConfig = new IndexerZkConfig(zkPathsConfig1, null, null, null, null); Assert.assertEquals("/druid/metrics/indexer", indexerZkConfig.getBase()); Assert.assertEquals("/druid/metrics/indexer/announcements", indexerZkConfig.getAnnouncementsPath()); @@ -262,8 +262,7 @@ public void testFullOverride() throws Exception "/druid/prod", "/druid/prod/a", "/druid/prod/t", - "/druid/prod/s", - "/druid/prod/l" + "/druid/prod/s" ); Map value = mapper.readValue( @@ -276,8 +275,7 @@ public void testFullOverride() throws Exception value.get("base"), value.get("announcementsPath"), value.get("tasksPath"), - value.get("statusPath"), - value.get("leaderLatchPath") + value.get("statusPath") ); Assert.assertEquals(indexerZkConfig, newConfig); diff --git a/integration-tests/src/main/java/io/druid/testing/guice/DruidTestModule.java b/integration-tests/src/main/java/io/druid/testing/guice/DruidTestModule.java index 05836d66b714..14b48da5f7d5 100644 --- a/integration-tests/src/main/java/io/druid/testing/guice/DruidTestModule.java +++ b/integration-tests/src/main/java/io/druid/testing/guice/DruidTestModule.java @@ -35,6 +35,9 @@ import io.druid.guice.JsonConfigProvider; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Client; +import io.druid.guice.annotations.Self; +import io.druid.server.DruidNode; +import io.druid.server.initialization.ServerConfig; import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.IntegrationTestingConfigProvider; import io.druid.testing.IntegrationTestingCuratorConfig; @@ -52,6 +55,11 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.test.config", IntegrationTestingConfigProvider.class); binder.bind(CuratorConfig.class).to(IntegrationTestingCuratorConfig.class); + + // Bind DruidNode instance to make Guice happy. This instance is currently unused. + binder.bind(DruidNode.class).annotatedWith(Self.class).toInstance( + new DruidNode("integration-tests", "localhost", 9191, null, null, new ServerConfig()) + ); } @Provides diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java new file mode 100644 index 000000000000..415360c6f170 --- /dev/null +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -0,0 +1,206 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.curator.discovery; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; +import io.druid.concurrent.LifecycleLock; +import io.druid.discovery.DruidLeaderSelector; +import io.druid.guice.annotations.Self; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.server.DruidNode; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.curator.framework.recipes.leader.Participant; + +import javax.annotation.Nullable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class CuratorDruidLeaderSelector implements DruidLeaderSelector +{ + private static final EmittingLogger log = new EmittingLogger(CuratorDruidLeaderSelector.class); + + private final LifecycleLock lifecycleLock = new LifecycleLock(); + + private final DruidNode self; + private final CuratorFramework curator; + private final String latchPath; + + private ExecutorService listenerExecutor; + + private DruidLeaderSelector.Listener listener = null; + private final AtomicReference leaderLatch = new AtomicReference<>(); + + private volatile boolean leader = false; + private volatile int term = 0; + + public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self, String latchPath) + { + this.curator = curator; + this.self = self; + this.latchPath = latchPath; + } + + private LeaderLatch createNewLeaderLatch() + { + final LeaderLatch newLeaderLatch = new LeaderLatch( + curator, latchPath, self.getHostAndPortToUse() + ); + + newLeaderLatch.addListener( + new LeaderLatchListener() + { + @Override + public void isLeader() + { + try { + if (leader) { + log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); + return; + } + + leader = true; + term++; + listener.becomeLeader(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); + + // give others a chance to become leader. + final LeaderLatch oldLatch = createNewLeaderLatch(); + CloseQuietly.close(oldLatch); + leader = false; + try { + //Small delay before starting the latch so that others waiting are chosen to become leader. + Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); + leaderLatch.get().start(); + } + catch (Exception e) { + // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for + // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but + // Curator likes to have "throws Exception" on methods so it might happen... + log.makeAlert(e, "I am a zombie").emit(); + } + } + } + + @Override + public void notLeader() + { + try { + if (!leader) { + log.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event."); + return; + } + + leader = false; + listener.stopBeingLeader(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); + } + } + }, + listenerExecutor + ); + + return leaderLatch.getAndSet(newLeaderLatch); + } + + @Nullable + @Override + public String getCurrentLeader() + { + if (!lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + throw new ISE("not started"); + } + + try { + final LeaderLatch latch = leaderLatch.get(); + + Participant participant = latch.getLeader(); + if (participant.isLeader()) { + return participant.getId(); + } + + return null; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public boolean isLeader() + { + return leader; + } + + @Override + public int localTerm() + { + return term; + } + + @Override + public void registerListener(DruidLeaderSelector.Listener listener) + { + Preconditions.checkArgument(listener != null, "listener is null."); + + if (!lifecycleLock.canStart()) { + throw new ISE("can't start."); + } + try { + this.listener = listener; + this.listenerExecutor = Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath)); + + createNewLeaderLatch(); + leaderLatch.get().start(); + + lifecycleLock.started(); + } + catch (Exception ex) { + throw Throwables.propagate(ex); + } + finally { + lifecycleLock.exitStart(); + } + } + + @Override + public void unregisterListener() + { + if (!lifecycleLock.canStop()) { + throw new ISE("can't stop."); + } + CloseQuietly.close(leaderLatch.get()); + listenerExecutor.shutdownNow(); + } +} diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index f30a2060e420..086e166d403f 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -23,13 +23,18 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.inject.Binder; +import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.Provider; import com.google.inject.Provides; import com.google.inject.TypeLiteral; import com.google.inject.name.Named; import com.google.inject.name.Names; +import io.druid.client.coordinator.Coordinator; +import io.druid.client.indexing.IndexingService; +import io.druid.discovery.DruidLeaderSelector; import io.druid.discovery.DruidNodeAnnouncer; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.DruidBinders; @@ -38,11 +43,14 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.PolyBind; +import io.druid.guice.annotations.Self; import io.druid.java.util.common.lifecycle.Lifecycle; import io.druid.server.DruidNode; import io.druid.server.initialization.CuratorDiscoveryConfig; +import io.druid.server.initialization.ZkPathsConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.CloseableExecutorService; +import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.discovery.DownInstancePolicy; import org.apache.curator.x.discovery.InstanceFilter; import org.apache.curator.x.discovery.ProviderStrategy; @@ -64,6 +72,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; +import java.util.function.Function; /** * The DiscoveryModule allows for the registration of Keys of DruidNode objects, which it intends to be @@ -161,6 +170,14 @@ public void configure(Binder binder) binder, INTERNAL_DISCOVERY_PROP, Key.get(DruidNodeDiscoveryProvider.class), CURATOR_KEY ); + PolyBind.createChoiceWithDefault( + binder, INTERNAL_DISCOVERY_PROP, Key.get(DruidLeaderSelector.class, () -> Coordinator.class), CURATOR_KEY + ); + + PolyBind.createChoiceWithDefault( + binder, INTERNAL_DISCOVERY_PROP, Key.get(DruidLeaderSelector.class, () -> IndexingService.class), CURATOR_KEY + ); + PolyBind.optionBinder(binder, Key.get(DruidNodeDiscoveryProvider.class)) .addBinding(CURATOR_KEY) .to(CuratorDruidNodeDiscoveryProvider.class) @@ -170,6 +187,20 @@ public void configure(Binder binder) .addBinding(CURATOR_KEY) .to(CuratorDruidNodeAnnouncer.class) .in(LazySingleton.class); + + PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, Coordinator.class)) + .addBinding(CURATOR_KEY) + .toProvider(new DruidLeaderSelectorProvider( + (zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getCoordinatorPath(), "_COORDINATOR")) + ) + .in(LazySingleton.class); + + PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, IndexingService.class)) + .addBinding(CURATOR_KEY) + .toProvider(new DruidLeaderSelectorProvider( + (zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getOverlordPath(), "_OVERLORD")) + ) + .in(LazySingleton.class); } @Provides @@ -476,4 +507,34 @@ public void close() throws IOException // nothing } } + + private static class DruidLeaderSelectorProvider implements Provider + { + @Inject + private CuratorFramework curatorFramework; + + @Inject + @Self + private DruidNode druidNode; + + @Inject + private ZkPathsConfig zkPathsConfig; + + private final Function latchPathFn; + + DruidLeaderSelectorProvider(Function latchPathFn) + { + this.latchPathFn = latchPathFn; + } + + @Override + public DruidLeaderSelector get() + { + return new CuratorDruidLeaderSelector( + curatorFramework, + druidNode, + latchPathFn.apply(zkPathsConfig) + ); + } + } } diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java b/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java new file mode 100644 index 000000000000..02d63dd32a8f --- /dev/null +++ b/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java @@ -0,0 +1,87 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +import javax.annotation.Nullable; + +/** + * Interface for supporting Overlord and Coordinator Leader Elections in TaskMaster and DruidCoordinator + * which expect appropriate implementation available in guice annotated with @IndexingService and @Coordinator + * respectively. + * + * Usage is as follow. + * On lifecycle start: + * druidLeaderSelector.registerListener(myListener); + * + * On lifecycle stop: + * druidLeaderSelector.unregisterListener(); + */ +public interface DruidLeaderSelector +{ + + /** + * Get ID of current Leader. Returns NULL if it can't find the leader. + * Note that it is possible for leadership to change right after this call returns, so caller would get wrong + * leader. + */ + @Nullable + String getCurrentLeader(); + + /** + * Returns true if this node is elected leader from underlying system's point of view. For example if curator + * is used to implement this then true would be returned when curator believes this node to be the leader. + * Note that it is possible for leadership to change right after this call returns, so caller would get wrong + * status. + */ + boolean isLeader(); + + /** + * Implementation would increment it everytime it becomes leader. This allows users to start a long running + * task when they become leader and be able to intermittently check that they are still leader from same + * term when they started. DruidCoordinator class uses it to do intermittent checks and stop the activity + * as needed. + */ + int localTerm(); + + /** + * Register the listener for watching leadership notifications. It should only be called once. + */ + void registerListener(Listener listener); + + /** + * Unregisters the listener. + */ + void unregisterListener(); + + interface Listener + { + /** + * Notification that this node should start activities to be done by the leader. if this method throws + * exception then implementation would try to resign its leadership in the underlying system such as curator. + */ + void becomeLeader(); + + /** + * Notification that shid node should stop acitivities which are done by the leader. If this method throws + * exception then an alert is created. + */ + void stopBeingLeader(); + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 59200171ed7a..a30d623a0a1a 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -38,10 +38,12 @@ import io.druid.client.ImmutableDruidDataSource; import io.druid.client.ImmutableDruidServer; import io.druid.client.ServerInventoryView; +import io.druid.client.coordinator.Coordinator; import io.druid.client.indexing.IndexingServiceClient; import io.druid.common.config.JacksonConfigManager; import io.druid.concurrent.Execs; import io.druid.curator.discovery.ServiceAnnouncer; +import io.druid.discovery.DruidLeaderSelector; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.CoordinatorIndexingServiceHelper; import io.druid.guice.annotations.Self; @@ -50,7 +52,6 @@ import io.druid.java.util.common.Pair; import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; import io.druid.java.util.common.concurrent.ScheduledExecutors; -import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.guava.FunctionalIterable; import io.druid.java.util.common.lifecycle.LifecycleStart; @@ -73,15 +74,11 @@ import it.unimi.dsi.fastutil.objects.Object2LongMap; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.leader.LeaderLatch; -import org.apache.curator.framework.recipes.leader.LeaderLatchListener; -import org.apache.curator.framework.recipes.leader.Participant; import org.apache.curator.utils.ZKPaths; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; -import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -90,15 +87,12 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicReference; /** */ @ManageLifecycle public class DruidCoordinator { - public static final String COORDINATOR_OWNER_NODE = "_COORDINATOR"; - public static Comparator SEGMENT_COMPARATOR = Ordering.from(Comparators.intervalsByEndThenStart()) .onResultOf( new Function() @@ -126,16 +120,14 @@ public Interval apply(DataSegment segment) private final ScheduledExecutorService exec; private final LoadQueueTaskMaster taskMaster; private final Map loadManagementPeons; - private final AtomicReference leaderLatch; private final ServiceAnnouncer serviceAnnouncer; private final DruidNode self; private final Set indexingServiceHelpers; private volatile boolean started = false; - private volatile int leaderCounter = 0; - private volatile boolean leader = false; private volatile SegmentReplicantLookup segmentReplicantLookup = null; private final BalancerStrategyFactory factory; private final LookupCoordinatorManager lookupCoordinatorManager; + private final DruidLeaderSelector coordLeaderSelector; @Inject public DruidCoordinator( @@ -154,7 +146,8 @@ public DruidCoordinator( @Self DruidNode self, @CoordinatorIndexingServiceHelper Set indexingServiceHelpers, BalancerStrategyFactory factory, - LookupCoordinatorManager lookupCoordinatorManager + LookupCoordinatorManager lookupCoordinatorManager, + @Coordinator DruidLeaderSelector coordLeaderSelector ) { this( @@ -174,7 +167,8 @@ public DruidCoordinator( Maps.newConcurrentMap(), indexingServiceHelpers, factory, - lookupCoordinatorManager + lookupCoordinatorManager, + coordLeaderSelector ); } @@ -195,7 +189,8 @@ public DruidCoordinator( ConcurrentMap loadQueuePeonMap, Set indexingServiceHelpers, BalancerStrategyFactory factory, - LookupCoordinatorManager lookupCoordinatorManager + LookupCoordinatorManager lookupCoordinatorManager, + DruidLeaderSelector coordLeaderSelector ) { this.config = config; @@ -215,15 +210,15 @@ public DruidCoordinator( this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); - this.leaderLatch = new AtomicReference<>(null); this.loadManagementPeons = loadQueuePeonMap; this.factory = factory; this.lookupCoordinatorManager = lookupCoordinatorManager; + this.coordLeaderSelector = coordLeaderSelector; } public boolean isLeader() { - return leader; + return coordLeaderSelector.isLeader(); } public Map getLoadManagementPeons() @@ -265,7 +260,6 @@ public Map getLoadManagementPeons() return retVal; } - public Object2LongMap getSegmentAvailability() { final Object2LongOpenHashMap retVal = new Object2LongOpenHashMap<>(); @@ -345,23 +339,7 @@ public void enableDatasource(String ds) public String getCurrentLeader() { - try { - final LeaderLatch latch = leaderLatch.get(); - - if (latch == null) { - return null; - } - - Participant participant = latch.getLeader(); - if (participant.isLeader()) { - return participant.getId(); - } - - return null; - } - catch (Exception e) { - throw Throwables.propagate(e); - } + return coordLeaderSelector.getCurrentLeader(); } public void moveSegment( @@ -498,41 +476,23 @@ public void start() } started = true; - createNewLeaderLatch(); - try { - leaderLatch.get().start(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - - private LeaderLatch createNewLeaderLatch() - { - final LeaderLatch newLeaderLatch = new LeaderLatch( - curator, ZKPaths.makePath(zkPaths.getCoordinatorPath(), COORDINATOR_OWNER_NODE), self.getHostAndPortToUse() - ); - - newLeaderLatch.addListener( - new LeaderLatchListener() - { - @Override - public void isLeader() + coordLeaderSelector.registerListener( + new DruidLeaderSelector.Listener() { - DruidCoordinator.this.becomeLeader(); - } + @Override + public void becomeLeader() + { + DruidCoordinator.this.becomeLeader(); + } - @Override - public void notLeader() - { - DruidCoordinator.this.stopBeingLeader(); + @Override + public void stopBeingLeader() + { + DruidCoordinator.this.stopBeingLeader(); + } } - }, - Execs.singleThreaded("CoordinatorLeader-%s") - ); - - return leaderLatch.getAndSet(newLeaderLatch); + ); + } } @LifecycleStop @@ -543,14 +503,7 @@ public void stop() return; } - stopBeingLeader(); - - try { - leaderLatch.get().close(); - } - catch (IOException e) { - log.warn(e, "Unable to close leaderLatch, ignoring"); - } + coordLeaderSelector.unregisterListener(); started = false; @@ -567,103 +520,76 @@ private void becomeLeader() log.info("I am the leader of the coordinators, all must bow!"); log.info("Starting coordination in [%s]", config.getCoordinatorStartDelay()); - try { - leaderCounter++; - leader = true; - metadataSegmentManager.start(); - metadataRuleManager.start(); - serviceAnnouncer.announce(self); - final int startingLeaderCounter = leaderCounter; - - final List> coordinatorRunnables = Lists.newArrayList(); + + metadataSegmentManager.start(); + metadataRuleManager.start(); + serviceAnnouncer.announce(self); + final int startingLeaderCounter = coordLeaderSelector.localTerm(); + + final List> coordinatorRunnables = Lists.newArrayList(); + coordinatorRunnables.add( + Pair.of( + new CoordinatorHistoricalManagerRunnable(startingLeaderCounter), + config.getCoordinatorPeriod() + ) + ); + if (indexingServiceClient != null) { coordinatorRunnables.add( Pair.of( - new CoordinatorHistoricalManagerRunnable(startingLeaderCounter), - config.getCoordinatorPeriod() + new CoordinatorIndexingServiceRunnable( + makeIndexingServiceHelpers(), + startingLeaderCounter + ), + config.getCoordinatorIndexingPeriod() ) ); - if (indexingServiceClient != null) { - coordinatorRunnables.add( - Pair.of( - new CoordinatorIndexingServiceRunnable( - makeIndexingServiceHelpers(), - startingLeaderCounter - ), - config.getCoordinatorIndexingPeriod() - ) - ); - } + } - for (final Pair coordinatorRunnable : coordinatorRunnables) { - ScheduledExecutors.scheduleWithFixedDelay( - exec, - config.getCoordinatorStartDelay(), - coordinatorRunnable.rhs, - new Callable() - { - private final CoordinatorRunnable theRunnable = coordinatorRunnable.lhs; + for (final Pair coordinatorRunnable : coordinatorRunnables) { + ScheduledExecutors.scheduleWithFixedDelay( + exec, + config.getCoordinatorStartDelay(), + coordinatorRunnable.rhs, + new Callable() + { + private final CoordinatorRunnable theRunnable = coordinatorRunnable.lhs; - @Override - public ScheduledExecutors.Signal call() - { - if (leader && startingLeaderCounter == leaderCounter) { - theRunnable.run(); - } - if (leader && startingLeaderCounter == leaderCounter) { // (We might no longer be leader) - return ScheduledExecutors.Signal.REPEAT; - } else { - return ScheduledExecutors.Signal.STOP; - } + @Override + public ScheduledExecutors.Signal call() + { + if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { + theRunnable.run(); + } + if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { // (We might no longer be leader) + return ScheduledExecutors.Signal.REPEAT; + } else { + return ScheduledExecutors.Signal.STOP; } } - ); - } - - lookupCoordinatorManager.start(); - } - catch (Exception e) { - log.makeAlert(e, "Unable to become leader") - .emit(); - final LeaderLatch oldLatch = createNewLeaderLatch(); - CloseQuietly.close(oldLatch); - try { - leaderLatch.get().start(); - } - catch (Exception e1) { - // If an exception gets thrown out here, then the coordinator will zombie out 'cause it won't be looking for - // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but - // Curator likes to have "throws Exception" on methods so it might happen... - log.makeAlert(e1, "I am a zombie") - .emit(); - } + } + ); } + + lookupCoordinatorManager.start(); } } private void stopBeingLeader() { synchronized (lock) { - try { - leaderCounter++; - - log.info("I am no longer the leader..."); - - for (String server : loadManagementPeons.keySet()) { - LoadQueuePeon peon = loadManagementPeons.remove(server); - peon.stop(); - } - loadManagementPeons.clear(); - serviceAnnouncer.unannounce(self); - metadataRuleManager.stop(); - metadataSegmentManager.stop(); - lookupCoordinatorManager.stop(); + log.info("I am no longer the leader..."); - leader = false; - } - catch (Exception e) { - log.makeAlert(e, "Unable to stopBeingLeader").emit(); + for (String server : loadManagementPeons.keySet()) { + LoadQueuePeon peon = loadManagementPeons.remove(server); + peon.stop(); } + loadManagementPeons.clear(); + + serviceAnnouncer.unannounce(self); + metadataRuleManager.stop(); + metadataSegmentManager.stop(); + lookupCoordinatorManager.stop(); } } @@ -695,9 +621,8 @@ public void run() ListeningExecutorService balancerExec = null; try { synchronized (lock) { - final LeaderLatch latch = leaderLatch.get(); - if (latch == null || !latch.hasLeadership()) { - log.info("LEGGO MY EGGO. [%s] is leader.", latch == null ? null : latch.getLeader().getId()); + if (!coordLeaderSelector.isLeader()) { + log.info("LEGGO MY EGGO. [%s] is leader.", coordLeaderSelector.getCurrentLeader()); stopBeingLeader(); return; } @@ -732,7 +657,7 @@ public void run() .build(); for (DruidCoordinatorHelper helper : helpers) { // Don't read state and run state in the same helper otherwise racy conditions may exist - if (leader && startingLeaderCounter == leaderCounter) { + if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { params = helper.run(params); } } diff --git a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java index 3e8006a029d4..879876ee6d53 100644 --- a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java @@ -80,6 +80,11 @@ public String getCoordinatorPath() return (null == coordinatorPath) ? defaultPath("coordinator") : coordinatorPath; } + public String getOverlordPath() + { + return defaultPath("overlord"); + } + public String getLoadQueuePath() { return (null == loadQueuePath) ? defaultPath("loadQueue") : loadQueuePath; diff --git a/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java b/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java new file mode 100644 index 000000000000..c8b79d0d5ef7 --- /dev/null +++ b/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.curator.discovery; + +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.curator.CuratorTestBase; +import io.druid.discovery.DruidLeaderSelector; +import io.druid.java.util.common.logger.Logger; +import io.druid.server.DruidNode; +import io.druid.server.initialization.ServerConfig; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class CuratorDruidLeaderSelectorTest extends CuratorTestBase +{ + private static final Logger logger = new Logger(CuratorDruidLeaderSelectorTest.class); + + @Before + public void setUp() throws Exception + { + EmittingLogger.registerEmitter(EasyMock.createNiceMock(ServiceEmitter.class)); + setupServerAndCurator(); + } + + @Test(timeout = 5000) + public void testSimple() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + + AtomicReference currLeader = new AtomicReference<>(); + + String latchPath = "/testlatchPath"; + + CuratorDruidLeaderSelector leaderSelector1 = new CuratorDruidLeaderSelector( + curator, + new DruidNode("s1", "h1", 8080, null, new ServerConfig()), + latchPath + ); + leaderSelector1.registerListener( + new DruidLeaderSelector.Listener() + { + @Override + public void becomeLeader() + { + logger.info("listener1.becomeLeader()."); + currLeader.set("h1:8080"); + throw new RuntimeException("I am Rogue."); + } + + @Override + public void stopBeingLeader() + { + logger.info("listener1.stopBeingLeader()."); + throw new RuntimeException("I said I am Rogue."); + } + } + ); + + while (!"h1:8080".equals(currLeader.get())) { + logger.info("current leader = [%s]", currLeader.get()); + Thread.sleep(100); + } + + Assert.assertTrue(leaderSelector1.localTerm() >= 1); + + CuratorDruidLeaderSelector leaderSelector2 = new CuratorDruidLeaderSelector( + curator, + new DruidNode("s2", "h2", 8080, null, new ServerConfig()), + latchPath + ); + leaderSelector2.registerListener( + new DruidLeaderSelector.Listener() + { + private AtomicInteger attemptCount = new AtomicInteger(0); + + @Override + public void becomeLeader() + { + logger.info("listener2.becomeLeader()."); + + if (attemptCount.getAndIncrement() < 1) { + throw new RuntimeException("will become leader on next attempt."); + } + + currLeader.set("h2:8080"); + } + + @Override + public void stopBeingLeader() + { + logger.info("listener2.stopBeingLeader()."); + throw new RuntimeException("I am broken."); + } + } + ); + + while (!"h2:8080".equals(currLeader.get())) { + logger.info("current leader = [%s]", currLeader.get()); + Thread.sleep(100); + } + + Assert.assertTrue(leaderSelector2.isLeader()); + Assert.assertEquals("h2:8080", leaderSelector1.getCurrentLeader()); + Assert.assertEquals(2, leaderSelector2.localTerm()); + + CuratorDruidLeaderSelector leaderSelector3 = new CuratorDruidLeaderSelector( + curator, + new DruidNode("s3", "h3", 8080, null, new ServerConfig()), + latchPath + ); + leaderSelector3.registerListener( + new DruidLeaderSelector.Listener() + { + @Override + public void becomeLeader() + { + logger.info("listener3.becomeLeader()."); + currLeader.set("h3:8080"); + } + + @Override + public void stopBeingLeader() + { + logger.info("listener3.stopBeingLeader()."); + } + } + ); + + leaderSelector2.unregisterListener(); + while (!"h3:8080".equals(currLeader.get())) { + logger.info("current leader = [%s]", currLeader.get()); + Thread.sleep(100); + } + + Assert.assertTrue(leaderSelector3.isLeader()); + Assert.assertEquals("h3:8080", leaderSelector1.getCurrentLeader()); + Assert.assertEquals(1, leaderSelector3.localTerm()); + } + + @After + public void tearDown() + { + tearDownServerAndCurator(); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index d28757dc4760..0ca43b3be9f9 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -35,6 +35,7 @@ import io.druid.concurrent.Execs; import io.druid.curator.CuratorTestBase; import io.druid.curator.discovery.NoopServiceAnnouncer; +import io.druid.discovery.DruidLeaderSelector; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -193,7 +194,8 @@ public void unannounce(DruidNode node) loadManagementPeons, null, new CostBalancerStrategyFactory(), - EasyMock.createNiceMock(LookupCoordinatorManager.class) + EasyMock.createNiceMock(LookupCoordinatorManager.class), + new TestDruidLeaderSelector() ); } @@ -420,4 +422,43 @@ private DataSegment getSegment(String dataSource, Interval interval) ); return segment; } + + private static class TestDruidLeaderSelector implements DruidLeaderSelector + { + private volatile Listener listener; + private volatile String leader; + + @Override + public String getCurrentLeader() + { + return leader; + } + + @Override + public boolean isLeader() + { + return leader != null; + } + + @Override + public int localTerm() + { + return 0; + } + + @Override + public void registerListener(Listener listener) + { + this.listener = listener; + leader = "what:1234"; + listener.becomeLeader(); + } + + @Override + public void unregisterListener() + { + leader = null; + listener.stopBeingLeader(); + } + } }