diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java index c22929be8768..5709a72b3c5c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java @@ -464,8 +464,7 @@ public void testOfflineTableSingleLevelConcat() // Check total tasks assertEquals(numTasks, 5); - assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(), - "mergeRollupTaskDelayInNumBuckets.myTable1_OFFLINE.100days")); + waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable1_OFFLINE.100days"); // Drop the table dropOfflineTable(SINGLE_LEVEL_CONCAT_TEST_TABLE); @@ -582,8 +581,7 @@ public void testOfflineTableSingleLevelConcatWithMetadataPush() // Check total tasks assertEquals(numTasks, 5); - assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(), - "mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days")); + waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days"); // Drop the table dropOfflineTable(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE); @@ -706,8 +704,7 @@ public void testOfflineTableSingleLevelRollup() // Check total tasks assertEquals(numTasks, 3); - assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(), - "mergeRollupTaskDelayInNumBuckets.myTable2_OFFLINE.150days")); + waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable2_OFFLINE.150days"); } /** @@ -853,10 +850,8 @@ public void testOfflineTableMultiLevelConcat() // Check total tasks assertEquals(numTasks, 8); - assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(), - 
"mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.45days")); - assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(), - "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.90days")); + waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.45days", + "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.90days"); } protected void verifyTableDelete(String tableNameWithType) { @@ -911,6 +906,31 @@ private void waitForExpectedNumBucketsToProcess(String tableNameWithType, long e }, TIMEOUT_IN_MS, "Timeout while waiting for expected num buckets to process metrics on " + tableNameWithType); } + /** + * Polls until every gauge named in {@code metricNames} exists on the controller. Used here for + * {@code mergeRollupTaskDelayInNumBuckets.*} after each test's scheduling loop completes. + * + *
<p>Those gauges are (re)registered by {@link PinotTaskManager#scheduleTasks} via + * {@code MergeRollupTaskGenerator.createOrUpdateDelayMetrics}. They are removed by + * {@code resetDelayMetrics} when a {@code scheduleTasks} call observes no eligible segments for the + * table — which can happen transiently if a {@code scheduleTasks} call (e.g. the per-iteration + * {@code RealtimeToOfflineSegmentsTask} probe inside the for-loop body) lands while the previous + * merge task's segment-lineage commit is still in flight. Polling here mirrors + * {@link #waitForExpectedNumBucketsToProcess} so the post-loop assertion does not flake on the same + * race window. + */ + private void waitForGaugesToExist(String... metricNames) { + TestUtils.waitForCondition(aVoid -> { + ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics(); + for (String metricName : metricNames) { + if (!MetricValueUtils.gaugeExists(controllerMetrics, metricName)) { + return false; + } + } + return true; + }, TIMEOUT_IN_MS, "Timeout while waiting for gauges to exist: " + String.join(", ", metricNames)); + } + // The use case is similar as the one defined in offline table @Test public void testRealtimeTableSingleLevelConcat() @@ -1010,8 +1030,7 @@ public void testRealtimeTableSingleLevelConcat() // Check total tasks assertEquals(numTasks, 5); - assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(), - "mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days")); + waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days"); // Drop the table dropRealtimeTable(tableName);