Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.druid.server.coordinator.duty.MarkAsUnusedOvershadowedSegments;
import org.apache.druid.server.coordinator.duty.RunRules;
import org.apache.druid.server.coordinator.duty.UnloadUnusedSegments;
import org.apache.druid.server.coordinator.rules.BroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.LoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.initialization.ZkPathsConfig;
Expand Down Expand Up @@ -262,13 +263,25 @@ public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataS
}

/**
* segmentReplicantLookup use in this method could potentially be stale since it is only updated on coordinator runs.
* However, this is ok as long as {@code dataSegments} is refreshed/latest, as this would at least still ensure
* that the stale data in segmentReplicantLookup would be under counting replication levels,
* rather than potentially falsely reporting that everything is available.
*
* @return tier -> { dataSource -> underReplicationCount } map
*/
public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegments(
Iterable<DataSegment> dataSegments
)
{
final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
final Set<String> decommissioningServers = getDynamicConfigs().getDecommissioningNodes();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a previous PR there was a discussion about why it's ok for segmentReplicantLookup to be stale in this method: https://github.com/apache/druid/pull/9965/files#r440541949

What do you think about having that explanation as a code comment for this method?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

final List<ImmutableDruidServer> broadcastTargetServers = serverInventoryView
.getInventory()
.stream()
.filter(druidServer -> druidServer.isSegmentBroadcastTarget() && !decommissioningServers.contains(druidServer.getHost()))
.map(DruidServer::toImmutableDruidServer)
.collect(Collectors.toList());

if (segmentReplicantLookup == null) {
return underReplicationCountsPerDataSourcePerTier;
Expand All @@ -280,20 +293,38 @@ public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataS
final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());

for (final Rule rule : rules) {
if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) {
if (!rule.appliesTo(segment, now)) {
continue;
}

((LoadRule) rule)
.getTieredReplicants()
.forEach((final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
if (rule instanceof LoadRule) {
((LoadRule) rule)
.getTieredReplicants()
.forEach((final String tier, final Integer ruleReplicants) -> {
int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
.computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
.addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
});
}

if (rule instanceof BroadcastDistributionRule) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Rule subclasses are added in the future and should be considered in this method, is there a test that will fail?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not right now. A Rule subclass may not always be needed to be considered in this method. Also not sure how the test will be able to automatically create new Rule subclass

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding some comments to somewhere like Rule will be sufficient for now. Not sure how likely we'll add future Rules, but if we do I think there's a good chance we'll forget to update this method if it's needed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (ImmutableDruidServer server : broadcastTargetServers) {
Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
.computeIfAbsent(server.getTier(), ignored -> new Object2LongOpenHashMap<>());
if (server.getSegment(segment.getId()) == null) {
((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
.addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
});
break; // only the first matching rule applies
.addTo(segment.getDataSource(), 1);
} else {
// This makes sure that every datasource has an entry even if all of its segments are loaded
underReplicationPerDataSource.putIfAbsent(segment.getDataSource(), 0);
}
}
}

// only the first matching rule applies
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.client.DataSourcesSnapshot;
import org.apache.druid.client.DruidDataSource;
import org.apache.druid.client.DruidServer;
Expand All @@ -52,6 +50,7 @@
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
Expand Down Expand Up @@ -104,10 +103,12 @@ public class DruidCoordinatorTest extends CuratorTestBase
private ObjectMapper objectMapper;
private DruidNode druidNode;
private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
private boolean serverAddedCountExpected = true;

@Before
public void setUp() throws Exception
{
serverAddedCountExpected = true;
druidServer = EasyMock.createMock(DruidServer.class);
serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
segmentsMetadataManager = EasyMock.createNiceMock(SegmentsMetadataManager.class);
Expand Down Expand Up @@ -375,37 +376,22 @@ public void testCoordinatorRun() throws Exception
// This coordinator should be leader by now
Assert.assertTrue(coordinator.isLeader());
Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader());

final CountDownLatch assignSegmentLatch = new CountDownLatch(1);
pathChildrenCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
{
if (CuratorUtils.isChildAdded(event)) {
if (assignSegmentLatch.getCount() > 0) {
//Coordinator should try to assign segment to druidServer historical
//Simulate historical loading segment
druidServer.addDataSegment(dataSegment);
assignSegmentLatch.countDown();
} else {
Assert.fail("The same segment is assigned to the same server multiple times");
}
}
}
}
);
pathChildrenCache.start();

final CountDownLatch assignSegmentLatch = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
1,
pathChildrenCache,
ImmutableMap.of("2010-01-01T00:00:00.000Z_2010-01-02T00:00:00.000Z", dataSegment),
druidServer
);
assignSegmentLatch.await();
Assert.assertTrue(serverAddedCountExpected);

final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();

Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
curator.delete().guaranteed().forPath(ZKPaths.makePath(LOADPATH, dataSegment.getId().toString()));

