From 9800e088754f082467255c61575a8d61f9a6677b Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 29 Mar 2018 17:54:24 -0700 Subject: [PATCH 1/5] add stopped check and handling to HttpLoadQueuePeon load and drop segment methods --- .../server/coordinator/HttpLoadQueuePeon.java | 52 +++--- .../coordinator/HttpLoadQueuePeonTest.java | 151 ++++++++++-------- 2 files changed, 121 insertions(+), 82 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java index dbeeb7386018..4e6910978e52 100644 --- a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java @@ -28,16 +28,16 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.druid.java.util.common.ISE; import io.druid.java.util.common.RE; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.http.client.HttpClient; import io.druid.java.util.http.client.Request; import io.druid.java.util.http.client.io.AppendableByteArrayInputStream; import io.druid.java.util.http.client.response.ClientResponse; import io.druid.java.util.http.client.response.InputStreamResponseHandler; -import io.druid.java.util.common.ISE; -import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.server.coordination.DataSegmentChangeCallback; import io.druid.server.coordination.DataSegmentChangeHandler; import io.druid.server.coordination.DataSegmentChangeRequest; @@ -61,7 +61,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; @@ -106,7 +105,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon private final DruidCoordinatorConfig config; private final ObjectMapper jsonMapper; - private final HttpClient httpClient; + public HttpClient httpClient; private final URL changeRequestURL; private final String serverId; @@ -261,6 +260,7 @@ public void onSuccess(InputStream result) public void onFailure(Throwable t) { try { + responseHandler.description = t.getMessage(); logRequestFailure(t); } finally { @@ -333,20 +333,15 @@ public void start() ScheduledExecutors.scheduleAtFixedRate( processingExecutor, new Duration(config.getHttpLoadQueuePeonRepeatDelay()), - new Callable() - { - @Override - public ScheduledExecutors.Signal call() - { - if (!stopped) { - doSegmentManagement(); - } + () -> { + if (!stopped) { + doSegmentManagement(); + } - if (stopped) { - return ScheduledExecutors.Signal.STOP; - } else { - return ScheduledExecutors.Signal.REPEAT; - } + if (stopped) { + return ScheduledExecutors.Signal.STOP; + } else { + return ScheduledExecutors.Signal.REPEAT; } } ); @@ -364,11 +359,11 @@ public void stop() stopped = true; for (SegmentHolder holder : segmentsToDrop.values()) { - holder.requestSucceeded(); + holder.requestFailed("Stopping load queue peon."); } for (SegmentHolder holder : segmentsToLoad.values()) { - holder.requestSucceeded(); + holder.requestFailed("Stopping load queue peon."); } segmentsToDrop.clear(); @@ -382,6 +377,16 @@ public void stop() public void loadSegment(DataSegment segment, LoadPeonCallback callback) { synchronized (lock) { + if (stopped) { + log.warn( + "Server[%s] failed to load segment[%s] because load queue peon is stopped.", + serverId, + segment.getIdentifier() + ); + callback.execute(); + return; + } + SegmentHolder holder = segmentsToLoad.get(segment); if (holder == null) { @@ -398,6 +403,15 @@ public void loadSegment(DataSegment segment, LoadPeonCallback callback) public void dropSegment(DataSegment segment, LoadPeonCallback callback) { synchronized (lock) { + if (stopped) { + log.warn( + "Server[%s] failed to drop segment[%s] because load queue peon is stopped.", + serverId, + segment.getIdentifier() + ); + callback.execute(); + return; + } SegmentHolder holder = segmentsToDrop.get(segment); if (holder == null) { diff --git a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java index 72fb9a36a5d5..c2388359a1ec 100644 --- a/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java +++ b/server/src/test/java/io/druid/server/coordinator/HttpLoadQueuePeonTest.java @@ -24,14 +24,14 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.druid.java.util.http.client.HttpClient; -import io.druid.java.util.http.client.Request; -import io.druid.java.util.http.client.response.HttpResponseHandler; import io.druid.discovery.DiscoveryDruidNode; import io.druid.discovery.DruidNodeDiscovery; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.RE; import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.HttpResponseHandler; import io.druid.server.ServerTestHelper; import io.druid.server.coordination.DataSegmentChangeRequest; import io.druid.server.coordination.SegmentLoadDropHandler; @@ -57,40 +57,92 @@ */ public class HttpLoadQueuePeonTest { + final DataSegment segment1 = new DataSegment( + "test1", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final DataSegment segment2 = new DataSegment( + "test2", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final DataSegment segment3 = new DataSegment( + "test3", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final DataSegment segment4 = new DataSegment( + "test4", Intervals.of("2014/2015"), "v1", + null, null, null, null, 0, 0 + ); + + final TestDruidCoordinatorConfig config = new TestDruidCoordinatorConfig( + null, + null, + null, + null, + null, + null, + 10, + null, + false, + false, + Duration.ZERO + ) + { + @Override + public int getHttpLoadQueuePeonBatchSize() + { + return 2; + } + }; + @Test(timeout = 10000) public void testSimple() throws Exception { - final DataSegment segment1 = new DataSegment( - "test1", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 + HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon( + "http://dummy:4000", + ServerTestHelper.MAPPER, + new TestHttpClient(), + config, + Executors.newScheduledThreadPool( + 2, + Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s") + ), + Execs.singleThreaded("HttpLoadQueuePeonTest") ); - final DataSegment segment2 = new DataSegment( - "test2", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 - ); + httpLoadQueuePeon.start(); - final DataSegment segment3 = new DataSegment( - "test3", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 + Map latches = ImmutableMap.of( + segment1.getIdentifier(), new CountDownLatch(1), + segment2.getIdentifier(), new CountDownLatch(1), + segment3.getIdentifier(), new CountDownLatch(1), + segment4.getIdentifier(), new CountDownLatch(1) ); - final DataSegment segment4 = new DataSegment( - "test4", Intervals.of("2014/2015"), "v1", - null, null, null, null, 0, 0 - ); + httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown()); + httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown()); + httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown()); + httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown()); + + latches.get(segment1.getIdentifier()).await(); + latches.get(segment2.getIdentifier()).await(); + latches.get(segment3.getIdentifier()).await(); + latches.get(segment4.getIdentifier()).await(); + httpLoadQueuePeon.stop(); + } + + @Test(timeout = 10000) + public void testLoadDropAfterStop() throws Exception + { HttpLoadQueuePeon httpLoadQueuePeon = new HttpLoadQueuePeon( "http://dummy:4000", ServerTestHelper.MAPPER, new TestHttpClient(), - new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO) { - @Override - public int getHttpLoadQueuePeonBatchSize() - { - return 2; - } - }, + config, Executors.newScheduledThreadPool( 2, Execs.makeThreadFactory("HttpLoadQueuePeonTest-%s") @@ -107,48 +159,16 @@ public int getHttpLoadQueuePeonBatchSize() segment4.getIdentifier(), new CountDownLatch(1) ); - httpLoadQueuePeon.dropSegment(segment1, new LoadPeonCallback() - { - @Override - public void execute() - { - latches.get(segment1.getIdentifier()).countDown(); - } - }); - - httpLoadQueuePeon.loadSegment(segment2, new LoadPeonCallback() - { - @Override - public void execute() - { - latches.get(segment2.getIdentifier()).countDown(); - } - }); - - httpLoadQueuePeon.dropSegment(segment3, new LoadPeonCallback() - { - @Override - public void execute() - { - latches.get(segment3.getIdentifier()).countDown(); - } - }); - - httpLoadQueuePeon.loadSegment(segment4, new LoadPeonCallback() - { - @Override - public void execute() - { - latches.get(segment4.getIdentifier()).countDown(); - } - }); - + httpLoadQueuePeon.dropSegment(segment1, () -> latches.get(segment1.getIdentifier()).countDown()); + httpLoadQueuePeon.loadSegment(segment2, () -> latches.get(segment2.getIdentifier()).countDown()); latches.get(segment1.getIdentifier()).await(); latches.get(segment2.getIdentifier()).await(); + httpLoadQueuePeon.stop(); + httpLoadQueuePeon.dropSegment(segment3, () -> latches.get(segment3.getIdentifier()).countDown()); + httpLoadQueuePeon.loadSegment(segment4, () -> latches.get(segment4.getIdentifier()).countDown()); latches.get(segment3.getIdentifier()).await(); latches.get(segment4.getIdentifier()).await(); - httpLoadQueuePeon.stop(); } private static class TestDruidNodeDiscovery implements DruidNodeDiscovery @@ -191,12 +211,17 @@ public ListenableFuture go( httpResponseHandler.handleResponse(httpResponse); try { List changeRequests = ServerTestHelper.MAPPER.readValue( - request.getContent().array(), new TypeReference>() {} + request.getContent().array(), new TypeReference>() + { + } ); List statuses = new ArrayList<>(changeRequests.size()); for (DataSegmentChangeRequest cr : changeRequests) { - statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(cr, SegmentLoadDropHandler.Status.SUCCESS)); + statuses.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus( + cr, + SegmentLoadDropHandler.Status.SUCCESS + )); } return (ListenableFuture) Futures.immediateFuture( new ByteArrayInputStream( From 3a64f1fcd327ae805e3822b5cb38740cd2e3bbe9 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 29 Mar 2018 18:43:17 -0700 Subject: [PATCH 2/5] fix unrelated timeout :( --- .../druid/server/coordinator/CuratorDruidCoordinatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java index 19e9c3a1bcf4..256d819cd908 100644 --- a/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/CuratorDruidCoordinatorTest.java @@ -240,7 +240,7 @@ public void tearDown() throws Exception tearDownServerAndCurator(); } - @Test(timeout = 5_000) + @Test(timeout = 10_000) public void testMoveSegment() throws Exception { segmentViewInitLatch = new CountDownLatch(1); From 6355ea41683ea0cb02eda3a2ece8d31cc9dfe29a Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 30 Mar 2018 00:24:09 -0700 Subject: [PATCH 3/5] revert unintended change --- .../java/io/druid/server/coordinator/HttpLoadQueuePeon.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java index 4e6910978e52..859c638cb619 100644 --- a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java @@ -105,7 +105,7 @@ public class HttpLoadQueuePeon extends LoadQueuePeon private final DruidCoordinatorConfig config; private final ObjectMapper jsonMapper; - public HttpClient httpClient; + private final HttpClient httpClient; private final URL changeRequestURL; private final String serverId; From b615846013c73f842f078388be44865740ccfab0 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 2 Apr 2018 12:16:00 -0700 Subject: [PATCH 4/5] PR feedback: change logging --- .../java/io/druid/server/coordinator/HttpLoadQueuePeon.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java index 859c638cb619..12f416bb78e1 100644 --- a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java @@ -260,7 +260,7 @@ public void onSuccess(InputStream result) public void onFailure(Throwable t) { try { - responseHandler.description = t.getMessage(); + responseHandler.description = t.toString(); logRequestFailure(t); } finally { @@ -379,7 +379,7 @@ public void loadSegment(DataSegment segment, LoadPeonCallback callback) synchronized (lock) { if (stopped) { log.warn( - "Server[%s] failed to load segment[%s] because load queue peon is stopped.", + "Server[%s] cannot load segment[%s] because load queue peon is stopped.", serverId, segment.getIdentifier() ); @@ -405,7 +405,7 @@ public void dropSegment(DataSegment segment, LoadPeonCallback callback) synchronized (lock) { if (stopped) { log.warn( - "Server[%s] failed to drop segment[%s] because load queue peon is stopped.", + "Server[%s] cannot to drop segment[%s] because load queue peon is stopped.", serverId, segment.getIdentifier() ); From c82ff349ca418aeb59ff85453968e495a22d1134 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 2 Apr 2018 12:51:44 -0700 Subject: [PATCH 5/5] fix dumb --- .../java/io/druid/server/coordinator/HttpLoadQueuePeon.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java index 12f416bb78e1..ece1d4884fa1 100644 --- a/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java +++ b/server/src/main/java/io/druid/server/coordinator/HttpLoadQueuePeon.java @@ -405,7 +405,7 @@ public void dropSegment(DataSegment segment, LoadPeonCallback callback) synchronized (lock) { if (stopped) { log.warn( - "Server[%s] cannot to drop segment[%s] because load queue peon is stopped.", + "Server[%s] cannot drop segment[%s] because load queue peon is stopped.", serverId, segment.getIdentifier() );