Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.http.client.HttpClient;
import io.druid.java.util.http.client.Request;
import io.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import io.druid.java.util.http.client.response.ClientResponse;
import io.druid.java.util.http.client.response.InputStreamResponseHandler;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.server.coordination.DataSegmentChangeCallback;
import io.druid.server.coordination.DataSegmentChangeHandler;
import io.druid.server.coordination.DataSegmentChangeRequest;
Expand All @@ -61,7 +61,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -261,6 +260,7 @@ public void onSuccess(InputStream result)
public void onFailure(Throwable t)
{
try {
responseHandler.description = t.toString();
logRequestFailure(t);
}
finally {
Expand Down Expand Up @@ -333,20 +333,15 @@ public void start()
ScheduledExecutors.scheduleAtFixedRate(
processingExecutor,
new Duration(config.getHttpLoadQueuePeonRepeatDelay()),
new Callable<ScheduledExecutors.Signal>()
{
@Override
public ScheduledExecutors.Signal call()
{
if (!stopped) {
doSegmentManagement();
}
() -> {
if (!stopped) {
doSegmentManagement();
}

if (stopped) {
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
if (stopped) {
return ScheduledExecutors.Signal.STOP;
} else {
return ScheduledExecutors.Signal.REPEAT;
}
}
);
Expand All @@ -364,11 +359,11 @@ public void stop()
stopped = true;

for (SegmentHolder holder : segmentsToDrop.values()) {
holder.requestSucceeded();
holder.requestFailed("Stopping load queue peon.");
}

for (SegmentHolder holder : segmentsToLoad.values()) {
holder.requestSucceeded();
holder.requestFailed("Stopping load queue peon.");
}

segmentsToDrop.clear();
Expand All @@ -382,6 +377,16 @@ public void stop()
public void loadSegment(DataSegment segment, LoadPeonCallback callback)
{
synchronized (lock) {
if (stopped) {
log.warn(
"Server[%s] cannot load segment[%s] because load queue peon is stopped.",
serverId,
segment.getIdentifier()
);
callback.execute();
return;
}

SegmentHolder holder = segmentsToLoad.get(segment);

if (holder == null) {
Expand All @@ -398,6 +403,15 @@ public void loadSegment(DataSegment segment, LoadPeonCallback callback)
public void dropSegment(DataSegment segment, LoadPeonCallback callback)
{
synchronized (lock) {
if (stopped) {
log.warn(
"Server[%s] cannot drop segment[%s] because load queue peon is stopped.",
serverId,
segment.getIdentifier()
);
callback.execute();
return;
}
SegmentHolder holder = segmentsToDrop.get(segment);

if (holder == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ public void tearDown() throws Exception
tearDownServerAndCurator();
}

@Test(timeout = 5_000)
@Test(timeout = 10_000)
public void testMoveSegment() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.java.util.http.client.HttpClient;
import io.druid.java.util.http.client.Request;
import io.druid.java.util.http.client.response.HttpResponseHandler;
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscovery;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.http.client.HttpClient;
import io.druid.java.util.http.client.Request;
import io.druid.java.util.http.client.response.HttpResponseHandler;
import io.druid.server.ServerTestHelper;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.SegmentLoadDropHandler;
Expand All @@ -57,40 +57,92 @@
*/
public class HttpLoadQueuePeonTest
{
final DataSegment segment1 = new DataSegment(
"test1", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);

final DataSegment segment2 = new DataSegment(
"test2", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);

final DataSegment segment3 = new DataSegment(
"test3", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);

final DataSegment segment4 = new DataSegment(
"test4", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);

final TestDruidCoordinatorConfig config = new TestDruidCoordinatorConfig(
null,
null,
null,
null,
null,
null,
10,
null,
false,
false,
Duration.ZERO
)
{
@Override
public int getHttpLoadQueuePeonBatchSize()
{
return 2;
}
};

@Test(timeout = 10000)
public void testSimple() throws Exception
{
final DataSegment segment1 = new DataSegment(
"test1", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon(
"http://dummy:4000",
ServerTestHelper.MAPPER,
new TestHttpClient(),
config,
Executors.newScheduledThreadPool(
2,
Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s")
),
Execs.singleThreaded("HttpLoadQueuePeonTest")
);

final DataSegment segment2 = new DataSegment(
"test2", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);
httpLoadQueuePeon.start();

final DataSegment segment3 = new DataSegment(
"test3", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
Map<String, CountDownLatch> latches = ImmutableMap.of(
segment1.getIdentifier(), new CountDownLatch(1),
segment2.getIdentifier(), new CountDownLatch(1),
segment3.getIdentifier(), new CountDownLatch(1),
segment4.getIdentifier(), new CountDownLatch(1)
);

final DataSegment segment4 = new DataSegment(
"test4", Intervals.of("2014/2015"), "v1",
null, null, null, null, 0, 0
);
httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown());
httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown());
httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown());
httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown());

latches.get(segment1.getIdentifier()).await();
latches.get(segment2.getIdentifier()).await();
latches.get(segment3.getIdentifier()).await();
latches.get(segment4.getIdentifier()).await();

httpLoadQueuePeon.stop();
}

@Test(timeout = 10000)
public void testLoadDropAfterStop() throws Exception
{
HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon(
"http://dummy:4000",
ServerTestHelper.MAPPER,
new TestHttpClient(),
new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO) {
@Override
public int getHttpLoadQueuePeonBatchSize()
{
return 2;
}
},
config,
Executors.newScheduledThreadPool(
2,
Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s")
Expand All @@ -107,48 +159,16 @@ public int getHttpLoadQueuePeonBatchSize()
segment4.getIdentifier(), new CountDownLatch(1)
);

httpLoadQueuePeon.dropSegment(segment1, new LoadPeonCallback()
{
@Override
public void execute()
{
latches.get(segment1.getIdentifier()).countDown();
}
});

httpLoadQueuePeon.loadSegment(segment2, new LoadPeonCallback()
{
@Override
public void execute()
{
latches.get(segment2.getIdentifier()).countDown();
}
});

httpLoadQueuePeon.dropSegment(segment3, new LoadPeonCallback()
{
@Override
public void execute()
{
latches.get(segment3.getIdentifier()).countDown();
}
});

httpLoadQueuePeon.loadSegment(segment4, new LoadPeonCallback()
{
@Override
public void execute()
{
latches.get(segment4.getIdentifier()).countDown();
}
});

httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown());
httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown());
latches.get(segment1.getIdentifier()).await();
latches.get(segment2.getIdentifier()).await();
httpLoadQueuePeon.stop();
httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown());
httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown());
latches.get(segment3.getIdentifier()).await();
latches.get(segment4.getIdentifier()).await();

httpLoadQueuePeon.stop();
}

private static class TestDruidNodeDiscovery implements DruidNodeDiscovery
Expand Down Expand Up @@ -191,12 +211,17 @@ public <Intermediate, Final> ListenableFuture<Final> go(
httpResponseHandler.handleResponse(httpResponse);
try {
List<DataSegmentChangeRequest> changeRequests = ServerTestHelper.MAPPER.readValue(
request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>() {}
request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>()
{
}
);

List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> statuses = new ArrayList<>(changeRequests.size());
for (DataSegmentChangeRequest cr : changeRequests) {
statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(cr, SegmentLoadDropHandler.Status.SUCCESS));
statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(
cr,
SegmentLoadDropHandler.Status.SUCCESS
));
}
return (ListenableFuture) Futures.immediateFuture(
new ByteArrayInputStream(
Expand Down