Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ public Map<String, LoadQueuePeon> getLoadManagementPeons()
.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>())
.addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
});
break; // only the first matching rule applies
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.java.util.emitter.core.Event;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.server.coordinator.rules.ForeverLoadRule;
import io.druid.server.coordinator.rules.IntervalLoadRule;
import io.druid.server.coordinator.rules.Rule;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.lookup.cache.LookupCoordinatorManager;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import org.apache.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -96,6 +98,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
private ObjectMapper objectMapper;
private JacksonConfigManager configManager;
private DruidNode druidNode;
private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
private static final String LOADPATH = "/druid/loadqueue/localhost:1234";
private static final long COORDINATOR_START_DELAY = 1;
private static final long COORDINATOR_PERIOD = 100;
Expand Down Expand Up @@ -178,7 +181,7 @@ public String getBase()
serverInventoryView,
metadataRuleManager,
curator,
new NoopServiceEmitter(),
serviceEmitter,
scheduledExecutorFactory,
null,
null,
Expand Down Expand Up @@ -378,27 +381,17 @@ public void childEvent(

assignSegmentLatch.await();

final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();

Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
curator.delete().guaranteed().forPath(ZKPaths.makePath(LOADPATH, dataSegment.getIdentifier()));
// Wait for coordinator thread to run so that replication status is updated
while (coordinator.getSegmentAvailability().getLong(dataSource) != 0) {
Thread.sleep(50);
}

Map segmentAvailability = coordinator.getSegmentAvailability();
Assert.assertEquals(1, segmentAvailability.size());
Assert.assertEquals(0L, segmentAvailability.get(dataSource));

while (coordinator.hasLoadPending(dataSource)) {
Thread.sleep(50);
}

// wait historical data to be updated
long startMillis = System.currentTimeMillis();
long coordinatorRunPeriodMillis = druidCoordinatorConfig.getCoordinatorPeriod().getMillis();
while (System.currentTimeMillis() - startMillis < coordinatorRunPeriodMillis) {
Thread.sleep(100);
}

Map<String, ? extends Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus();
Assert.assertNotNull(replicationStatus);
Assert.assertEquals(1, replicationStatus.entrySet().size());
Expand All @@ -421,6 +414,127 @@ public void childEvent(
EasyMock.verify(metadataRuleManager);
}

@Test(timeout = 60_000L)
public void testCoordinatorTieredRun() throws Exception
{
final String dataSource = "dataSource", hotTierName = "hot", coldTierName = "cold";
final Rule hotTier = new IntervalLoadRule(Intervals.of("2018-01-01/P1M"), ImmutableMap.of(hotTierName, 1));
final Rule coldTier = new ForeverLoadRule(ImmutableMap.of(coldTierName, 1));
final String loadPathCold = "/druid/loadqueue/cold:1234";
final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0);
final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0);

final Map<String, DataSegment> dataSegments = ImmutableMap.of(
"2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0),
"2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0),
"2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0)
);

final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon(
curator,
loadPathCold,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"),
druidCoordinatorConfig
);
final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache(
curator, loadPathCold, true, true, Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d")
);
loadManagementPeons.putAll(ImmutableMap.of("hot", loadQueuePeon, "cold", loadQueuePeonCold));

loadQueuePeonCold.start();
pathChildrenCache.start();
pathChildrenCacheCold.start();

DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())};
dataSegments.values().forEach(druidDataSources[0]::addSegment);

EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(hotTier, coldTier)).atLeastOnce();
EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
EasyMock.expect(databaseSegmentManager.getInventory()).andReturn(
ImmutableList.of(druidDataSources[0].toImmutableDruidDataSource())
).atLeastOnce();
EasyMock.expect(serverInventoryView.getInventory())
.andReturn(ImmutableList.of(hotServer, coldServer))
.atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();

EasyMock.replay(metadataRuleManager, databaseSegmentManager, serverInventoryView);

coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader

final CountDownLatch assignSegmentLatchHot = new CountDownLatch(2);
pathChildrenCache.getListenable().addListener(
(client, event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
DataSegment segment = dataSegments
.entrySet()
.stream()
.filter(x -> event.getData().getPath().contains(x.getKey()))
.map(Map.Entry::getValue)
.findFirst()
.orElse(null);

if (segment != null) {
hotServer.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
}

assignSegmentLatchHot.countDown();
}
}
);

final CountDownLatch assignSegmentLatchCold = new CountDownLatch(1);
pathChildrenCacheCold.getListenable().addListener(
(client, event) -> {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
DataSegment segment = dataSegments
.entrySet()
.stream()
.filter(x -> event.getData().getPath().contains(x.getKey()))
.map(Map.Entry::getValue)
.findFirst()
.orElse(null);

if (segment != null) {
coldServer.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
}

assignSegmentLatchCold.countDown();
}
}
);

assignSegmentLatchHot.await();
assignSegmentLatchCold.await();

final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();

Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());

Map<String, ? extends Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus();
Assert.assertEquals(2, replicationStatus.entrySet().size());
Assert.assertEquals(0L, replicationStatus.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, replicationStatus.get(coldTierName).getLong(dataSource));

coordinator.stop();
leaderUnannouncerLatch.await();

EasyMock.verify(serverInventoryView);
EasyMock.verify(databaseSegmentManager);
EasyMock.verify(metadataRuleManager);
}

@Test
public void testOrderedAvailableDataSegments()
{
Expand Down Expand Up @@ -500,4 +614,22 @@ public void unregisterListener()
listener.stopBeingLeader();
}
}

private static class LatchableServiceEmitter extends ServiceEmitter
{
private CountDownLatch latch;

private LatchableServiceEmitter()
{
super("", "", null);
}

@Override
public void emit(Event event)
{
if (latch != null && "segment/count".equals(event.toMap().get("metric"))) {
latch.countDown();
}
}
}
}