From 15bc860c470a7722642c7102a5fe385cdc9eb362 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 16 Aug 2017 12:18:16 -0500 Subject: [PATCH 1/9] DruidLeaderSelector interface for leader election and Curator based impl. DruidCoordinator/TaskMaster are updated to use the new interface. --- .../druid/indexing/overlord/TaskMaster.java | 124 +++------ .../indexing/overlord/http/OverlordTest.java | 50 +++- .../discovery/CuratorDruidLeaderSelector.java | 203 +++++++++++++++ .../curator/discovery/DiscoveryModule.java | 61 +++++ .../druid/discovery/DruidLeaderSelector.java | 40 +++ .../server/coordinator/DruidCoordinator.java | 238 ++++++------------ .../server/initialization/ZkPathsConfig.java | 5 + .../CuratorDruidLeaderSelectorTest.java | 167 ++++++++++++ .../coordinator/DruidCoordinatorTest.java | 48 +++- 9 files changed, 688 insertions(+), 248 deletions(-) create mode 100644 server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java create mode 100644 server/src/main/java/io/druid/discovery/DruidLeaderSelector.java create mode 100644 server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java index f46b627ebb6e..2842a94dcab2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java @@ -24,7 +24,9 @@ import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.indexing.IndexingService; import io.druid.curator.discovery.ServiceAnnouncer; +import io.druid.discovery.DruidLeaderSelector; import io.druid.guice.annotations.Self; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionClientFactory; @@ -38,15 +40,8 @@ import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.server.DruidNode; import io.druid.server.coordinator.CoordinatorOverlordServiceConfig; -import io.druid.server.initialization.IndexerZkConfig; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.leader.LeaderSelector; -import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; -import org.apache.curator.framework.recipes.leader.Participant; -import org.apache.curator.framework.state.ConnectionState; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** @@ -54,15 +49,13 @@ */ public class TaskMaster { - private final LeaderSelector leaderSelector; + private final DruidLeaderSelector overlordLeaderSelector; private final ReentrantLock giant = new ReentrantLock(true); - private final Condition mayBeStopped = giant.newCondition(); private final TaskActionClientFactory taskActionClientFactory; private final SupervisorManager supervisorManager; private final AtomicReference leaderLifecycleRef = new AtomicReference<>(null); - private volatile boolean leading = false; private volatile TaskRunner taskRunner; private volatile TaskQueue taskQueue; @@ -75,38 +68,35 @@ public TaskMaster( final TaskStorage taskStorage, final TaskActionClientFactory taskActionClientFactory, @Self final DruidNode selfNode, - final IndexerZkConfig zkPaths, final TaskRunnerFactory runnerFactory, - final CuratorFramework curator, final ServiceAnnouncer serviceAnnouncer, final CoordinatorOverlordServiceConfig coordinatorOverlordServiceConfig, final ServiceEmitter emitter, final SupervisorManager supervisorManager, - final OverlordHelperManager overlordHelperManager - ) + final OverlordHelperManager overlordHelperManager, + @IndexingService final DruidLeaderSelector overlordLeaderSelector + ) { this.supervisorManager = supervisorManager; this.taskActionClientFactory = taskActionClientFactory; + this.overlordLeaderSelector = overlordLeaderSelector; + final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode : selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService()); - this.leaderSelector = new LeaderSelector( - curator, - zkPaths.getLeaderLatchPath(), - new LeaderSelectorListener() + this.overlordLeaderSelector.registerListener( + new DruidLeaderSelector.Listener() { @Override - public void takeLeadership(CuratorFramework client) throws Exception + public void becomeLeader() { giant.lock(); - try { - // Make sure the previous leadership cycle is really, really over. - stopLeading(); + // I AM THE MASTER OF THE UNIVERSE. + log.info("By the power of Grayskull, I have the power!"); - // I AM THE MASTER OF THE UNIVERSE. - log.info("By the power of Grayskull, I have the power!"); + try { taskLockbox.syncFromStorage(); taskRunner = runnerFactory.build(); taskQueue = new TaskQueue( @@ -146,24 +136,10 @@ public void stop() } } ); - try { - leaderLifecycle.start(); - leading = true; - while (leading && !Thread.currentThread().isInterrupted()) { - mayBeStopped.await(); - } - } - catch (InterruptedException e) { - log.debug("Interrupted while waiting"); - // Suppress so we can bow out gracefully - } - finally { - log.info("Bowing out!"); - stopLeading(); - } + + leaderLifecycle.start(); } catch (Exception e) { - log.makeAlert(e, "Failed to lead").emit(); throw Throwables.propagate(e); } finally { @@ -172,18 +148,21 @@ public void stop() } @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) + public void stopBeingLeader() { - if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) { - // disconnected from zk. assume leadership is gone - stopLeading(); + giant.lock(); + try { + final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); + if (leaderLifecycle != null) { + leaderLifecycle.stop(); + } + } + finally { + giant.unlock(); } } } ); - - leaderSelector.setId(node.getHostAndPortToUse()); - leaderSelector.autoRequeue(); } /** @@ -195,7 +174,7 @@ public void start() giant.lock(); try { - leaderSelector.start(); + overlordLeaderSelector.start(); } finally { giant.unlock(); @@ -212,30 +191,7 @@ public void stop() giant.lock(); try { - leaderSelector.close(); - stopLeading(); - } - finally { - giant.unlock(); - } - } - - /** - * Relinquish leadership. May be called multiple times, even when not currently the leader. - */ - private void stopLeading() - { - giant.lock(); - - try { - if (leading) { - leading = false; - mayBeStopped.signalAll(); - final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); - if (leaderLifecycle != null) { - leaderLifecycle.stop(); - } - } + overlordLeaderSelector.stop(); } finally { giant.unlock(); @@ -244,27 +200,17 @@ private void stopLeading() public boolean isLeader() { - return leading; + return overlordLeaderSelector.isLeader(); } public String getCurrentLeader() { - try { - final Participant leader = leaderSelector.getLeader(); - if (leader != null && leader.isLeader()) { - return leader.getId(); - } else { - return null; - } - } - catch (Exception e) { - throw Throwables.propagate(e); - } + return overlordLeaderSelector.getCurrentLeader(); } public Optional getTaskRunner() { - if (leading) { + if (overlordLeaderSelector.isLeader()) { return Optional.of(taskRunner); } else { return Optional.absent(); @@ -273,7 +219,7 @@ public Optional getTaskRunner() public Optional getTaskQueue() { - if (leading) { + if (overlordLeaderSelector.isLeader()) { return Optional.of(taskQueue); } else { return Optional.absent(); @@ -282,7 +228,7 @@ public Optional getTaskQueue() public Optional getTaskActionClient(Task task) { - if (leading) { + if (overlordLeaderSelector.isLeader()) { return Optional.of(taskActionClientFactory.create(task)); } else { return Optional.absent(); @@ -291,7 +237,7 @@ public Optional getTaskActionClient(Task task) public Optional getScalingStats() { - if (leading) { + if (overlordLeaderSelector.isLeader()) { return taskRunner.getScalingStats(); } else { return Optional.absent(); @@ -300,7 +246,7 @@ public Optional getScalingStats() public Optional getSupervisorManager() { - if (leading) { + if (overlordLeaderSelector.isLeader()) { return Optional.of(supervisorManager); } else { return Optional.absent(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index 2f2a79b09908..b134183ba7ca 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -31,6 +31,7 @@ import io.druid.concurrent.Execs; import io.druid.curator.PotentiallyGzippedCompressionProvider; import io.druid.curator.discovery.NoopServiceAnnouncer; +import io.druid.discovery.DruidLeaderSelector; import io.druid.indexing.common.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskActionClientFactory; @@ -167,7 +168,6 @@ public void setUp() throws Exception taskStorage, taskActionClientFactory, druidNode, - indexerZkConfig, new TaskRunnerFactory() { @Override @@ -176,7 +176,6 @@ public MockTaskRunner build() return new MockTaskRunner(runTaskCountDownLatches, taskCompletionCountDownLatches); } }, - curator, new NoopServiceAnnouncer() { @Override @@ -188,7 +187,8 @@ public void announce(DruidNode node) new CoordinatorOverlordServiceConfig(null, null), serviceEmitter, supervisorManager, - EasyMock.createNiceMock(OverlordHelperManager.class) + EasyMock.createNiceMock(OverlordHelperManager.class), + new TestDruidLeaderSelector() ); EmittingLogger.registerEmitter(serviceEmitter); } @@ -426,4 +426,48 @@ public void start() //Do nothing } } + + private static class TestDruidLeaderSelector implements DruidLeaderSelector + { + private volatile Listener listener; + private volatile String leader; + + @Override + public String getCurrentLeader() + { + return leader; + } + + @Override + public boolean isLeader() + { + return leader != null; + } + + @Override + public int localTerm() + { + return 0; + } + + @Override + public void registerListener(Listener listener) + { + this.listener = listener; + } + + @Override + public void start() + { + leader = "what:1234"; + listener.becomeLeader(); + } + + @Override + public void stop() + { + leader = null; + listener.stopBeingLeader(); + } + } } diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java new file mode 100644 index 000000000000..f6f70ab3e0a8 --- /dev/null +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -0,0 +1,203 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.curator.discovery; + +import com.google.common.base.Throwables; +import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; +import io.druid.concurrent.LifecycleLock; +import io.druid.discovery.DruidLeaderSelector; +import io.druid.guice.annotations.Self; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.server.DruidNode; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.leader.LeaderLatch; +import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.curator.framework.recipes.leader.Participant; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class CuratorDruidLeaderSelector implements DruidLeaderSelector +{ + private static final EmittingLogger log = new EmittingLogger(CuratorDruidLeaderSelector.class); + + private final LifecycleLock lifecycleLock = new LifecycleLock(); + + private final DruidNode self; + private final CuratorFramework curator; + private final String latchPath; + + private DruidLeaderSelector.Listener listener = null; + private final AtomicReference leaderLatch = new AtomicReference<>(); + + private volatile boolean leader = false; + private volatile int term = 0; + + public CuratorDruidLeaderSelector(CuratorFramework curator, @Self DruidNode self, String latchPath) + { + this.curator = curator; + this.self = self; + this.latchPath = latchPath; + } + + private LeaderLatch createNewLeaderLatch() + { + final LeaderLatch newLeaderLatch = new LeaderLatch( + curator, latchPath, self.getHostAndPortToUse() + ); + + newLeaderLatch.addListener( + new LeaderLatchListener() + { + @Override + public void isLeader() + { + try { + if (leader) { + log.warn("I'm being asked to become leader. But I am already the leader. Ignored event."); + return; + } + + leader = true; + term++; + listener.becomeLeader(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener becomeLeader() failed. Unable to become leader").emit(); + + // give others a chance to become leader. + final LeaderLatch oldLatch = createNewLeaderLatch(); + CloseQuietly.close(oldLatch); + leader = false; + try { + //Small delay before starting the latch so that others waiting are chosen to become leader. + Thread.sleep(1000); + + leaderLatch.get().start(); + } + catch (Exception e) { + // If an exception gets thrown out here, then the node will zombie out 'cause it won't be looking for + // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but + // Curator likes to have "throws Exception" on methods so it might happen... + log.makeAlert(e, "I am a zombie").emit(); + } + } + } + + @Override + public void notLeader() + { + try { + if (!leader) { + log.warn("I'm being asked to stop being leader. But I am not the leader. Ignored event."); + return; + } + + leader = false; + listener.stopBeingLeader(); + } + catch (Exception ex) { + log.makeAlert(ex, "listener.stopBeingLeader() failed. Unable to stopBeingLeader").emit(); + } + } + }, + Execs.singleThreaded(String.format("LeaderSelector[%s]", latchPath)) + ); + + return leaderLatch.getAndSet(newLeaderLatch); + } + + @Override + public String getCurrentLeader() + { + if (!lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + throw new ISE("not started"); + } + + try { + final LeaderLatch latch = leaderLatch.get(); + + Participant participant = latch.getLeader(); + if (participant.isLeader()) { + return participant.getId(); + } + + return null; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + @Override + public boolean isLeader() + { + return leader; + } + + @Override + public int localTerm() + { + return term; + } + + @Override + public void registerListener(DruidLeaderSelector.Listener listener) + { + this.listener = listener; + } + + @Override + public void start() + { + if (!lifecycleLock.canStart()) { + throw new ISE("can't start."); + } + try { + if (listener == null) { + throw new ISE("listener is not registered yet"); + } + + createNewLeaderLatch(); + leaderLatch.get().start(); + + lifecycleLock.started(); + } + catch (Exception ex) { + throw Throwables.propagate(ex); + } + finally { + lifecycleLock.exitStart(); + } + } + + @Override + public void stop() + { + if (!lifecycleLock.canStop()) { + throw new ISE("can't stop."); + } + CloseQuietly.close(leaderLatch.get()); + } +} diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index f30a2060e420..65a6c0bec836 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -23,13 +23,18 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.inject.Binder; +import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; +import com.google.inject.Provider; import com.google.inject.Provides; import com.google.inject.TypeLiteral; import com.google.inject.name.Named; import com.google.inject.name.Names; +import io.druid.client.coordinator.Coordinator; +import io.druid.client.indexing.IndexingService; +import io.druid.discovery.DruidLeaderSelector; import io.druid.discovery.DruidNodeAnnouncer; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.guice.DruidBinders; @@ -38,11 +43,14 @@ import io.druid.guice.LazySingleton; import io.druid.guice.LifecycleModule; import io.druid.guice.PolyBind; +import io.druid.guice.annotations.Self; import io.druid.java.util.common.lifecycle.Lifecycle; import io.druid.server.DruidNode; import io.druid.server.initialization.CuratorDiscoveryConfig; +import io.druid.server.initialization.ZkPathsConfig; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.utils.CloseableExecutorService; +import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.discovery.DownInstancePolicy; import org.apache.curator.x.discovery.InstanceFilter; import org.apache.curator.x.discovery.ProviderStrategy; @@ -64,6 +72,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; +import java.util.function.Function; /** * The DiscoveryModule allows for the registration of Keys of DruidNode objects, which it intends to be @@ -161,6 +170,14 @@ public void configure(Binder binder) binder, INTERNAL_DISCOVERY_PROP, Key.get(DruidNodeDiscoveryProvider.class), CURATOR_KEY ); + PolyBind.createChoiceWithDefault( + binder, INTERNAL_DISCOVERY_PROP, Key.get(DruidLeaderSelector.class, () -> Coordinator.class), CURATOR_KEY + ); + + PolyBind.createChoiceWithDefault( + binder, INTERNAL_DISCOVERY_PROP, Key.get(DruidLeaderSelector.class, () -> IndexingService.class), CURATOR_KEY + ); + PolyBind.optionBinder(binder, Key.get(DruidNodeDiscoveryProvider.class)) .addBinding(CURATOR_KEY) .to(CuratorDruidNodeDiscoveryProvider.class) @@ -170,6 +187,20 @@ public void configure(Binder binder) .addBinding(CURATOR_KEY) .to(CuratorDruidNodeAnnouncer.class) .in(LazySingleton.class); + + PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, Coordinator.class)) + .addBinding(CURATOR_KEY) + .toProvider(new DruidLeaderSelectorProvider( + (zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getCoordinatorPath(), "druid:coordinator")) + ) + .in(LazySingleton.class); + + PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, IndexingService.class)) + .addBinding(CURATOR_KEY) + .toProvider(new DruidLeaderSelectorProvider( + (zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getOverlordPath(), "druid:overlord")) + ) + .in(LazySingleton.class); } @Provides @@ -476,4 +507,34 @@ public void close() throws IOException // nothing } } + + private static class DruidLeaderSelectorProvider implements Provider + { + @Inject + private CuratorFramework curatorFramework; + + @Inject + @Self + private DruidNode druidNode; + + @Inject + private ZkPathsConfig zkPathsConfig; + + private final Function latchPathFn; + + DruidLeaderSelectorProvider(Function latchPathFn) + { + this.latchPathFn = latchPathFn; + } + + @Override + public DruidLeaderSelector get() + { + return new CuratorDruidLeaderSelector( + curatorFramework, + druidNode, + latchPathFn.apply(zkPathsConfig) + ); + } + } } diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java b/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java new file mode 100644 index 000000000000..217659f6dde5 --- /dev/null +++ b/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java @@ -0,0 +1,40 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.discovery; + +/** + */ +public interface DruidLeaderSelector +{ + String getCurrentLeader(); + boolean isLeader(); + int localTerm(); + + void registerListener(Listener listener); + + void start(); + void stop(); + + interface Listener + { + void becomeLeader(); + void stopBeingLeader(); + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 59200171ed7a..8cfc0ac68df8 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -38,10 +38,12 @@ import io.druid.client.ImmutableDruidDataSource; import io.druid.client.ImmutableDruidServer; import io.druid.client.ServerInventoryView; +import io.druid.client.coordinator.Coordinator; import io.druid.client.indexing.IndexingServiceClient; import io.druid.common.config.JacksonConfigManager; import io.druid.concurrent.Execs; import io.druid.curator.discovery.ServiceAnnouncer; +import io.druid.discovery.DruidLeaderSelector; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.CoordinatorIndexingServiceHelper; import io.druid.guice.annotations.Self; @@ -50,7 +52,6 @@ import io.druid.java.util.common.Pair; import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; import io.druid.java.util.common.concurrent.ScheduledExecutors; -import io.druid.java.util.common.guava.CloseQuietly; import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.guava.FunctionalIterable; import io.druid.java.util.common.lifecycle.LifecycleStart; @@ -73,15 +74,11 @@ import it.unimi.dsi.fastutil.objects.Object2LongMap; import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.leader.LeaderLatch; -import org.apache.curator.framework.recipes.leader.LeaderLatchListener; -import org.apache.curator.framework.recipes.leader.Participant; import org.apache.curator.utils.ZKPaths; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Interval; -import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -90,7 +87,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicReference; /** */ @@ -126,16 +122,14 @@ public Interval apply(DataSegment segment) private final ScheduledExecutorService exec; private final LoadQueueTaskMaster taskMaster; private final Map loadManagementPeons; - private final AtomicReference leaderLatch; private final ServiceAnnouncer serviceAnnouncer; private final DruidNode self; private final Set indexingServiceHelpers; private volatile boolean started = false; - private volatile int leaderCounter = 0; - private volatile boolean leader = false; private volatile SegmentReplicantLookup segmentReplicantLookup = null; private final BalancerStrategyFactory factory; private final LookupCoordinatorManager lookupCoordinatorManager; + private final DruidLeaderSelector coordLeaderSelector; @Inject public DruidCoordinator( @@ -154,7 +148,8 @@ public DruidCoordinator( @Self DruidNode self, @CoordinatorIndexingServiceHelper Set indexingServiceHelpers, BalancerStrategyFactory factory, - LookupCoordinatorManager lookupCoordinatorManager + LookupCoordinatorManager lookupCoordinatorManager, + @Coordinator DruidLeaderSelector coordLeaderSelector ) { this( @@ -174,7 +169,8 @@ public DruidCoordinator( Maps.newConcurrentMap(), indexingServiceHelpers, factory, - lookupCoordinatorManager + lookupCoordinatorManager, + coordLeaderSelector ); } @@ -195,7 +191,8 @@ public DruidCoordinator( ConcurrentMap loadQueuePeonMap, Set indexingServiceHelpers, BalancerStrategyFactory factory, - LookupCoordinatorManager lookupCoordinatorManager + LookupCoordinatorManager lookupCoordinatorManager, + DruidLeaderSelector coordLeaderSelector ) { this.config = config; @@ -215,15 +212,15 @@ public DruidCoordinator( this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d"); - this.leaderLatch = new AtomicReference<>(null); this.loadManagementPeons = loadQueuePeonMap; this.factory = factory; this.lookupCoordinatorManager = lookupCoordinatorManager; + this.coordLeaderSelector = coordLeaderSelector; } public boolean isLeader() { - return leader; + return coordLeaderSelector.isLeader(); } public Map getLoadManagementPeons() @@ -265,7 +262,6 @@ public Map getLoadManagementPeons() return retVal; } - public Object2LongMap getSegmentAvailability() { final Object2LongOpenHashMap retVal = new Object2LongOpenHashMap<>(); @@ -345,23 +341,7 @@ public void enableDatasource(String ds) public String getCurrentLeader() { - try { - final LeaderLatch latch = leaderLatch.get(); - - if (latch == null) { - return null; - } - - Participant participant = latch.getLeader(); - if (participant.isLeader()) { - return participant.getId(); - } - - return null; - } - catch (Exception e) { - throw Throwables.propagate(e); - } + return coordLeaderSelector.getCurrentLeader(); } public void moveSegment( @@ -498,41 +478,24 @@ public void start() } started = true; - createNewLeaderLatch(); - try { - leaderLatch.get().start(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - - private LeaderLatch createNewLeaderLatch() - { - final LeaderLatch newLeaderLatch = new LeaderLatch( - curator, ZKPaths.makePath(zkPaths.getCoordinatorPath(), COORDINATOR_OWNER_NODE), self.getHostAndPortToUse() - ); - - newLeaderLatch.addListener( - new LeaderLatchListener() - { - @Override - public void isLeader() + coordLeaderSelector.registerListener( + new DruidLeaderSelector.Listener() { - DruidCoordinator.this.becomeLeader(); - } + @Override + public void becomeLeader() + { + DruidCoordinator.this.becomeLeader(); + } - @Override - public void notLeader() - { - DruidCoordinator.this.stopBeingLeader(); + @Override + public void stopBeingLeader() + { + DruidCoordinator.this.stopBeingLeader(); + } } - }, - Execs.singleThreaded("CoordinatorLeader-%s") - ); - - return leaderLatch.getAndSet(newLeaderLatch); + ); + coordLeaderSelector.start(); + } } @LifecycleStop @@ -543,14 +506,7 @@ public void stop() return; } - stopBeingLeader(); - - try { - leaderLatch.get().close(); - } - catch (IOException e) { - log.warn(e, "Unable to close leaderLatch, ignoring"); - } + coordLeaderSelector.stop(); started = false; @@ -567,103 +523,76 @@ private void becomeLeader() log.info("I am the leader of the coordinators, all must bow!"); log.info("Starting coordination in [%s]", config.getCoordinatorStartDelay()); - try { - leaderCounter++; - leader = true; - metadataSegmentManager.start(); - metadataRuleManager.start(); - serviceAnnouncer.announce(self); - final int startingLeaderCounter = leaderCounter; - - final List> coordinatorRunnables = Lists.newArrayList(); + + metadataSegmentManager.start(); + metadataRuleManager.start(); + serviceAnnouncer.announce(self); + final int startingLeaderCounter = coordLeaderSelector.localTerm(); + + final List> coordinatorRunnables = Lists.newArrayList(); + coordinatorRunnables.add( + Pair.of( + new CoordinatorHistoricalManagerRunnable(startingLeaderCounter), + config.getCoordinatorPeriod() + ) + ); + if (indexingServiceClient != null) { coordinatorRunnables.add( Pair.of( - new CoordinatorHistoricalManagerRunnable(startingLeaderCounter), - config.getCoordinatorPeriod() + new CoordinatorIndexingServiceRunnable( + makeIndexingServiceHelpers(), + startingLeaderCounter + ), + config.getCoordinatorIndexingPeriod() ) ); - if (indexingServiceClient != null) { - coordinatorRunnables.add( - Pair.of( - new CoordinatorIndexingServiceRunnable( - makeIndexingServiceHelpers(), - startingLeaderCounter - ), - config.getCoordinatorIndexingPeriod() - ) - ); - } + } - for (final Pair coordinatorRunnable : coordinatorRunnables) { - ScheduledExecutors.scheduleWithFixedDelay( - exec, - config.getCoordinatorStartDelay(), - coordinatorRunnable.rhs, - new Callable() - { - private final CoordinatorRunnable theRunnable = coordinatorRunnable.lhs; + for (final Pair coordinatorRunnable : coordinatorRunnables) { + ScheduledExecutors.scheduleWithFixedDelay( + exec, + config.getCoordinatorStartDelay(), + coordinatorRunnable.rhs, + new Callable() + { + private final CoordinatorRunnable theRunnable = coordinatorRunnable.lhs; - @Override - public ScheduledExecutors.Signal call() - { - if (leader && startingLeaderCounter == leaderCounter) { - theRunnable.run(); - } - if (leader && startingLeaderCounter == leaderCounter) { // (We might no longer be leader) - return ScheduledExecutors.Signal.REPEAT; - } else { - return ScheduledExecutors.Signal.STOP; - } + @Override + public ScheduledExecutors.Signal call() + { + if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { + theRunnable.run(); + } + if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { // (We might no longer be leader) + return ScheduledExecutors.Signal.REPEAT; + } else { + return ScheduledExecutors.Signal.STOP; } } - ); - } - - lookupCoordinatorManager.start(); - } - catch (Exception e) { - log.makeAlert(e, "Unable to become leader") - .emit(); - final LeaderLatch oldLatch = createNewLeaderLatch(); - CloseQuietly.close(oldLatch); - try { - leaderLatch.get().start(); - } - catch (Exception e1) { - // If an exception gets thrown out here, then the coordinator will zombie out 'cause it won't be looking for - // the latch anymore. I don't believe it's actually possible for an Exception to throw out here, but - // Curator likes to have "throws Exception" on methods so it might happen... - log.makeAlert(e1, "I am a zombie") - .emit(); - } + } + ); } + + lookupCoordinatorManager.start(); } } private void stopBeingLeader() { synchronized (lock) { - try { - leaderCounter++; - - log.info("I am no longer the leader..."); - - for (String server : loadManagementPeons.keySet()) { - LoadQueuePeon peon = loadManagementPeons.remove(server); - peon.stop(); - } - loadManagementPeons.clear(); - serviceAnnouncer.unannounce(self); - metadataRuleManager.stop(); - metadataSegmentManager.stop(); - lookupCoordinatorManager.stop(); + log.info("I am no longer the leader..."); - leader = false; - } - catch (Exception e) { - log.makeAlert(e, "Unable to stopBeingLeader").emit(); + for (String server : loadManagementPeons.keySet()) { + LoadQueuePeon peon = loadManagementPeons.remove(server); + peon.stop(); } + loadManagementPeons.clear(); + + serviceAnnouncer.unannounce(self); + metadataRuleManager.stop(); + metadataSegmentManager.stop(); + lookupCoordinatorManager.stop(); } } @@ -695,9 +624,8 @@ public void run() ListeningExecutorService balancerExec = null; try { synchronized (lock) { - final LeaderLatch latch = leaderLatch.get(); - if (latch == null || !latch.hasLeadership()) { - log.info("LEGGO MY EGGO. [%s] is leader.", latch == null ? null : latch.getLeader().getId()); + if (!coordLeaderSelector.isLeader()) { + log.info("LEGGO MY EGGO. [%s] is leader.", coordLeaderSelector.getCurrentLeader()); stopBeingLeader(); return; } @@ -732,7 +660,7 @@ public void run() .build(); for (DruidCoordinatorHelper helper : helpers) { // Don't read state and run state in the same helper otherwise racy conditions may exist - if (leader && startingLeaderCounter == leaderCounter) { + if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) { params = helper.run(params); } } diff --git a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java index 3e8006a029d4..879876ee6d53 100644 --- a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java @@ -80,6 +80,11 @@ public String getCoordinatorPath() return (null == coordinatorPath) ? defaultPath("coordinator") : coordinatorPath; } + public String getOverlordPath() + { + return defaultPath("overlord"); + } + public String getLoadQueuePath() { return (null == loadQueuePath) ? defaultPath("loadQueue") : loadQueuePath; diff --git a/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java b/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java new file mode 100644 index 000000000000..234a96f17df6 --- /dev/null +++ b/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.curator.discovery; + +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.curator.CuratorTestBase; +import io.druid.discovery.DruidLeaderSelector; +import io.druid.java.util.common.logger.Logger; +import io.druid.server.DruidNode; +import io.druid.server.initialization.ServerConfig; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class CuratorDruidLeaderSelectorTest extends CuratorTestBase +{ + private static final Logger logger = new Logger(CuratorDruidLeaderSelectorTest.class); + + @Before + public void setUp() throws Exception + { + EmittingLogger.registerEmitter(EasyMock.createNiceMock(ServiceEmitter.class)); + setupServerAndCurator(); + } + + @Test(timeout = 5000) + public void testSimple() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + + AtomicReference currLeader = new AtomicReference<>(); + + String latchPath = "/testlatchPath"; + + CuratorDruidLeaderSelector leaderSelector1 = new CuratorDruidLeaderSelector( + curator, + new DruidNode("s1", "h1", 8080, null, new ServerConfig()), + latchPath + ); + leaderSelector1.registerListener( + new DruidLeaderSelector.Listener() + { + @Override + public void becomeLeader() + { + logger.info("listener1.becomeLeader()."); + currLeader.set("h1:8080"); + throw new RuntimeException("I am Rogue."); + } + + @Override + public void stopBeingLeader() + { + logger.info("listener1.stopBeingLeader()."); + throw new RuntimeException("I said I am Rogue."); + } + } + ); + + CuratorDruidLeaderSelector leaderSelector2 = new CuratorDruidLeaderSelector( + curator, + new DruidNode("s2", "h2", 8080, null, new ServerConfig()), + latchPath + ); + leaderSelector2.registerListener( + new DruidLeaderSelector.Listener() + { + @Override + public void becomeLeader() + { + logger.info("listener2.becomeLeader()."); + currLeader.set("h2:8080"); + } + + @Override + public void stopBeingLeader() + { + logger.info("listener2.stopBeingLeader()."); + throw new RuntimeException("I am broken."); + } + } + ); + + CuratorDruidLeaderSelector leaderSelector3 = new CuratorDruidLeaderSelector( + curator, + new DruidNode("s3", "h3", 8080, null, new ServerConfig()), + latchPath + ); + leaderSelector3.registerListener( + new DruidLeaderSelector.Listener() + { + @Override + public void becomeLeader() + { + logger.info("listener3.becomeLeader()."); + currLeader.set("h3:8080"); + } + + @Override + public void stopBeingLeader() + { + logger.info("listener3.stopBeingLeader()."); + } + } + ); + + leaderSelector1.start(); + while (!"h1:8080".equals(currLeader.get())) { + logger.info("current leader = [%s]", currLeader.get()); + Thread.sleep(100); + } + + Assert.assertTrue(leaderSelector1.localTerm() >= 1); + + leaderSelector2.start(); + while (!"h2:8080".equals(currLeader.get())) { + logger.info("current leader = [%s]", currLeader.get()); + Thread.sleep(100); + } + + Assert.assertTrue(leaderSelector2.isLeader()); + Assert.assertEquals("h2:8080", leaderSelector1.getCurrentLeader()); + Assert.assertEquals(1, leaderSelector2.localTerm()); + + leaderSelector3.start(); + leaderSelector2.stop(); + while (!"h3:8080".equals(currLeader.get())) { + logger.info("current leader = [%s]", currLeader.get()); + Thread.sleep(100); + } + + Assert.assertTrue(leaderSelector3.isLeader()); + Assert.assertEquals("h3:8080", leaderSelector1.getCurrentLeader()); + Assert.assertEquals(1, leaderSelector3.localTerm()); + } + + @After + public void tearDown() + { + tearDownServerAndCurator(); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index d28757dc4760..43e26649c5bf 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -35,6 +35,7 @@ import io.druid.concurrent.Execs; import io.druid.curator.CuratorTestBase; import io.druid.curator.discovery.NoopServiceAnnouncer; +import io.druid.discovery.DruidLeaderSelector; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -193,7 +194,8 @@ public void unannounce(DruidNode node) loadManagementPeons, null, new CostBalancerStrategyFactory(), - EasyMock.createNiceMock(LookupCoordinatorManager.class) + EasyMock.createNiceMock(LookupCoordinatorManager.class), + new TestDruidLeaderSelector() ); } @@ -420,4 +422,48 @@ private DataSegment getSegment(String dataSource, Interval interval) ); return segment; } + + private static class TestDruidLeaderSelector implements DruidLeaderSelector + { + private volatile Listener listener; + private volatile String leader; + + @Override + public String getCurrentLeader() + { + return leader; + } + + @Override + public boolean isLeader() + { + return leader != null; + } + + @Override + public int localTerm() + { + return 0; + } + + @Override + public void registerListener(Listener listener) + { + this.listener = listener; + } + + @Override + public void start() + { + leader = "what:1234"; + listener.becomeLeader(); + } + + @Override + public void stop() + { + leader = null; + listener.stopBeingLeader(); + } + } } From 26600e4f430e39ac9a39948494745ebbd7a2e40b Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 21 Aug 2017 14:39:56 -0500 Subject: [PATCH 2/9] add fake DruidNode binding in integration-tests module --- .../main/java/io/druid/testing/guice/DruidTestModule.java | 8 ++++++++ .../curator/discovery/CuratorDruidLeaderSelector.java | 3 ++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/integration-tests/src/main/java/io/druid/testing/guice/DruidTestModule.java b/integration-tests/src/main/java/io/druid/testing/guice/DruidTestModule.java index 05836d66b714..14b48da5f7d5 100644 --- a/integration-tests/src/main/java/io/druid/testing/guice/DruidTestModule.java +++ b/integration-tests/src/main/java/io/druid/testing/guice/DruidTestModule.java @@ -35,6 +35,9 @@ import io.druid.guice.JsonConfigProvider; import io.druid.guice.ManageLifecycle; import io.druid.guice.annotations.Client; +import io.druid.guice.annotations.Self; +import io.druid.server.DruidNode; +import io.druid.server.initialization.ServerConfig; import io.druid.testing.IntegrationTestingConfig; import io.druid.testing.IntegrationTestingConfigProvider; import io.druid.testing.IntegrationTestingCuratorConfig; @@ -52,6 +55,11 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.test.config", IntegrationTestingConfigProvider.class); binder.bind(CuratorConfig.class).to(IntegrationTestingCuratorConfig.class); + + // Bind DruidNode instance to make Guice happy. This instance is currently unused. + binder.bind(DruidNode.class).annotatedWith(Self.class).toInstance( + new DruidNode("integration-tests", "localhost", 9191, null, null, new ServerConfig()) + ); } @Provides diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java index f6f70ab3e0a8..e7736635672a 100644 --- a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -26,6 +26,7 @@ import io.druid.discovery.DruidLeaderSelector; import io.druid.guice.annotations.Self; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.guava.CloseQuietly; import io.druid.server.DruidNode; import org.apache.curator.framework.CuratorFramework; @@ -122,7 +123,7 @@ public void notLeader() } } }, - Execs.singleThreaded(String.format("LeaderSelector[%s]", latchPath)) + Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath)) ); return leaderLatch.getAndSet(newLeaderLatch); From 8b3d46b3231852f1f8f257f2eae8ef998d8eaae4 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 24 Aug 2017 09:39:35 -0500 Subject: [PATCH 3/9] add docs on DruidLeaderSelector interface --- .../discovery/CuratorDruidLeaderSelector.java | 4 ++ .../druid/discovery/DruidLeaderSelector.java | 49 +++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java index e7736635672a..0247161d3935 100644 --- a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -19,6 +19,7 @@ package io.druid.curator.discovery; +import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.metamx.emitter.EmittingLogger; import io.druid.concurrent.Execs; @@ -34,6 +35,7 @@ import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.framework.recipes.leader.Participant; +import javax.annotation.Nullable; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -129,6 +131,7 @@ public void notLeader() return leaderLatch.getAndSet(newLeaderLatch); } + @Nullable @Override public String getCurrentLeader() { @@ -166,6 +169,7 @@ public int localTerm() @Override public void registerListener(DruidLeaderSelector.Listener listener) { + Preconditions.checkArgument(this.listener == null, "listener is already registered."); this.listener = listener; } diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java b/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java index 217659f6dde5..e87f805a847b 100644 --- a/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java +++ b/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java @@ -19,22 +19,71 @@ package io.druid.discovery; +import javax.annotation.Nullable; + /** + * Interface for supporting Overlord and Coordinator Leader Elections in TaskMaster and DruidCoordinator + * which expect appropriate implementation available in guice annotated with @IndexingService and @Coordinator + * respectively. + * + * Usage is as follow. + * On lifecycle start: + * druidLeaderSelector.registerListener(myListener); + * druidLeaderSelector.start(); + * + * On lifecycle stop: + * druidLeaderSelector.stop(); */ public interface DruidLeaderSelector { + + /** + * Get ID of current Leader. + */ + @Nullable String getCurrentLeader(); + + /** + * Returns true if this node is elected leader from underlying system's point of view. For example if curator + * is used to implement this then true would be returned when curator believes this node to be the leader. + */ boolean isLeader(); + + /** + * Implementation would increment it everytime it becomes leader. This allows users to start a long running + * task when they become leader and be able to intermittently check that they are still leader from same + * term when they started. DruidCoordinator class uses it to do intermittent checks and stop the activity + * as needed. + */ int localTerm(); + /** + * Register the listener for watching leadership notifications. + */ void registerListener(Listener listener); + /** + * Must be called right after registerLeader(Listener). + */ void start(); + + /** + * Must be called when the Druid process is stopping. + */ void stop(); interface Listener { + /** + * Notification that this node should start activities to be done by the leader. if this method throws + * exception then implementation would try to resign its leadership in the underlying system such as curator. + */ void becomeLeader(); + + /** + * Notification that shid node should stop acitivities which are done by the leader. If this method throws + * exception then an alert is created. + */ void stopBeingLeader(); } } From 758e3c9e3119c986256d334001f2ed4b4c76c685 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 28 Aug 2017 15:25:34 -0500 Subject: [PATCH 4/9] remove start/stop and keep register/unregister Listener in DruidLeaderSelector interface --- .../druid/indexing/overlord/TaskMaster.java | 150 +++++++++--------- .../indexing/overlord/http/OverlordTest.java | 6 +- .../discovery/CuratorDruidLeaderSelector.java | 13 +- .../druid/discovery/DruidLeaderSelector.java | 16 +- .../server/coordinator/DruidCoordinator.java | 3 +- .../CuratorDruidLeaderSelectorTest.java | 37 ++--- .../coordinator/DruidCoordinatorTest.java | 7 +- 7 files changed, 103 insertions(+), 129 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java index 2842a94dcab2..989b97b0a2ac 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskMaster.java @@ -50,6 +50,8 @@ public class TaskMaster { private final DruidLeaderSelector overlordLeaderSelector; + private final DruidLeaderSelector.Listener leadershipListener; + private final ReentrantLock giant = new ReentrantLock(true); private final TaskActionClientFactory taskActionClientFactory; private final SupervisorManager supervisorManager; @@ -85,84 +87,82 @@ public TaskMaster( final DruidNode node = coordinatorOverlordServiceConfig.getOverlordService() == null ? selfNode : selfNode.withService(coordinatorOverlordServiceConfig.getOverlordService()); - this.overlordLeaderSelector.registerListener( - new DruidLeaderSelector.Listener() - { - @Override - public void becomeLeader() - { - giant.lock(); - - // I AM THE MASTER OF THE UNIVERSE. - log.info("By the power of Grayskull, I have the power!"); - - try { - taskLockbox.syncFromStorage(); - taskRunner = runnerFactory.build(); - taskQueue = new TaskQueue( - taskQueueConfig, - taskStorage, - taskRunner, - taskActionClientFactory, - taskLockbox, - emitter - ); - - // Sensible order to start stuff: - final Lifecycle leaderLifecycle = new Lifecycle(); - if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) { - log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition") - .emit(); - } - - leaderLifecycle.addManagedInstance(taskRunner); - leaderLifecycle.addManagedInstance(taskQueue); - leaderLifecycle.addManagedInstance(supervisorManager); - leaderLifecycle.addManagedInstance(overlordHelperManager); - - leaderLifecycle.addHandler( - new Lifecycle.Handler() - { - @Override - public void start() throws Exception - { - serviceAnnouncer.announce(node); - } - - @Override - public void stop() - { - serviceAnnouncer.unannounce(node); - } - } - ); - - leaderLifecycle.start(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - finally { - giant.unlock(); - } + this.leadershipListener = new DruidLeaderSelector.Listener() + { + @Override + public void becomeLeader() + { + giant.lock(); + + // I AM THE MASTER OF THE UNIVERSE. + log.info("By the power of Grayskull, I have the power!"); + + try { + taskLockbox.syncFromStorage(); + taskRunner = runnerFactory.build(); + taskQueue = new TaskQueue( + taskQueueConfig, + taskStorage, + taskRunner, + taskActionClientFactory, + taskLockbox, + emitter + ); + + // Sensible order to start stuff: + final Lifecycle leaderLifecycle = new Lifecycle(); + if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) { + log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition") + .emit(); } - @Override - public void stopBeingLeader() - { - giant.lock(); - try { - final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); - if (leaderLifecycle != null) { - leaderLifecycle.stop(); + leaderLifecycle.addManagedInstance(taskRunner); + leaderLifecycle.addManagedInstance(taskQueue); + leaderLifecycle.addManagedInstance(supervisorManager); + leaderLifecycle.addManagedInstance(overlordHelperManager); + + leaderLifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() throws Exception + { + serviceAnnouncer.announce(node); + } + + @Override + public void stop() + { + serviceAnnouncer.unannounce(node); + } } - } - finally { - giant.unlock(); - } + ); + + leaderLifecycle.start(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + finally { + giant.unlock(); + } + } + + @Override + public void stopBeingLeader() + { + giant.lock(); + try { + final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); + if (leaderLifecycle != null) { + leaderLifecycle.stop(); } } - ); + finally { + giant.unlock(); + } + } + }; } /** @@ -174,7 +174,7 @@ public void start() giant.lock(); try { - overlordLeaderSelector.start(); + overlordLeaderSelector.registerListener(leadershipListener); } finally { giant.unlock(); @@ -191,7 +191,7 @@ public void stop() giant.lock(); try { - overlordLeaderSelector.stop(); + overlordLeaderSelector.unregisterListener(); } finally { giant.unlock(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index b134183ba7ca..69d39765a1ee 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -454,17 +454,13 @@ public int localTerm() public void registerListener(Listener listener) { this.listener = listener; - } - @Override - public void start() - { leader = "what:1234"; listener.becomeLeader(); } @Override - public void stop() + public void unregisterListener() { leader = null; listener.stopBeingLeader(); diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java index 0247161d3935..ce68665dda5e 100644 --- a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -169,20 +169,13 @@ public int localTerm() @Override public void registerListener(DruidLeaderSelector.Listener listener) { - Preconditions.checkArgument(this.listener == null, "listener is already registered."); - this.listener = listener; - } + Preconditions.checkArgument(listener != null, "listener is null."); - @Override - public void start() - { if (!lifecycleLock.canStart()) { throw new ISE("can't start."); } try { - if (listener == null) { - throw new ISE("listener is not registered yet"); - } + this.listener = listener; createNewLeaderLatch(); leaderLatch.get().start(); @@ -198,7 +191,7 @@ public void start() } @Override - public void stop() + public void unregisterListener() { if (!lifecycleLock.canStop()) { throw new ISE("can't stop."); diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java b/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java index e87f805a847b..6cdb1eb0b12a 100644 --- a/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java +++ b/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java @@ -29,16 +29,15 @@ * Usage is as follow. * On lifecycle start: * druidLeaderSelector.registerListener(myListener); - * druidLeaderSelector.start(); * * On lifecycle stop: - * druidLeaderSelector.stop(); + * druidLeaderSelector.unregisterListener(); */ public interface DruidLeaderSelector { /** - * Get ID of current Leader. + * Get ID of current Leader. Returns NULL if it can't find the leader. */ @Nullable String getCurrentLeader(); @@ -58,19 +57,14 @@ public interface DruidLeaderSelector int localTerm(); /** - * Register the listener for watching leadership notifications. + * Register the listener for watching leadership notifications. It should only be called once. */ void registerListener(Listener listener); /** - * Must be called right after registerLeader(Listener). + * Unregisters the listener. */ - void start(); - - /** - * Must be called when the Druid process is stopping. - */ - void stop(); + void unregisterListener(); interface Listener { diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 8cfc0ac68df8..05f5b167fb18 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -494,7 +494,6 @@ public void stopBeingLeader() } } ); - coordLeaderSelector.start(); } } @@ -506,7 +505,7 @@ public void stop() return; } - coordLeaderSelector.stop(); + coordLeaderSelector.unregisterListener(); started = false; diff --git a/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java b/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java index 234a96f17df6..d425d0f2c9cb 100644 --- a/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java +++ b/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java @@ -82,6 +82,13 @@ public void stopBeingLeader() } ); + while (!"h1:8080".equals(currLeader.get())) { + logger.info("current leader = [%s]", currLeader.get()); + Thread.sleep(100); + } + + Assert.assertTrue(leaderSelector1.localTerm() >= 1); + CuratorDruidLeaderSelector leaderSelector2 = new CuratorDruidLeaderSelector( curator, new DruidNode("s2", "h2", 8080, null, new ServerConfig()), @@ -106,6 +113,15 @@ public void stopBeingLeader() } ); + while (!"h2:8080".equals(currLeader.get())) { + logger.info("current leader = [%s]", currLeader.get()); + Thread.sleep(100); + } + + Assert.assertTrue(leaderSelector2.isLeader()); + Assert.assertEquals("h2:8080", leaderSelector1.getCurrentLeader()); + Assert.assertEquals(1, leaderSelector2.localTerm()); + CuratorDruidLeaderSelector leaderSelector3 = new CuratorDruidLeaderSelector( curator, new DruidNode("s3", "h3", 8080, null, new ServerConfig()), @@ -129,26 +145,7 @@ public void stopBeingLeader() } ); - leaderSelector1.start(); - while (!"h1:8080".equals(currLeader.get())) { - logger.info("current leader = [%s]", currLeader.get()); - Thread.sleep(100); - } - - Assert.assertTrue(leaderSelector1.localTerm() >= 1); - - leaderSelector2.start(); - while (!"h2:8080".equals(currLeader.get())) { - logger.info("current leader = [%s]", currLeader.get()); - Thread.sleep(100); - } - - Assert.assertTrue(leaderSelector2.isLeader()); - Assert.assertEquals("h2:8080", leaderSelector1.getCurrentLeader()); - Assert.assertEquals(1, leaderSelector2.localTerm()); - - leaderSelector3.start(); - leaderSelector2.stop(); + leaderSelector2.unregisterListener(); while (!"h3:8080".equals(currLeader.get())) { logger.info("current leader = [%s]", currLeader.get()); Thread.sleep(100); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index 43e26649c5bf..0ca43b3be9f9 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -450,17 +450,12 @@ public int localTerm() public void registerListener(Listener listener) { this.listener = listener; - } - - @Override - public void start() - { leader = "what:1234"; listener.becomeLeader(); } @Override - public void stop() + public void unregisterListener() { leader = null; listener.stopBeingLeader(); From b2c665218049fb4627981364cb26718a8a3eafd6 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 30 Aug 2017 14:01:18 -0500 Subject: [PATCH 5/9] updated comments on DruidLeaderSelector --- .../src/main/java/io/druid/discovery/DruidLeaderSelector.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java b/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java index 6cdb1eb0b12a..02d63dd32a8f 100644 --- a/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java +++ b/server/src/main/java/io/druid/discovery/DruidLeaderSelector.java @@ -38,6 +38,8 @@ public interface DruidLeaderSelector /** * Get ID of current Leader. Returns NULL if it can't find the leader. + * Note that it is possible for leadership to change right after this call returns, so caller would get wrong + * leader. */ @Nullable String getCurrentLeader(); @@ -45,6 +47,8 @@ public interface DruidLeaderSelector /** * Returns true if this node is elected leader from underlying system's point of view. For example if curator * is used to implement this then true would be returned when curator believes this node to be the leader. + * Note that it is possible for leadership to change right after this call returns, so caller would get wrong + * status. */ boolean isLeader(); From 065faa696c83c52000c34dd8254a20044ba4b0a0 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 30 Aug 2017 14:32:58 -0500 Subject: [PATCH 6/9] cache the listener executor in CuratorDruidLeaderSelector --- .../curator/discovery/CuratorDruidLeaderSelector.java | 7 ++++++- .../discovery/CuratorDruidLeaderSelectorTest.java | 10 +++++++++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java index ce68665dda5e..f977f4b453ff 100644 --- a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -36,6 +36,7 @@ import org.apache.curator.framework.recipes.leader.Participant; import javax.annotation.Nullable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -51,6 +52,8 @@ public class CuratorDruidLeaderSelector implements DruidLeaderSelector private final CuratorFramework curator; private final String latchPath; + private ExecutorService listenerExecutor; + private DruidLeaderSelector.Listener listener = null; private final AtomicReference leaderLatch = new AtomicReference<>(); @@ -125,7 +128,7 @@ public void notLeader() } } }, - Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath)) + listenerExecutor ); return leaderLatch.getAndSet(newLeaderLatch); @@ -176,6 +179,7 @@ public void registerListener(DruidLeaderSelector.Listener listener) } try { this.listener = listener; + this.listenerExecutor = Execs.singleThreaded(StringUtils.format("LeaderSelector[%s]", latchPath)); createNewLeaderLatch(); leaderLatch.get().start(); @@ -197,5 +201,6 @@ public void unregisterListener() throw new ISE("can't stop."); } CloseQuietly.close(leaderLatch.get()); + listenerExecutor.shutdownNow(); } } diff --git a/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java b/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java index d425d0f2c9cb..c8b79d0d5ef7 100644 --- a/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java +++ b/server/src/test/java/io/druid/curator/discovery/CuratorDruidLeaderSelectorTest.java @@ -32,6 +32,7 @@ import org.junit.Before; import org.junit.Test; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; /** @@ -97,10 +98,17 @@ public void stopBeingLeader() leaderSelector2.registerListener( new DruidLeaderSelector.Listener() { + private AtomicInteger attemptCount = new AtomicInteger(0); + @Override public void becomeLeader() { logger.info("listener2.becomeLeader()."); + + if (attemptCount.getAndIncrement() < 1) { + throw new RuntimeException("will become leader on next attempt."); + } + currLeader.set("h2:8080"); } @@ -120,7 +128,7 @@ public void stopBeingLeader() Assert.assertTrue(leaderSelector2.isLeader()); Assert.assertEquals("h2:8080", leaderSelector1.getCurrentLeader()); - Assert.assertEquals(1, leaderSelector2.localTerm()); + Assert.assertEquals(2, leaderSelector2.localTerm()); CuratorDruidLeaderSelector leaderSelector3 = new CuratorDruidLeaderSelector( curator, From 7538f28d1ca0822ebc044ece6868068202ecf45a Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 30 Aug 2017 14:42:50 -0500 Subject: [PATCH 7/9] use same latch owner name that was used before --- .../main/java/io/druid/curator/discovery/DiscoveryModule.java | 4 ++-- .../java/io/druid/server/coordinator/DruidCoordinator.java | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java index 65a6c0bec836..086e166d403f 100644 --- a/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java +++ b/server/src/main/java/io/druid/curator/discovery/DiscoveryModule.java @@ -191,14 +191,14 @@ public void configure(Binder binder) PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, Coordinator.class)) .addBinding(CURATOR_KEY) .toProvider(new DruidLeaderSelectorProvider( - (zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getCoordinatorPath(), "druid:coordinator")) + (zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getCoordinatorPath(), "_COORDINATOR")) ) .in(LazySingleton.class); PolyBind.optionBinder(binder, Key.get(DruidLeaderSelector.class, IndexingService.class)) .addBinding(CURATOR_KEY) .toProvider(new DruidLeaderSelectorProvider( - (zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getOverlordPath(), "druid:overlord")) + (zkPathsConfig) -> ZKPaths.makePath(zkPathsConfig.getOverlordPath(), "_OVERLORD")) ) .in(LazySingleton.class); } diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 05f5b167fb18..a30d623a0a1a 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -93,8 +93,6 @@ @ManageLifecycle public class DruidCoordinator { - public static final String COORDINATOR_OWNER_NODE = "_COORDINATOR"; - public static Comparator SEGMENT_COMPARATOR = Ordering.from(Comparators.intervalsByEndThenStart()) .onResultOf( new Function() From 3d8b8334f8ec15cca58e05ee1020892e90aa82ba Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 1 Sep 2017 10:13:12 -0500 Subject: [PATCH 8/9] remove stuff related to druid.zk.paths.indexer.leaderLatchPath config --- docs/content/configuration/index.md | 1 - .../server/initialization/IndexerZkConfig.java | 16 +--------------- .../overlord/RemoteTaskRunnerTestUtils.java | 2 +- .../indexing/overlord/http/OverlordTest.java | 3 +-- .../indexing/worker/WorkerTaskMonitorTest.java | 2 +- .../indexing/worker/http/WorkerResourceTest.java | 2 +- .../initialization/IndexerZkConfigTest.java | 10 ++++------ 7 files changed, 9 insertions(+), 27 deletions(-) diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md index f17f6858a014..8ae9cd7a1fce 100644 --- a/docs/content/configuration/index.md +++ b/docs/content/configuration/index.md @@ -71,7 +71,6 @@ The indexing service also uses its own set of paths. These configs can be includ |`druid.zk.paths.indexer.announcementsPath`|Middle managers announce themselves here.|`${druid.zk.paths.indexer.base}/announcements`| |`druid.zk.paths.indexer.tasksPath`|Used to assign tasks to middle managers.|`${druid.zk.paths.indexer.base}/tasks`| |`druid.zk.paths.indexer.statusPath`|Parent path for announcement of task statuses.|`${druid.zk.paths.indexer.base}/status`| -|`druid.zk.paths.indexer.leaderLatchPath`|Used for Overlord leader election.|`${druid.zk.paths.indexer.base}/leaderLatchPath`| If `druid.zk.paths.base` and `druid.zk.paths.indexer.base` are both set, and none of the other `druid.zk.paths.*` or `druid.zk.paths.indexer.*` values are set, then the other properties will be evaluated relative to their respective `base`. For example, if `druid.zk.paths.base` is set to `/druid1` and `druid.zk.paths.indexer.base` is set to `/druid2` then `druid.zk.paths.announcementsPath` will default to `/druid1/announcements` while `druid.zk.paths.indexer.announcementsPath` will default to `/druid2/announcements`. diff --git a/indexing-service/src/main/java/io/druid/server/initialization/IndexerZkConfig.java b/indexing-service/src/main/java/io/druid/server/initialization/IndexerZkConfig.java index 50c45f443f61..eee927116d04 100644 --- a/indexing-service/src/main/java/io/druid/server/initialization/IndexerZkConfig.java +++ b/indexing-service/src/main/java/io/druid/server/initialization/IndexerZkConfig.java @@ -35,8 +35,7 @@ public IndexerZkConfig( @JsonProperty("base") String base, @JsonProperty("announcementsPath") String announcementsPath, @JsonProperty("tasksPath") String tasksPath, - @JsonProperty("statusPath") String statusPath, - @JsonProperty("leaderLatchPath") String leaderLatchPath + @JsonProperty("statusPath") String statusPath ) { this.zkPathsConfig = zkPathsConfig; @@ -44,7 +43,6 @@ public IndexerZkConfig( this.announcementsPath = announcementsPath; this.tasksPath = tasksPath; this.statusPath = statusPath; - this.leaderLatchPath = leaderLatchPath; } @JacksonInject @@ -62,9 +60,6 @@ public IndexerZkConfig( @JsonProperty private final String statusPath; - @JsonProperty - private final String leaderLatchPath; - private String defaultIndexerPath(final String subPath) { return ZKPaths.makePath(getBase(), subPath); @@ -90,11 +85,6 @@ public String getStatusPath() return statusPath == null ? defaultIndexerPath("status") : statusPath; } - public String getLeaderLatchPath() - { - return leaderLatchPath == null ? defaultIndexerPath("leaderLatchPath") : leaderLatchPath; - } - public ZkPathsConfig getZkPathsConfig() { return zkPathsConfig; @@ -120,9 +110,6 @@ public boolean equals(Object o) if (base != null ? !base.equals(that.base) : that.base != null) { return false; } - if (leaderLatchPath != null ? !leaderLatchPath.equals(that.leaderLatchPath) : that.leaderLatchPath != null) { - return false; - } if (statusPath != null ? !statusPath.equals(that.statusPath) : that.statusPath != null) { return false; } @@ -144,7 +131,6 @@ public int hashCode() result = 31 * result + (announcementsPath != null ? announcementsPath.hashCode() : 0); result = 31 * result + (tasksPath != null ? tasksPath.hashCode() : 0); result = 31 * result + (statusPath != null ? statusPath.hashCode() : 0); - result = 31 * result + (leaderLatchPath != null ? leaderLatchPath.hashCode() : 0); return result; } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java index aeff81a1a17a..3b6f68c4fa5c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java @@ -125,7 +125,7 @@ public String getBase() { return basePath; } - }, null, null, null, null, null + }, null, null, null, null ), cf, new PathChildrenCacheFactory.Builder(), diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index 69d39765a1ee..94b2fb1f2830 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -155,11 +155,10 @@ public void setUp() throws Exception taskCompletionCountDownLatches[0] = new CountDownLatch(1); taskCompletionCountDownLatches[1] = new CountDownLatch(1); announcementLatch = new CountDownLatch(1); - IndexerZkConfig indexerZkConfig = new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null, null); + IndexerZkConfig indexerZkConfig = new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null); setupServerAndCurator(); curator.start(); curator.blockUntilConnected(); - curator.create().creatingParentsIfNeeded().forPath(indexerZkConfig.getLeaderLatchPath()); druidNode = new DruidNode("hey", "what", 1234, null, new ServerConfig()); ServiceEmitter serviceEmitter = new NoopServiceEmitter(); taskMaster = new TaskMaster( diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 82cacf88dcde..2e8cee71a460 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -126,7 +126,7 @@ public String getBase() { return basePath; } - }, null, null, null, null, null + }, null, null, null, null ), new TestRemoteTaskRunnerConfig(new Period("PT1S")), cf, diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java index 44e83fc8d529..8e8b672636d9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/http/WorkerResourceTest.java @@ -87,7 +87,7 @@ public String getBase() { return basePath; } - }, null, null, null, null, null), + }, null, null, null, null), new RemoteTaskRunnerConfig(), cf, worker diff --git a/indexing-service/src/test/java/io/druid/server/initialization/IndexerZkConfigTest.java b/indexing-service/src/test/java/io/druid/server/initialization/IndexerZkConfigTest.java index df45c1b99e09..ac682383abd2 100644 --- a/indexing-service/src/test/java/io/druid/server/initialization/IndexerZkConfigTest.java +++ b/indexing-service/src/test/java/io/druid/server/initialization/IndexerZkConfigTest.java @@ -156,7 +156,7 @@ public void testNullConfig() ); indexerZkConfig.inject(propertyValues, configurator); - Assert.assertEquals("/druid/indexer/leaderLatchPath", indexerZkConfig.get().get().getLeaderLatchPath()); + Assert.assertEquals("/druid/indexer/tasks", indexerZkConfig.get().get().getTasksPath()); } @Test @@ -245,7 +245,7 @@ public void testExactConfig() ZkPathsConfig zkPathsConfig1 = zkPathsConfig.get().get(); - IndexerZkConfig indexerZkConfig = new IndexerZkConfig(zkPathsConfig1, null, null, null, null, null); + IndexerZkConfig indexerZkConfig = new IndexerZkConfig(zkPathsConfig1, null, null, null, null); Assert.assertEquals("/druid/metrics/indexer", indexerZkConfig.getBase()); Assert.assertEquals("/druid/metrics/indexer/announcements", indexerZkConfig.getAnnouncementsPath()); @@ -262,8 +262,7 @@ public void testFullOverride() throws Exception "/druid/prod", "/druid/prod/a", "/druid/prod/t", - "/druid/prod/s", - "/druid/prod/l" + "/druid/prod/s" ); Map value = mapper.readValue( @@ -276,8 +275,7 @@ public void testFullOverride() throws Exception value.get("base"), value.get("announcementsPath"), value.get("tasksPath"), - value.get("statusPath"), - value.get("leaderLatchPath") + value.get("statusPath") ); Assert.assertEquals(indexerZkConfig, newConfig); From e0c1e95256be373b530067bad996a825ee944288 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Fri, 1 Sep 2017 10:18:28 -0500 Subject: [PATCH 9/9] randomize the delay when giving up leadership and restarting leader latch --- .../druid/curator/discovery/CuratorDruidLeaderSelector.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java index f977f4b453ff..415360c6f170 100644 --- a/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java +++ b/server/src/main/java/io/druid/curator/discovery/CuratorDruidLeaderSelector.java @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -98,8 +99,7 @@ public void isLeader() leader = false; try { //Small delay before starting the latch so that others waiting are chosen to become leader. - Thread.sleep(1000); - + Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); leaderLatch.get().start(); } catch (Exception e) {