Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,7 @@ public void testOfflineTableSingleLevelConcat()
// Check total tasks
assertEquals(numTasks, 5);

assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
"mergeRollupTaskDelayInNumBuckets.myTable1_OFFLINE.100days"));
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable1_OFFLINE.100days");
// Drop the table
dropOfflineTable(SINGLE_LEVEL_CONCAT_TEST_TABLE);

Expand Down Expand Up @@ -582,8 +581,7 @@ public void testOfflineTableSingleLevelConcatWithMetadataPush()
// Check total tasks
assertEquals(numTasks, 5);

assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
"mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days"));
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days");

// Drop the table
dropOfflineTable(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE);
Expand Down Expand Up @@ -706,8 +704,7 @@ public void testOfflineTableSingleLevelRollup()
// Check total tasks
assertEquals(numTasks, 3);

assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
"mergeRollupTaskDelayInNumBuckets.myTable2_OFFLINE.150days"));
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable2_OFFLINE.150days");
}

/**
Expand Down Expand Up @@ -853,10 +850,8 @@ public void testOfflineTableMultiLevelConcat()
// Check total tasks
assertEquals(numTasks, 8);

assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
"mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.45days"));
assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
"mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.90days"));
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.45days",
"mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.90days");
}

protected void verifyTableDelete(String tableNameWithType) {
Expand Down Expand Up @@ -911,6 +906,31 @@ private void waitForExpectedNumBucketsToProcess(String tableNameWithType, long e
}, TIMEOUT_IN_MS, "Timeout while waiting for expected num buckets to process metrics on " + tableNameWithType);
}

/**
* Poll until all of the named gauges exist on the controller. Used here for
* {@code mergeRollupTaskDelayInNumBuckets.*} after each test's scheduling loop completes.
*
* <p>Those gauges are (re)registered by {@link PinotTaskManager#scheduleTasks} via
* {@code MergeRollupTaskGenerator.createOrUpdateDelayMetrics}. They are removed by
* {@code resetDelayMetrics} when a {@code scheduleTasks} call observes no eligible segments for the
* table — which can happen transiently if a {@code scheduleTasks} call (e.g. the per-iteration
* {@code RealtimeToOfflineSegmentsTask} probe inside the for-loop body) lands while the previous
* merge task's segment-lineage commit is still in flight. Polling here mirrors
* {@link #waitForExpectedNumBucketsToProcess} so the post-loop assertion does not flake on the same
* race window.
*/
private void waitForGaugesToExist(String... metricNames) {
TestUtils.waitForCondition(aVoid -> {
ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics();
for (String metricName : metricNames) {
if (!MetricValueUtils.gaugeExists(controllerMetrics, metricName)) {
return false;
}
}
return true;
}, TIMEOUT_IN_MS, "Timeout while waiting for gauges to exist: " + String.join(", ", metricNames));
}

// The use case is similar as the one defined in offline table
@Test
public void testRealtimeTableSingleLevelConcat()
Expand Down Expand Up @@ -1010,8 +1030,7 @@ public void testRealtimeTableSingleLevelConcat()
// Check total tasks
assertEquals(numTasks, 5);

assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(),
"mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days"));
waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days");

// Drop the table
dropRealtimeTable(tableName);
Expand Down
Loading