From bdabcf996b2786bb22648abb0b93077c202fe12e Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Sun, 19 Apr 2026 20:01:21 -0700 Subject: [PATCH] Fix flaky MergeRollupMinionClusterIntegrationTest gauge assertions Extends the polling pattern introduced in #18253 (for mergeRollupTaskNumBucketsToProcess) to the remaining five mergeRollupTaskDelayInNumBuckets.* gaugeExists checks in the same test class. The gauge is registered by MergeRollupTaskGenerator.createOrUpdateDelayMetrics and removed by resetDelayMetrics when a scheduleTasks call observes no eligible segments. The per-iteration body's assertNull(scheduleTasks(context).get(RealtimeToOfflineSegmentsTask)) probe triggers an extra synchronized scheduleTasks that can race with the previous merge task's segment-lineage commit, transiently resetting the gauge and causing the post-loop assertTrue(gaugeExists(...)) to flake on the same window that #18253 addressed. A new waitForGaugesToExist(String...) helper polls via TestUtils.waitForCondition with the existing TIMEOUT_IN_MS, and is used in testOfflineTableSingleLevelConcat, testOfflineTableSingleLevelConcatWithMetadataPush, testOfflineTableSingleLevelRollup, testOfflineTableMultiLevelConcat (both 45days + 90days atomically), and testRealtimeTableSingleLevelConcat. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- ...rgeRollupMinionClusterIntegrationTest.java | 43 +++++++++++++------ 1 file changed, 31 insertions(+), 12 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java index c22929be8768..5709a72b3c5c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java @@ -464,8 +464,7 @@ public void testOfflineTableSingleLevelConcat() // Check total tasks assertEquals(numTasks, 5); - assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(), - "mergeRollupTaskDelayInNumBuckets.myTable1_OFFLINE.100days")); + waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable1_OFFLINE.100days"); // Drop the table dropOfflineTable(SINGLE_LEVEL_CONCAT_TEST_TABLE); @@ -582,8 +581,7 @@ public void testOfflineTableSingleLevelConcatWithMetadataPush() // Check total tasks assertEquals(numTasks, 5); - assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(), - "mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days")); + waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable4_OFFLINE.100days"); // Drop the table dropOfflineTable(SINGLE_LEVEL_CONCAT_METADATA_TEST_TABLE); @@ -706,8 +704,7 @@ public void testOfflineTableSingleLevelRollup() // Check total tasks assertEquals(numTasks, 3); - assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(), - "mergeRollupTaskDelayInNumBuckets.myTable2_OFFLINE.150days")); + waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable2_OFFLINE.150days"); } /** @@ -853,10 +850,8 @@ public void testOfflineTableMultiLevelConcat() // 
Check total tasks assertEquals(numTasks, 8); - assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(), - "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.45days")); - assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(), - "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.90days")); + waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.45days", + "mergeRollupTaskDelayInNumBuckets.myTable3_OFFLINE.90days"); } protected void verifyTableDelete(String tableNameWithType) { @@ -911,6 +906,31 @@ private void waitForExpectedNumBucketsToProcess(String tableNameWithType, long e }, TIMEOUT_IN_MS, "Timeout while waiting for expected num buckets to process metrics on " + tableNameWithType); } + /** + * Poll until all of the named gauges exist on the controller. Used here for + * {@code mergeRollupTaskDelayInNumBuckets.*} after each test's scheduling loop completes. + * + *

<p>Those gauges are (re)registered by {@link PinotTaskManager#scheduleTasks} via + * {@code MergeRollupTaskGenerator.createOrUpdateDelayMetrics}. They are removed by + * {@code resetDelayMetrics} when a {@code scheduleTasks} call observes no eligible segments for the + * table — which can happen transiently if a {@code scheduleTasks} call (e.g. the per-iteration + * {@code RealtimeToOfflineSegmentsTask} probe inside the for-loop body) lands while the previous + * merge task's segment-lineage commit is still in flight. Polling here mirrors + * {@link #waitForExpectedNumBucketsToProcess} so the post-loop assertion does not flake on the same + * race window. + */ + private void waitForGaugesToExist(String... metricNames) { + TestUtils.waitForCondition(aVoid -> { + ControllerMetrics controllerMetrics = _controllerStarter.getControllerMetrics(); + for (String metricName : metricNames) { + if (!MetricValueUtils.gaugeExists(controllerMetrics, metricName)) { + return false; + } + } + return true; + }, TIMEOUT_IN_MS, "Timeout while waiting for gauges to exist: " + String.join(", ", metricNames)); + } + // The use case is similar as the one defined in offline table @Test public void testRealtimeTableSingleLevelConcat() @@ -1010,8 +1030,7 @@ public void testRealtimeTableSingleLevelConcat() // Check total tasks assertEquals(numTasks, 5); - assertTrue(MetricValueUtils.gaugeExists(_controllerStarter.getControllerMetrics(), - "mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days")); + waitForGaugesToExist("mergeRollupTaskDelayInNumBuckets.myTable5_REALTIME.100days"); // Drop the table dropRealtimeTable(tableName);