diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index 8868c7673316..a4085048c227 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -252,6 +252,7 @@ public Map getLoadManagementPeons() .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>()) .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0)); }); + break; // only the first matching rule applies } } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index d0e171759be1..f2d220923495 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -37,16 +37,18 @@ import io.druid.java.util.common.Intervals; import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import io.druid.java.util.emitter.core.Event; +import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.metadata.MetadataRuleManager; import io.druid.metadata.MetadataSegmentManager; import io.druid.server.DruidNode; import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.coordination.ServerType; import io.druid.server.coordinator.rules.ForeverLoadRule; +import io.druid.server.coordinator.rules.IntervalLoadRule; import io.druid.server.coordinator.rules.Rule; import io.druid.server.initialization.ZkPathsConfig; import io.druid.server.lookup.cache.LookupCoordinatorManager; -import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; import it.unimi.dsi.fastutil.objects.Object2LongMap; import org.apache.curator.framework.CuratorFramework; @@ -96,6 +98,7 @@ public class DruidCoordinatorTest extends CuratorTestBase private ObjectMapper objectMapper; private JacksonConfigManager configManager; private DruidNode druidNode; + private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter(); private static final String LOADPATH = "/druid/loadqueue/localhost:1234"; private static final long COORDINATOR_START_DELAY = 1; private static final long COORDINATOR_PERIOD = 100; @@ -185,7 +188,7 @@ public String getBase() serverInventoryView, metadataRuleManager, curator, - new NoopServiceEmitter(), + serviceEmitter, scheduledExecutorFactory, null, null, @@ -385,27 +388,17 @@ public void childEvent( assignSegmentLatch.await(); + final CountDownLatch coordinatorRunLatch = new CountDownLatch(2); + serviceEmitter.latch = coordinatorRunLatch; + coordinatorRunLatch.await(); + Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus()); curator.delete().guaranteed().forPath(ZKPaths.makePath(LOADPATH, dataSegment.getIdentifier())); - // Wait for coordinator thread to run so that replication status is updated - while (coordinator.getSegmentAvailability().getLong(dataSource) != 0) { - Thread.sleep(50); - } + Map segmentAvailability = coordinator.getSegmentAvailability(); Assert.assertEquals(1, segmentAvailability.size()); Assert.assertEquals(0L, segmentAvailability.get(dataSource)); - while (coordinator.hasLoadPending(dataSource)) { - Thread.sleep(50); - } - - // wait historical data to be updated - long startMillis = System.currentTimeMillis(); - long coordinatorRunPeriodMillis = druidCoordinatorConfig.getCoordinatorPeriod().getMillis(); - while (System.currentTimeMillis() - startMillis < coordinatorRunPeriodMillis) { - Thread.sleep(100); - } - Map> replicationStatus = coordinator.getReplicationStatus(); Assert.assertNotNull(replicationStatus); Assert.assertEquals(1, replicationStatus.entrySet().size()); @@ -428,6 +421,127 @@ public void childEvent( EasyMock.verify(metadataRuleManager); } + @Test(timeout = 60_000L) + public void testCoordinatorTieredRun() throws Exception + { + final String dataSource = "dataSource", hotTierName = "hot", coldTierName = "cold"; + final Rule hotTier = new IntervalLoadRule(Intervals.of("2018-01-01/P1M"), ImmutableMap.of(hotTierName, 1)); + final Rule coldTier = new ForeverLoadRule(ImmutableMap.of(coldTierName, 1)); + final String loadPathCold = "/druid/loadqueue/cold:1234"; + final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0); + final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0); + + final Map dataSegments = ImmutableMap.of( + "2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z", + new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0), + "2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z", + new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0), + "2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z", + new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0) + ); + + final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon( + curator, + loadPathCold, + objectMapper, + Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"), + Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"), + druidCoordinatorConfig + ); + final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache( + curator, loadPathCold, true, true, Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d") + ); + loadManagementPeons.putAll(ImmutableMap.of("hot", loadQueuePeon, "cold", loadQueuePeonCold)); + + loadQueuePeonCold.start(); + pathChildrenCache.start(); + pathChildrenCacheCold.start(); + + DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())}; + dataSegments.values().forEach(druidDataSources[0]::addSegment); + + EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString())) + .andReturn(ImmutableList.of(hotTier, coldTier)).atLeastOnce(); + EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes(); + EasyMock.expect(databaseSegmentManager.getInventory()).andReturn( + ImmutableList.of(druidDataSources[0].toImmutableDruidDataSource()) + ).atLeastOnce(); + EasyMock.expect(serverInventoryView.getInventory()) + .andReturn(ImmutableList.of(hotServer, coldServer)) + .atLeastOnce(); + EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes(); + + EasyMock.replay(metadataRuleManager, databaseSegmentManager, serverInventoryView); + + coordinator.start(); + leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader + + final CountDownLatch assignSegmentLatchHot = new CountDownLatch(2); + pathChildrenCache.getListenable().addListener( + (client, event) -> { + if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { + DataSegment segment = dataSegments + .entrySet() + .stream() + .filter(x -> event.getData().getPath().contains(x.getKey())) + .map(Map.Entry::getValue) + .findFirst() + .orElse(null); + + if (segment != null) { + hotServer.addDataSegment(segment); + curator.delete().guaranteed().forPath(event.getData().getPath()); + } + + assignSegmentLatchHot.countDown(); + } + } + ); + + final CountDownLatch assignSegmentLatchCold = new CountDownLatch(1); + pathChildrenCacheCold.getListenable().addListener( + (client, event) -> { + if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { + DataSegment segment = dataSegments + .entrySet() + .stream() + .filter(x -> event.getData().getPath().contains(x.getKey())) + .map(Map.Entry::getValue) + .findFirst() + .orElse(null); + + if (segment != null) { + coldServer.addDataSegment(segment); + curator.delete().guaranteed().forPath(event.getData().getPath()); + } + + assignSegmentLatchCold.countDown(); + } + } + ); + + assignSegmentLatchHot.await(); + assignSegmentLatchCold.await(); + + final CountDownLatch coordinatorRunLatch = new CountDownLatch(2); + serviceEmitter.latch = coordinatorRunLatch; + coordinatorRunLatch.await(); + + Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus()); + + Map> replicationStatus = coordinator.getReplicationStatus(); + Assert.assertEquals(2, replicationStatus.entrySet().size()); + Assert.assertEquals(0L, replicationStatus.get(hotTierName).getLong(dataSource)); + Assert.assertEquals(0L, replicationStatus.get(coldTierName).getLong(dataSource)); + + coordinator.stop(); + leaderUnannouncerLatch.await(); + + EasyMock.verify(serverInventoryView); + EasyMock.verify(databaseSegmentManager); + EasyMock.verify(metadataRuleManager); + } + @Test public void testOrderedAvailableDataSegments() { @@ -507,4 +621,22 @@ public void unregisterListener() listener.stopBeingLeader(); } } + + private static class LatchableServiceEmitter extends ServiceEmitter + { + private CountDownLatch latch; + + private LatchableServiceEmitter() + { + super("", "", null); + } + + @Override + public void emit(Event event) + { + if (latch != null && "segment/count".equals(event.toMap().get("metric"))) { + latch.countDown(); + } + } + } }