Object2IntMap<String> numsUnavailableUsedSegmentsPerDataSource =
coordinator.computeNumsUnavailableUsedSegmentsPerDataSource();
Expand Down Expand Up @@ -496,39 +482,171 @@ public void testCoordinatorTieredRun() throws Exception
coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader

final CountDownLatch assignSegmentLatchHot = new CountDownLatch(2);
pathChildrenCache.getListenable().addListener(
(client, event) -> {
if (CuratorUtils.isChildAdded(event)) {
DataSegment segment = findSegmentRelatedToCuratorEvent(dataSegments, event);
if (segment != null) {
hotServer.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
}
final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(2, pathChildrenCache, dataSegments, hotServer);
final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(1, pathChildrenCacheCold, dataSegments, coldServer);
assignSegmentLatchHot.await();
assignSegmentLatchCold.await();
Assert.assertTrue(serverAddedCountExpected);

assignSegmentLatchHot.countDown();
}
}
final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
coordinatorRunLatch.await();

Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());

Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTier.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource));

coordinator.stop();
leaderUnannouncerLatch.await();

EasyMock.verify(serverInventoryView);
EasyMock.verify(segmentsMetadataManager);
EasyMock.verify(metadataRuleManager);
}

@Test(timeout = 60_000L)
public void testComputeUnderReplicationCountsPerDataSourcePerTierForSegmentsWithBroadcastRule() throws Exception
{
final String dataSource = "dataSource";
final String hotTierName = "hot";
final String coldTierName = "cold";
final String tierName1 = "tier1";
final String tierName2 = "tier2";
final Rule broadcastDistributionRule = new ForeverBroadcastDistributionRule();
final String loadPathCold = "/druid/loadqueue/cold:1234";
final String loadPathBroker1 = "/druid/loadqueue/broker1:1234";
final String loadPathBroker2 = "/druid/loadqueue/broker2:1234";
final String loadPathPeon = "/druid/loadqueue/peon:1234";
final DruidServer hotServer = new DruidServer("hot", "hot", null, 5L, ServerType.HISTORICAL, hotTierName, 0);
final DruidServer coldServer = new DruidServer("cold", "cold", null, 5L, ServerType.HISTORICAL, coldTierName, 0);
final DruidServer brokerServer1 = new DruidServer("broker1", "broker1", null, 5L, ServerType.BROKER, tierName1, 0);
final DruidServer brokerServer2 = new DruidServer("broker2", "broker2", null, 5L, ServerType.BROKER, tierName2, 0);
final DruidServer peonServer = new DruidServer("peon", "peon", null, 5L, ServerType.INDEXER_EXECUTOR, tierName2, 0);

final Map<String, DataSegment> dataSegments = ImmutableMap.of(
"2018-01-02T00:00:00.000Z_2018-01-03T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-02/P1D"), "v1", null, null, null, null, 0x9, 0),
"2018-01-03T00:00:00.000Z_2018-01-04T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2018-01-03/P1D"), "v1", null, null, null, null, 0x9, 0),
"2017-01-01T00:00:00.000Z_2017-01-02T00:00:00.000Z",
new DataSegment(dataSource, Intervals.of("2017-01-01/P1D"), "v1", null, null, null, null, 0x9, 0)
);

final CountDownLatch assignSegmentLatchCold = new CountDownLatch(1);
pathChildrenCacheCold.getListenable().addListener(
(CuratorFramework client, PathChildrenCacheEvent event) -> {
if (CuratorUtils.isChildAdded(event)) {
DataSegment segment = findSegmentRelatedToCuratorEvent(dataSegments, event);
final LoadQueuePeon loadQueuePeonCold = new CuratorLoadQueuePeon(
curator,
loadPathCold,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_cold_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_cold-%d"),
druidCoordinatorConfig
);

if (segment != null) {
coldServer.addDataSegment(segment);
curator.delete().guaranteed().forPath(event.getData().getPath());
}
final LoadQueuePeon loadQueuePeonBroker1 = new CuratorLoadQueuePeon(
curator,
loadPathBroker1,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker1_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_broker1-%d"),
druidCoordinatorConfig
);

assignSegmentLatchCold.countDown();
}
}
final LoadQueuePeon loadQueuePeonBroker2 = new CuratorLoadQueuePeon(
curator,
loadPathBroker2,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_broker2_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_broker2-%d"),
druidCoordinatorConfig
);

final LoadQueuePeon loadQueuePeonPoenServer = new CuratorLoadQueuePeon(
curator,
loadPathPeon,
objectMapper,
Execs.scheduledSingleThreaded("coordinator_test_load_queue_peon_peon_scheduled-%d"),
Execs.singleThreaded("coordinator_test_load_queue_peon_peon-%d"),
druidCoordinatorConfig
);
final PathChildrenCache pathChildrenCacheCold = new PathChildrenCache(
curator,
loadPathCold,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_cold-%d")
);
final PathChildrenCache pathChildrenCacheBroker1 = new PathChildrenCache(
curator,
loadPathBroker1,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_broker1-%d")
);
final PathChildrenCache pathChildrenCacheBroker2 = new PathChildrenCache(
curator,
loadPathBroker2,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_broker2-%d")
);
final PathChildrenCache pathChildrenCachePeon = new PathChildrenCache(
curator,
loadPathPeon,
true,
true,
Execs.singleThreaded("coordinator_test_path_children_cache_peon-%d")
);

loadManagementPeons.putAll(ImmutableMap.of("hot", loadQueuePeon,
"cold", loadQueuePeonCold,
"broker1", loadQueuePeonBroker1,
"broker2", loadQueuePeonBroker2,
"peon", loadQueuePeonPoenServer));

loadQueuePeonCold.start();
loadQueuePeonBroker1.start();
loadQueuePeonBroker2.start();
loadQueuePeonPoenServer.start();
pathChildrenCache.start();
pathChildrenCacheCold.start();
pathChildrenCacheBroker1.start();
pathChildrenCacheBroker2.start();
pathChildrenCachePeon.start();

DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())};
dataSegments.values().forEach(druidDataSources[0]::addSegment);

