Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void configure(Binder binder)
JsonConfigProvider.bind(binder, "druid.announcer", BatchDataSegmentAnnouncerConfig.class);
JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class);
binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class);
binder.bind(BatchDataSegmentAnnouncer.class).in(LazySingleton.class);
binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleAnnouncements.class);

if (isZkEnabled) {
binder.bind(DataSegmentServerAnnouncer.class).to(CuratorDataSegmentServerAnnouncer.class).in(LazySingleton.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
Expand Down Expand Up @@ -129,6 +130,13 @@ public BatchDataSegmentAnnouncer(
this(server, config, zkPaths, () -> announcer, jsonMapper, ZkEnablementConfig.ENABLED);
}

@LifecycleStop
public void stop()
{
changes.stop();
}


@Override
public void announceSegment(DataSegment segment) throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
Expand All @@ -43,6 +44,7 @@
*
* Clients call {@link #getRequestsSince} to get updates since given counter.
*/

public class ChangeRequestHistory<T>
{
private static int MAX_SIZE = 1000;
Expand Down Expand Up @@ -74,11 +76,24 @@ public ChangeRequestHistory(int maxSize)
this.singleThreadedExecutor = Execs.singleThreaded("SegmentChangeRequestHistory");
}

public void stop()
{
singleThreadedExecutor.shutdownNow();
final LinkedHashSet<CustomSettableFuture<?>> futures = new LinkedHashSet<>(waitingFutures.keySet());
waitingFutures.clear();
for (CustomSettableFuture<?> theFuture : futures) {
theFuture.setException(new IllegalStateException("Server is shutting down."));
}
}

/**
* Add batch of segment changes update.
*/
public synchronized void addChangeRequests(List<T> requests)
{
if (singleThreadedExecutor.isShutdown()) {
return;
}
for (T request : requests) {
changes.add(new Holder<>(request, getLastCounter().inc()));
}
Expand Down Expand Up @@ -108,6 +123,10 @@ public synchronized void addChangeRequest(T request)
public synchronized ListenableFuture<ChangeRequestsSnapshot<T>> getRequestsSince(final Counter counter)
{
final CustomSettableFuture<T> future = new CustomSettableFuture<>(waitingFutures);
if (singleThreadedExecutor.isShutdown()) {
future.setException(new IllegalStateException("Server is shutting down."));
return future;
}

if (counter.counter < 0) {
future.setException(new IAE("counter[%s] must be >= 0", counter));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -171,4 +172,44 @@ public void testNonImmediateFuture() throws Exception
Assert.assertEquals(1, snapshot.getCounter().getCounter());
Assert.assertEquals(1, snapshot.getRequests().size());
}

@Test
public void testStop()
{
final ChangeRequestHistory<DataSegmentChangeRequest> history = new ChangeRequestHistory();

ListenableFuture<ChangeRequestsSnapshot<DataSegmentChangeRequest>> future = history.getRequestsSince(
ChangeRequestHistory.Counter.ZERO
);
Assert.assertEquals(1, history.waitingFutures.size());

final AtomicBoolean callbackExcecuted = new AtomicBoolean(false);
Futures.addCallback(
future,
new FutureCallback<ChangeRequestsSnapshot<DataSegmentChangeRequest>>()
{
@Override
public void onSuccess(ChangeRequestsSnapshot result)
{
callbackExcecuted.set(true);
}

@Override
public void onFailure(Throwable t)
{
callbackExcecuted.set(true);
}
}
);

history.stop();
// any new change requests should be ignored, there should be no waiting futures, and open futures should be resolved
history.addChangeRequest(new SegmentChangeRequestNoop());
Assert.assertEquals(0, history.waitingFutures.size());
Assert.assertTrue(callbackExcecuted.get());
Assert.assertTrue(future.isDone());

Throwable thrown = Assert.assertThrows(ExecutionException.class, future::get);
Assert.assertEquals("java.lang.IllegalStateException: Server is shutting down.", thrown.getMessage());
}
}