Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public class LifecycleModule implements Module
// the 'stop' method, either failing silently or failing violently and throwing an exception causing an ungraceful exit
private final LifecycleScope initScope = new LifecycleScope(Lifecycle.Stage.INIT);
private final LifecycleScope scope = new LifecycleScope(Lifecycle.Stage.NORMAL);
private final LifecycleScope lastScope = new LifecycleScope(Lifecycle.Stage.LAST);
private final LifecycleScope serverScope = new LifecycleScope(Lifecycle.Stage.SERVER);
private final LifecycleScope annoucementsScope = new LifecycleScope(Lifecycle.Stage.ANNOUNCEMENTS);

/**
* Registers a class to instantiate eagerly. Classes mentioned here will be pulled out of
Expand Down Expand Up @@ -118,7 +119,8 @@ public void configure(Binder binder)

binder.bindScope(ManageLifecycleInit.class, initScope);
binder.bindScope(ManageLifecycle.class, scope);
binder.bindScope(ManageLifecycleLast.class, lastScope);
binder.bindScope(ManageLifecycleServer.class, serverScope);
binder.bindScope(ManageLifecycleAnnouncements.class, annoucementsScope);
}

@Provides @LazySingleton
Expand All @@ -140,7 +142,8 @@ public void start() throws Exception
};
initScope.setLifecycle(lifecycle);
scope.setLifecycle(lifecycle);
lastScope.setLifecycle(lifecycle);
serverScope.setLifecycle(lifecycle);
annoucementsScope.setLifecycle(lifecycle);

return lifecycle;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
import java.lang.annotation.Target;

/**
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on Stage.LAST
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on Stage.ANNOUNCEMENTS
*
* This Scope gets defined by {@link LifecycleModule}
*/
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@ScopeAnnotation
@PublicApi
public @interface ManageLifecycleLast
public @interface ManageLifecycleAnnouncements
{
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.guice;

import com.google.inject.ScopeAnnotation;
import org.apache.druid.guice.annotations.PublicApi;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Marks the object to be managed by {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} and set to be on Stage.SERVER
*
* This Scope gets defined by {@link LifecycleModule}
*/
@Target({ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@ScopeAnnotation
@PublicApi
public @interface ManageLifecycleServer
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,30 @@

/**
* A manager of object Lifecycles.
* <p/>
*
* This object has methods for registering objects that should be started and stopped. The Lifecycle allows for
* three stages: Stage.INIT, Stage.NORMAL, and Stage.LAST.
* <p/>
* four stages: Stage.INIT, Stage.NORMAL, Stage.SERVER, and Stage.ANNOUNCEMENTS.
*
* Things added at Stage.INIT will be started first (in the order that they are added to the Lifecycle instance) and
* then things added at Stage.NORMAL, and finally, Stage.LAST will be started.
* <p/>
* The close operation goes in reverse order, starting with the last thing added at Stage.LAST and working backwards.
* <p/>
* then things added at Stage.NORMAL, then Stage.SERVER, and finally, Stage.ANNOUNCEMENTS will be started.
*
* The close operation goes in reverse order, starting with the last thing added at Stage.ANNOUNCEMENTS and working
* backwards.
*
* Conceptually, the stages have the following purposes:
* - Stage.INIT: Currently, this stage is used exclusively for log4j initialization, since almost everything needs
* logging and it should be the last thing to shutdown. Any sort of bootstrapping object that provides something that
* should be initialized before nearly all other Lifecycle objects could also belong here (if it doesn't need
* logging during start or stop).
* - Stage.NORMAL: This is the default stage. Most objects will probably make the most sense to be registered at
* this level, with the exception of any form of server or service announcements
* - Stage.SERVER: This lifecycle stage is intended for all 'server' objects, and currently only contains the Jetty
* module, but any sort of 'server' that expects most Lifecycle objects to be initialized by the time it starts, and
* still available at the time it stops can logically live in this stage.
* - Stage.ANNOUNCENTS: Any object which announces to a cluster this servers location belongs in this stage. By being
* last, we can be sure that all servers are initialized before we advertise the endpoint locations, and also can be
* sure that we un-announce these advertisements prior to the Stage.SERVER objects stop.
*
* There are two sets of methods to add things to the Lifecycle. One set that will just add instances and enforce that
* start() has not been called yet. The other set will add instances and, if the lifecycle is already started, start
* them.
Expand All @@ -61,7 +76,8 @@ public enum Stage
{
INIT,
NORMAL,
LAST
SERVER,
ANNOUNCEMENTS
}

private enum State
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,25 +172,26 @@ public void testSanity() throws Exception
lifecycle.addManagedInstance(new ObjectToBeLifecycled(0, startOrder, stopOrder));
lifecycle.addManagedInstance(new ObjectToBeLifecycled(1, startOrder, stopOrder), Lifecycle.Stage.NORMAL);
lifecycle.addManagedInstance(new ObjectToBeLifecycled(2, startOrder, stopOrder), Lifecycle.Stage.NORMAL);
lifecycle.addManagedInstance(new ObjectToBeLifecycled(3, startOrder, stopOrder), Lifecycle.Stage.LAST);
lifecycle.addManagedInstance(new ObjectToBeLifecycled(3, startOrder, stopOrder), Lifecycle.Stage.ANNOUNCEMENTS);
lifecycle.addStartCloseInstance(new ObjectToBeLifecycled(4, startOrder, stopOrder));
lifecycle.addManagedInstance(new ObjectToBeLifecycled(5, startOrder, stopOrder));
lifecycle.addStartCloseInstance(new ObjectToBeLifecycled(6, startOrder, stopOrder), Lifecycle.Stage.LAST);
lifecycle.addStartCloseInstance(new ObjectToBeLifecycled(6, startOrder, stopOrder), Lifecycle.Stage.ANNOUNCEMENTS);
lifecycle.addManagedInstance(new ObjectToBeLifecycled(7, startOrder, stopOrder));
lifecycle.addStartCloseInstance(new ObjectToBeLifecycled(8, startOrder, stopOrder), Lifecycle.Stage.INIT);
lifecycle.addStartCloseInstance(new ObjectToBeLifecycled(9, startOrder, stopOrder), Lifecycle.Stage.SERVER);

final List<Integer> expectedOrder = Arrays.asList(8, 0, 1, 2, 4, 5, 7, 3, 6);
final List<Integer> expectedOrder = Arrays.asList(8, 0, 1, 2, 4, 5, 7, 9, 3, 6);

lifecycle.start();

Assert.assertEquals(9, startOrder.size());
Assert.assertEquals(10, startOrder.size());
Assert.assertEquals(0, stopOrder.size());
Assert.assertEquals(expectedOrder, startOrder);

lifecycle.stop();

Assert.assertEquals(9, startOrder.size());
Assert.assertEquals(9, stopOrder.size());
Assert.assertEquals(10, startOrder.size());
Assert.assertEquals(10, stopOrder.size());
Assert.assertEquals(Lists.reverse(expectedOrder), stopOrder);
}

Expand All @@ -210,20 +211,28 @@ public void testAddToLifecycleInStartMethod() throws Exception
public void start() throws Exception
{
lifecycle.addMaybeStartManagedInstance(
new ObjectToBeLifecycled(1, startOrder, stopOrder), Lifecycle.Stage.NORMAL
new ObjectToBeLifecycled(1, startOrder, stopOrder),
Lifecycle.Stage.NORMAL
);
lifecycle.addMaybeStartManagedInstance(
new ObjectToBeLifecycled(2, startOrder, stopOrder), Lifecycle.Stage.INIT
new ObjectToBeLifecycled(2, startOrder, stopOrder),
Lifecycle.Stage.INIT
);
lifecycle.addMaybeStartManagedInstance(
new ObjectToBeLifecycled(3, startOrder, stopOrder), Lifecycle.Stage.LAST
new ObjectToBeLifecycled(3, startOrder, stopOrder),
Lifecycle.Stage.ANNOUNCEMENTS
);
lifecycle.addMaybeStartStartCloseInstance(new ObjectToBeLifecycled(4, startOrder, stopOrder));
lifecycle.addMaybeStartManagedInstance(new ObjectToBeLifecycled(5, startOrder, stopOrder));
lifecycle.addMaybeStartStartCloseInstance(
new ObjectToBeLifecycled(6, startOrder, stopOrder), Lifecycle.Stage.LAST
new ObjectToBeLifecycled(6, startOrder, stopOrder),
Lifecycle.Stage.ANNOUNCEMENTS
);
lifecycle.addMaybeStartManagedInstance(new ObjectToBeLifecycled(7, startOrder, stopOrder));
lifecycle.addMaybeStartManagedInstance(
new ObjectToBeLifecycled(8, startOrder, stopOrder),
Lifecycle.Stage.SERVER
);
}

@Override
Expand All @@ -234,8 +243,8 @@ public void stop()
}
);

final List<Integer> expectedOrder = Arrays.asList(0, 1, 2, 4, 5, 7, 3, 6);
final List<Integer> expectedStopOrder = Arrays.asList(6, 3, 7, 5, 4, 1, 0, 2);
final List<Integer> expectedOrder = Arrays.asList(0, 1, 2, 4, 5, 7, 8, 3, 6);
final List<Integer> expectedStopOrder = Arrays.asList(6, 3, 8, 7, 5, 4, 1, 0, 2);

lifecycle.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.druid.guice.ManageLifecycleLast;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.indexing.materializedview.DerivativeDataSourceMetadata;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.java.util.common.DateTimes;
Expand Down Expand Up @@ -65,7 +65,7 @@
* Read and store derivatives information from dataSource table frequently.
* When optimize query, DerivativesManager offers the information about derivatives.
*/
@ManageLifecycleLast
@ManageLifecycle
public class DerivativeDataSourceManager
{
private static final EmittingLogger log = new EmittingLogger(DerivativeDataSourceManager.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,16 +387,16 @@ private Stat updateAnnouncement(final String path, final byte[] value) throws Ex
*/
public void unannounce(String path)
{
log.info("unannouncing [%s]", path);
final ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode(path);
final String parentPath = pathAndNode.getPath();

final ConcurrentMap<String, byte[]> subPaths = announcements.get(parentPath);

if (subPaths == null || subPaths.remove(pathAndNode.getNode()) == null) {
log.error("Path[%s] not announced, cannot unannounce.", path);
log.debug("Path[%s] not announced, cannot unannounce.", path);
return;
}
log.info("unannouncing [%s]", path);

try {
curator.inTransaction().delete().forPath(path).and().commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class DiscoveryModule implements Module
*
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
* Announcement will happen in the ANNOUNCEMENTS stage of the Lifecycle
*
* @param binder the Binder to register with
*/
Expand All @@ -106,7 +106,7 @@ public static void registerDefault(Binder binder)
*
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
* Announcement will happen in the ANNOUNCEMENTS stage of the Lifecycle
*
* @param annotation The annotation instance to use in finding the DruidNode instance, usually a Named annotation
*/
Expand All @@ -120,7 +120,7 @@ public static void register(Binder binder, Annotation annotation)
*
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
* Announcement will happen in the ANNOUNCEMENTS stage of the Lifecycle
*
* @param binder the Binder to register with
* @param annotation The annotation class to use in finding the DruidNode instance
Expand All @@ -135,7 +135,7 @@ public static void register(Binder binder, Class<? extends Annotation> annotatio
*
* That is, this module will announce the DruidNode instance returned by
* injector.getInstance(Key.get(DruidNode.class, annotation)) automatically.
* Announcement will happen in the LAST stage of the Lifecycle
* Announcement will happen in the ANNOUNCEMENTS stage of the Lifecycle
*
* @param binder the Binder to register with
* @param key The key to use in finding the DruidNode instance
Expand Down Expand Up @@ -251,7 +251,7 @@ public void stop()
}
}
},
Lifecycle.Stage.LAST
Lifecycle.Stage.ANNOUNCEMENTS
);

return announcer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void configure(Binder binder)
}

@Provides
@ManageLifecycle
@ManageLifecycleAnnouncements
public Announcer getAnnouncer(CuratorFramework curator)
{
return new Announcer(curator, Execs.singleThreaded("Announcer-%s"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void unannounce()
}

final String path = makeAnnouncementPath();
log.info("Unannouncing self[%s] at [%s]", server, path);
log.debug("Unannouncing self[%s] at [%s]", server, path);
announcer.unannounce(path);

announced = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ public void stop()
}
}
},
Lifecycle.Stage.LAST
Lifecycle.Stage.SERVER
);

return server;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static void bindAnnouncer(

/**
* This is a helper class used by CliXXX classes to announce {@link DiscoveryDruidNode}
* as part of {@link Lifecycle.Stage#LAST}.
* as part of {@link Lifecycle.Stage#ANNOUNCEMENTS}.
*/
protected static class DiscoverySideEffectsProvider implements Provider<DiscoverySideEffectsProvider.Child>
{
Expand Down Expand Up @@ -200,7 +200,7 @@ public void stop()
announcer.unannounce(discoveryDruidNode);
}
},
Lifecycle.Stage.LAST
Lifecycle.Stage.ANNOUNCEMENTS
);

return new Child();
Expand Down