setupSegmentsMetadataMock(druidDataSources[0]);

EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(broadcastDistributionRule)).atLeastOnce();
EasyMock.expect(metadataRuleManager.getAllRules())
.andReturn(ImmutableMap.of(dataSource, ImmutableList.of(broadcastDistributionRule))).atLeastOnce();

EasyMock.expect(serverInventoryView.getInventory())
.andReturn(ImmutableList.of(hotServer, coldServer, brokerServer1, brokerServer2, peonServer))
.atLeastOnce();
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();

EasyMock.replay(metadataRuleManager, serverInventoryView);

coordinator.start();
leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader

final CountDownLatch assignSegmentLatchHot = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCache, dataSegments, hotServer);
final CountDownLatch assignSegmentLatchCold = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheCold, dataSegments, coldServer);
final CountDownLatch assignSegmentLatchBroker1 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker1, dataSegments, brokerServer1);
final CountDownLatch assignSegmentLatchBroker2 = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCacheBroker2, dataSegments, brokerServer2);
final CountDownLatch assignSegmentLatchPeon = createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(3, pathChildrenCachePeon, dataSegments, peonServer);
assignSegmentLatchHot.await();
assignSegmentLatchCold.await();
assignSegmentLatchBroker1.await();
assignSegmentLatchBroker2.await();
assignSegmentLatchPeon.await();
Assert.assertTrue(serverAddedCountExpected);

final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
serviceEmitter.latch = coordinatorRunLatch;
Expand All @@ -538,9 +656,11 @@ public void testCoordinatorTieredRun() throws Exception

Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTier.size());
Assert.assertEquals(4, underReplicationCountsPerDataSourcePerTier.size());
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName1).getLong(dataSource));
Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(tierName2).getLong(dataSource));

coordinator.stop();
leaderUnannouncerLatch.await();
Expand All @@ -550,6 +670,32 @@ public void testCoordinatorTieredRun() throws Exception
EasyMock.verify(metadataRuleManager);
}

/**
 * Registers a listener on {@code pathChildrenCache} that simulates the given server
 * loading segments: for every child-added event it resolves the matching segment,
 * adds it to {@code server}, removes the corresponding ZK load-queue node, and counts
 * the latch down. If more child-added events arrive than {@code latchCount}, the
 * shared {@code serverAddedCountExpected} flag is cleared so the test can detect the
 * unexpected extra assignment.
 *
 * @param latchCount        number of segment assignments expected for this server
 * @param pathChildrenCache cache watching the server's load-queue ZK path
 * @param segments          lookup from interval string to segment, used to resolve events
 * @param server            server that "loads" each assigned segment
 * @return a latch that reaches zero once {@code latchCount} segments have been assigned
 */
private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(int latchCount,
                                                                                    PathChildrenCache pathChildrenCache,
                                                                                    Map<String, DataSegment> segments,
                                                                                    DruidServer server)
{
  final CountDownLatch latch = new CountDownLatch(latchCount);
  pathChildrenCache.getListenable().addListener(
      (client, event) -> {
        if (!CuratorUtils.isChildAdded(event)) {
          return;
        }
        if (latch.getCount() == 0) {
          // The segment is assigned to the server more times than expected
          serverAddedCountExpected = false;
          return;
        }
        final DataSegment segment = findSegmentRelatedToCuratorEvent(segments, event);
        if (segment != null) {
          // Simulate the server loading the segment, then clear its load-queue entry.
          server.addDataSegment(segment);
          curator.delete().guaranteed().forPath(event.getData().getPath());
        }
        latch.countDown();
      }
  );
  return latch;
}

private void setupSegmentsMetadataMock(DruidDataSource dataSource)
{
EasyMock.expect(segmentsMetadataManager.isPollingDatabasePeriodically()).andReturn(true).anyTimes();
Expand Down