Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public Object2IntMap<String> getDatasourceToUnavailableSegmentCount()
final Iterable<DataSegment> dataSegments = metadataManager.segments().iterateAllUsedSegments();
for (DataSegment segment : dataSegments) {
SegmentReplicaCount replicaCount = segmentReplicationStatus.getReplicaCountsInCluster(segment.getId());
if (replicaCount != null && replicaCount.totalLoaded() > 0) {
if (replicaCount != null && (replicaCount.totalLoaded() > 0 || replicaCount.required() == 0)) {
Copy link
Copy Markdown
Contributor

@cryptoe cryptoe Mar 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also document this change here:
/docs/operations/metrics.md#L333? Thinking more about how the doc would look, should we add a new metric if replicaCount.required == 0?
The reason I am saying this is that users on the regular SQL/native endpoint will never see the data from used segments that exist only in deep storage, i.e. required == 0. They might get thrown off because of this.

I feel the current doc change for the metric fails to capture this nuance.
If we try to explain this nuance, it becomes complicated.
A new metric, segment/deepStorageOnly/count, would probably make more sense — no?
What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I did have similar thoughts.

Following the approach used in this PR, no segment would ever truly be considered unavailable since it can always be queried using MSQ-query-from-deep-storage.

The source of the confusion is the definition of unavailable/count itself.
Does unavailable denote a segment

  1. that should be loaded but is not?
    OR
  2. that should be queryable but is not?

The metric underReplicated/count corresponds with option 1 and this PR makes unavailable/count use option 1 too.

The docs currently include both the flavors in the definition of this metric (which is correct for the native SQL engine):

Number of unique segments left to load until all used segments are available for queries.

@georgew5656 , to eliminate the confusion, as @cryptoe suggests, I think we should:

  • keep this metric unchanged (in case there are downstream apps relying on its values being reported correctly for the native engine)
  • add a new metric for MSQ cases
  • update the new info in the docs

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is changing the existing behavior, as far as I know. Before this change, if a segment was not loaded via a load rule, it would not be included in segment/unavailable/count; with this change, that behavior stays the same.

the difference would be that a segment that is not loaded onto a historical (hot rule) but is covered by a cold rule is no longer counted in segment/unavailable/count.

datasourceToUnavailableSegments.addTo(segment.getDataSource(), 0);
} else {
datasourceToUnavailableSegments.addTo(segment.getDataSource(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,113 @@ public void testCoordinatorCustomDutyGroupsRunAsExpected() throws Exception
latch2.await();
}

@Test(timeout = 60_000L)
public void testCoordinatorRun_queryFromDeepStorage() throws Exception
{
  String dataSource = "dataSource1";

  String coldTier = "coldTier";
  String hotTier = "hotTier";

  // Setup MetadataRuleManager:
  // - segments in Feb 2010 must be loaded on the hot tier with 1 replica
  // - all other segments fall through to a "cold" rule with 0 replicas,
  //   i.e. they live only in deep storage but are still "used"
  Rule intervalLoadRule = new IntervalLoadRule(Intervals.of("2010-02-01/P1M"), ImmutableMap.of(hotTier, 1), null);
  Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(coldTier, 0), null);
  EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
          .andReturn(ImmutableList.of(intervalLoadRule, foreverLoadRule)).atLeastOnce();

  metadataRuleManager.stop();
  EasyMock.expectLastCall().once();

  EasyMock.replay(metadataRuleManager);

  // Setup SegmentsMetadataManager with one datasource containing two segments:
  // one matched only by the cold (0-replica) rule, one matched by the hot rule.
  DruidDataSource druidDataSource = new DruidDataSource(dataSource, Collections.emptyMap());
  final DataSegment dataSegmentCold = new DataSegment(
      dataSource,
      Intervals.of("2010-01-01/P1D"),
      "v1",
      null,
      null,
      null,
      null,
      0x9,
      0
  );
  final DataSegment dataSegmentHot = new DataSegment(
      dataSource,
      Intervals.of("2010-02-01/P1D"),
      "v1",
      null,
      null,
      null,
      null,
      0x9,
      0
  );
  druidDataSource.addSegment(dataSegmentCold).addSegment(dataSegmentHot);

  setupSegmentsMetadataMock(druidDataSource);
  ImmutableDruidDataSource immutableDruidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
  EasyMock.expect(immutableDruidDataSource.getSegments())
          .andReturn(ImmutableSet.of(dataSegmentCold, dataSegmentHot)).atLeastOnce();
  EasyMock.replay(immutableDruidDataSource);

  // Setup ServerInventoryView with one (empty) historical per tier
  druidServer = new DruidServer("server1", "localhost", null, 5L, ServerType.HISTORICAL, hotTier, 0);
  DruidServer druidServer2 = new DruidServer("server2", "localhost", null, 5L, ServerType.HISTORICAL, coldTier, 0);
  setupPeons(ImmutableMap.of("server1", loadQueuePeon, "server2", loadQueuePeon));
  EasyMock.expect(serverInventoryView.getInventory()).andReturn(
      ImmutableList.of(druidServer, druidServer2)
  ).atLeastOnce();
  EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
  EasyMock.replay(serverInventoryView, loadQueueTaskMaster);

  coordinator.start();

  // Wait for this coordinator to become leader
  leaderAnnouncerLatch.await();

  // This coordinator should be leader by now
  Assert.assertTrue(coordinator.isLeader());
  Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader());
  pathChildrenCache.start();

  // Wait for two coordinator runs so replication status has been computed
  final CountDownLatch coordinatorRunLatch = new CountDownLatch(2);
  serviceEmitter.latch = coordinatorRunLatch;
  coordinatorRunLatch.await();

  Object2IntMap<String> numsUnavailableUsedSegmentsPerDataSource =
      coordinator.getDatasourceToUnavailableSegmentCount();
  Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.size());
  // The cold tier segment (required == 0, deep-storage only) should not count as
  // unavailable; the hot tier segment is not yet loaded, so it should.
  Assert.assertEquals(1, numsUnavailableUsedSegmentsPerDataSource.getInt(dataSource));

  Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
      coordinator.getTierToDatasourceToUnderReplicatedCount(false);
  Assert.assertNotNull(underReplicationCountsPerDataSourcePerTier);
  Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTier.size());

  // Hot tier: the Feb segment needs 1 replica and none is loaded yet
  Object2LongMap<String> underReplicationCountsPerDataSourceHotTier = underReplicationCountsPerDataSourcePerTier.get(hotTier);
  Assert.assertNotNull(underReplicationCountsPerDataSourceHotTier);
  Assert.assertEquals(1, underReplicationCountsPerDataSourceHotTier.getLong(dataSource));

  // Cold tier: the rule requires 0 replicas, so nothing is under-replicated
  Object2LongMap<String> underReplicationCountsPerDataSourceColdTier = underReplicationCountsPerDataSourcePerTier.get(coldTier);
  Assert.assertNotNull(underReplicationCountsPerDataSourceColdTier);
  Assert.assertEquals(0, underReplicationCountsPerDataSourceColdTier.getLong(dataSource));

  coordinator.stop();
  leaderUnannouncerLatch.await();

  Assert.assertFalse(coordinator.isLeader());
  Assert.assertNull(coordinator.getCurrentLeader());

  EasyMock.verify(serverInventoryView);
  EasyMock.verify(metadataRuleManager);
}

private CountDownLatch createCountDownLatchAndSetPathChildrenCacheListenerWithLatch(
int latchCount,
PathChildrenCache pathChildrenCache,
Expand Down