From 7d079fafc35f1b9fc3e5242d20333d6fa92905b2 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 20 Mar 2025 14:28:12 +0530 Subject: [PATCH 01/13] Do not use segment metadata cache until leader has synced --- .../SqlSegmentMetadataTransactionFactory.java | 4 +- .../cache/HeapMemorySegmentMetadataCache.java | 47 ++++-------- .../druid/metadata/segment/cache/Metric.java | 2 +- .../cache/NoopSegmentMetadataCache.java | 6 ++ .../segment/cache/SegmentMetadataCache.java | 5 ++ ...exerSQLMetadataStorageCoordinatorTest.java | 73 +++++++++++++++++-- .../HeapMemorySegmentMetadataCacheTest.java | 63 +++++++--------- 7 files changed, 120 insertions(+), 80 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java index 5a6adbdf8fff..f4b7f743c981 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java @@ -81,7 +81,7 @@ public T inReadOnlyDatasourceTransaction( final SegmentMetadataTransaction sqlTransaction = createSqlTransaction(dataSource, handle, status); - if (segmentMetadataCache.isEnabled()) { + if (segmentMetadataCache.isReady()) { final DatasourceSegmentCache datasourceCache = segmentMetadataCache.getDatasource(dataSource); final SegmentMetadataReadTransaction cachedTransaction @@ -108,7 +108,7 @@ public T inReadWriteDatasourceTransaction( final SegmentMetadataTransaction sqlTransaction = createSqlTransaction(dataSource, handle, status); - if (segmentMetadataCache.isEnabled()) { + if (segmentMetadataCache.isReady()) { final DatasourceSegmentCache datasourceCache = segmentMetadataCache.getDatasource(dataSource); final SegmentMetadataTransaction cachedTransaction diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java index 2d177509547b..7e9023116f9c 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java @@ -62,6 +62,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -105,6 +106,8 @@ private enum CacheState private ListenableFuture nextSyncFuture = null; private int consecutiveSyncFailures = 0; + private final AtomicBoolean isCacheReady = new AtomicBoolean(false); + private final ConcurrentHashMap datasourceToSegmentCache = new ConcurrentHashMap<>(); @@ -197,10 +200,16 @@ public boolean isEnabled() return isCacheEnabled; } + @Override + public boolean isReady() + { + return isCacheEnabled && isCacheReady.get(); + } + @Override public DatasourceSegmentCache getDatasource(String dataSource) { - verifyCacheIsUsableAndAwaitSync(); + verifyCacheIsUsable(); emitMetric(dataSource, Metric.TRANSACTION_COUNT, 1); return getCacheForDatasource(dataSource); } @@ -212,11 +221,10 @@ private HeapMemoryDatasourceSegmentCache getCacheForDatasource(String dataSource /** * Verifies that the cache is enabled, started and has become leader. - * Also waits for the cache to be synced with metadata store. * * @throws DruidException if the cache is disabled, stopped or not leader. */ - private void verifyCacheIsUsableAndAwaitSync() + private void verifyCacheIsUsable() { if (!isCacheEnabled) { throw DruidException.defensive("Segment metadata cache is not enabled."); @@ -230,40 +238,13 @@ private void verifyCacheIsUsableAndAwaitSync() throw InternalServerError.exception("Not leader yet. Segment metadata cache is not usable."); case LEADER_FIRST_SYNC_PENDING: case LEADER_FIRST_SYNC_STARTED: - waitForCacheToFinishSync(); - verifyCacheIsUsableAndAwaitSync(); + throw InternalServerError.exception("Segment metadata cache is not synced yet."); case LEADER_READY: // Cache is now ready for use } } } - /** - * Waits for cache to become ready if we are leader and current state is - * {@link CacheState#LEADER_FIRST_SYNC_PENDING} or - * {@link CacheState#LEADER_FIRST_SYNC_STARTED}. - */ - private void waitForCacheToFinishSync() - { - synchronized (cacheStateLock) { - log.info("Waiting for cache to finish sync with metadata store."); - while (currentCacheState == CacheState.LEADER_FIRST_SYNC_PENDING - || currentCacheState == CacheState.LEADER_FIRST_SYNC_STARTED) { - try { - cacheStateLock.wait(READY_TIMEOUT_MILLIS); - } - catch (InterruptedException e) { - log.noStackTrace().info(e, "Interrupted while waiting for cache to be ready"); - } - catch (Exception e) { - log.noStackTrace().error(e, "Error while waiting for cache to be ready"); - throw DruidException.defensive(e, "Error while waiting for cache to be ready"); - } - } - log.info("Wait complete. Cache is now in state[%s].", currentCacheState); - } - } - private void updateCacheState(CacheState targetState, String message) { synchronized (cacheStateLock) { @@ -272,7 +253,9 @@ private void updateCacheState(CacheState targetState, String message) // Notify threads waiting for cache to be ready if (currentCacheState == CacheState.LEADER_READY) { - cacheStateLock.notifyAll(); + isCacheReady.set(true); + } else { + isCacheReady.set(false); } } } diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java index 476b6b1f6aa6..9a3c2aeebf5f 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java @@ -50,7 +50,7 @@ private Metric() /** * Number of transactions performed on the cache for a datasource. */ - public static final String TRANSACTION_COUNT = "transactions"; + public static final String TRANSACTION_COUNT = METRIC_NAME_PREFIX + "transactions"; /** * Time taken in milliseconds for the latest sync with metadata store. diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java index 080464b44605..97370158a9db 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java @@ -51,6 +51,12 @@ public boolean isEnabled() return false; } + @Override + public boolean isReady() + { + return false; + } + @Override public DatasourceSegmentCache getDatasource(String dataSource) { diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java index cff12f54635c..b1ead0b90a92 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java @@ -49,6 +49,11 @@ public interface SegmentMetadataCache */ boolean isEnabled(); + /** + * @return true if the cache is enabled and ready to serve requests. + */ + boolean isReady(); + /** * Returns the cache for the given datasource. */ diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index b6578fa9a721..e501ad24bfd6 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -38,7 +38,7 @@ import org.apache.druid.metadata.segment.SegmentMetadataTransaction; import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory; import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache; -import org.apache.druid.metadata.segment.cache.NoopSegmentMetadataCache; +import org.apache.druid.metadata.segment.cache.Metric; import org.apache.druid.metadata.segment.cache.SegmentMetadataCache; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.TestDataSource; @@ -91,6 +91,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.stream.Collectors; @RunWith(Parameterized.class) @@ -765,13 +766,7 @@ public void testTransactionalAnnounceRetryAndSuccess() throws IOException final AtomicLong attemptCounter = new AtomicLong(); final IndexerSQLMetadataStorageCoordinator failOnceCoordinator = new IndexerSQLMetadataStorageCoordinator( - new SqlSegmentMetadataTransactionFactory( - mapper, - derbyConnectorRule.metadataTablesConfigSupplier().get(), - derbyConnector, - new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() - ), + transactionFactory, mapper, derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnector, @@ -924,6 +919,8 @@ public void testRetrieveSegmentForId() @Test public void testCleanUpgradeSegmentsTableForTask() { + Assume.assumeFalse(useSegmentCache); + final String taskToClean = "taskToClean"; final ReplaceTaskLock replaceLockToClean = new ReplaceTaskLock( taskToClean, @@ -3698,6 +3695,8 @@ public void testRetrieveUpgradedFromSegmentIds() @Test public void testRetrieveUpgradedFromSegmentIdsInBatches() { + Assume.assumeFalse(useSegmentCache); + final int size = 500; final int batchSize = 100; @@ -3917,6 +3916,64 @@ public void testCachedTransaction_cannotReadWhatItWrites() return 0; } ); + + emitter.verifyValue(Metric.TRANSACTION_COUNT, 1L); + } + + @Test + public void testCacheIsUsed_ifReady() + { + Assume.assumeTrue(useSegmentCache); + + Assert.assertTrue(segmentMetadataCache.isReady()); + + insertUsedSegments(Set.of(defaultSegment), Map.of()); + final Supplier> retrieveAction = + () -> coordinator.retrieveAllUsedSegments( + defaultSegment.getDataSource(), + Segments.INCLUDING_OVERSHADOWED + ); + + // Retrieve returns empty since cache is not synced with metadata store yet + Assert.assertTrue(retrieveAction.get().isEmpty()); + + refreshCache(); + Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); + + emitter.verifyEmitted(Metric.TRANSACTION_COUNT, 2); + } + + @Test + public void testCacheIsNotUsed_ifNotReady() + { + Assume.assumeTrue(useSegmentCache); + + segmentMetadataCache.stopBeingLeader(); + Assert.assertFalse(segmentMetadataCache.isReady()); + + final Supplier> retrieveAction = + () -> coordinator.retrieveAllUsedSegments( + defaultSegment.getDataSource(), + Segments.INCLUDING_OVERSHADOWED + ); + + insertUsedSegments(Set.of(defaultSegment), Map.of()); + + Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); + emitter.verifyNotEmitted(Metric.TRANSACTION_COUNT); + + // Become leader but cache will still not be used + segmentMetadataCache.becomeLeader(); + Assert.assertFalse(segmentMetadataCache.isReady()); + Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); + emitter.verifyNotEmitted(Metric.TRANSACTION_COUNT); + + // Sync the cache so that it becomes ready for use + refreshCache(); + refreshCache(); + Assert.assertTrue(segmentMetadataCache.isReady()); + Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); + emitter.verifyValue(Metric.TRANSACTION_COUNT, 1L); } private SegmentIdWithShardSpec allocatePendingSegment( diff --git a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java index 0b6cf606f44a..fe61b1218fe0 100644 --- a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java @@ -50,7 +50,6 @@ import org.junit.Rule; import org.junit.Test; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -230,50 +229,40 @@ public void testGetDataSource_throwsException_ifCacheIsStoppedOrNotLeader() ); } - @Test(timeout = 60_000) - public void testGetDataSource_waitsForOneSync_afterBecomingLeader() throws InterruptedException + @Test + public void testGetDataSource_throwsException_ifCacheIsNotSynced() { setupTargetWithCaching(true); cache.start(); cache.becomeLeader(); - final List observedEventOrder = new ArrayList<>(); - - // Invoke getDatasource in Thread 1 - final Thread getDatasourceThread = new Thread(() -> { - cache.getDatasource(TestDataSource.WIKI); - observedEventOrder.add("getDatasource completed"); - }); - getDatasourceThread.start(); - - // Invoke syncCache in Thread 2 after a wait period - Thread.sleep(100); - final Thread syncCompleteThread = new Thread(() -> { - observedEventOrder.add("before first sync"); - syncCacheAfterBecomingLeader(); - }); - syncCompleteThread.start(); - - // Verify that the getDatasource call finishes only after the first sync - getDatasourceThread.join(); - syncCompleteThread.join(); - Assert.assertEquals( - List.of("before first sync", "getDatasource completed"), - observedEventOrder + DruidExceptionMatcher.internalServerError().expectMessageIs( + "Segment metadata cache is not synced yet." + ).assertThrowsAndMatches( + () -> cache.getDatasource(TestDataSource.WIKI) ); + } - // Verify that subsequent calls to getDatasource do not wait - final Thread getDatasourceThread2 = new Thread(() -> { - cache.getDatasource(TestDataSource.WIKI); - observedEventOrder.add("getDatasource 2 completed"); - }); - getDatasourceThread2.start(); - getDatasourceThread2.join(); + @Test + public void testIsReady_returnsTrue_whenCacheIsLeaderAndSynced() + { + setupTargetWithCaching(true); + Assert.assertFalse(cache.isReady()); - Assert.assertEquals( - List.of("before first sync", "getDatasource completed", "getDatasource 2 completed"), - observedEventOrder - ); + cache.start(); + Assert.assertFalse(cache.isReady()); + + syncCache(); + Assert.assertFalse(cache.isReady()); + + cache.becomeLeader(); + Assert.assertFalse(cache.isReady()); + + syncCache(); + syncCache(); + Assert.assertTrue(cache.isReady()); + + Assert.assertNotNull(cache.getDatasource(TestDataSource.WIKI)); } @Test From 052c61e677c4adf8135149da0980d30cad9f5402 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 21 Mar 2025 12:14:32 +0530 Subject: [PATCH 02/13] Read from cache only when synced, but write even if sync is pending --- .../MaterializedViewSupervisorTest.java | 4 +- .../DatasourceOptimizerTest.java | 2 +- .../common/actions/TaskActionTestKit.java | 6 ++- .../common/task/IngestionTestBase.java | 3 +- .../overlord/TaskLockBoxConcurrencyTest.java | 4 +- .../indexing/overlord/TaskLockboxTest.java | 7 ++- .../indexing/overlord/TaskQueueScaleTest.java | 3 +- .../SeekableStreamIndexTaskTestBase.java | 4 +- .../IndexerSQLMetadataStorageCoordinator.java | 4 +- .../CachedSegmentMetadataTransaction.java | 52 ++++++++++++++----- .../SqlSegmentMetadataTransactionFactory.java | 49 ++++++++++++++--- .../cache/HeapMemorySegmentMetadataCache.java | 23 ++++---- .../druid/metadata/segment/cache/Metric.java | 14 ++++- .../cache/NoopSegmentMetadataCache.java | 9 +++- .../segment/cache/SegmentMetadataCache.java | 6 ++- ...exerSQLMetadataStorageCoordinatorTest.java | 51 ++++++++++++++---- ...orageCoordinatorSchemaPersistenceTest.java | 4 +- .../HeapMemorySegmentMetadataCacheTest.java | 41 +++------------ 18 files changed, 189 insertions(+), 97 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java index 6c39d21c1f59..444eb7cb61b9 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java @@ -56,6 +56,7 @@ import org.apache.druid.segment.metadata.SegmentSchemaManager; import org.apache.druid.segment.realtime.ChatHandlerProvider; import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; @@ -113,7 +114,8 @@ public void setUp() derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), objectMapper, derbyConnectorRule.metadataTablesConfigSupplier().get(), diff --git a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java index 999a38ffd3a0..78a8fb74f8be 100644 --- a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java +++ b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java @@ -120,7 +120,7 @@ public void setUp() throws Exception derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance() ), jsonMapper, derbyConnectorRule.metadataTablesConfigSupplier().get(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index 352b062fe947..3270e66f7086 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -47,6 +47,7 @@ import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.checkerframework.checker.units.qual.N; import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.rules.ExternalResource; @@ -188,7 +189,7 @@ private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMappe Suppliers.ofInstance(metadataStorageTablesConfig), testDerbyConnector, (poolSize, name) -> new WrappingScheduledExecutorService(name, metadataCachePollExec, false), - new NoopServiceEmitter() + NoopServiceEmitter.instance() ); final TestDruidLeaderSelector leaderSelector = new TestDruidLeaderSelector(); @@ -199,7 +200,8 @@ private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMappe metadataStorageTablesConfig, testDerbyConnector, leaderSelector, - segmentMetadataCache + segmentMetadataCache, + NoopServiceEmitter.instance() ) { @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 57a918cd9e50..a66e5605889d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -332,7 +332,8 @@ private SqlSegmentMetadataTransactionFactory createTransactionFactory() derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnectorRule.getConnector(), leaderSelector, - segmentMetadataCache + segmentMetadataCache, + NoopServiceEmitter.instance() ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java index 6436ad3d35ca..a55117aad12d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java @@ -38,6 +38,7 @@ import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import org.apache.druid.segment.metadata.SegmentSchemaManager; import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; @@ -88,7 +89,8 @@ public void setup() derby.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), objectMapper, derby.metadataTablesConfigSupplier().get(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 8e72d21fda81..48c5b5c4e94a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -63,6 +63,7 @@ import org.apache.druid.segment.metadata.SegmentSchemaManager; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.NumberedOverwritePartialShardSpec; @@ -136,7 +137,8 @@ public void setup() tablesConfig, derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), objectMapper, tablesConfig, @@ -477,7 +479,8 @@ public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() derby.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), loadedMapper, derby.metadataTablesConfigSupplier().get(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java index 80026f137269..f54ea7fe5860 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -111,7 +111,8 @@ public void setUp() derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnectorRule.getConnector(), new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), jsonMapper, derbyConnectorRule.metadataTablesConfigSupplier().get(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 214c024b4548..9e82196f8117 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -123,6 +123,7 @@ import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.CompressionUtils; @@ -595,7 +596,8 @@ protected void makeToolboxFactory(TestUtils testUtils, ServiceEmitter emitter, b derby.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), objectMapper, derby.metadataTablesConfigSupplier().get(), diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index ce092f8c6967..9b7590a82ee4 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -175,7 +175,7 @@ private Set doRetrieveUsedSegments( final Segments visibility ) { - return inReadWriteDatasourceTransaction( + return inReadOnlyDatasourceTransaction( dataSource, transaction -> { if (visibility == Segments.ONLY_VISIBLE) { @@ -259,7 +259,7 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv } private SegmentTimeline getTimelineForIntervals( - final SegmentMetadataTransaction transaction, + final SegmentMetadataReadTransaction transaction, final List intervals ) { diff --git a/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java b/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java index da00d2918832..5d3e17d34818 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java @@ -58,21 +58,34 @@ class CachedSegmentMetadataTransaction implements SegmentMetadataTransaction private final DruidLeaderSelector leaderSelector; private final int startTerm; + private final boolean readFromCache; private boolean isRollingBack = false; private boolean isClosed = false; private final List> pendingCacheWrites = new ArrayList<>(); + /** + * Creates a transaction that may read or write from the cache or delegate to + * the metadata store if necessary. Writes are always done to the cache unless + * the update does not affect the cache contents. + * + * @param leaderSelector Used to ensure that leadership does not change during + * the course of the transaction. + * @param readFromCache If true, reads are done from cache. Otherwise, + * reads are done directly from the metadata store. + */ CachedSegmentMetadataTransaction( SegmentMetadataTransaction delegate, DatasourceSegmentCache metadataCache, - DruidLeaderSelector leaderSelector + DruidLeaderSelector leaderSelector, + boolean readFromCache ) { this.delegate = delegate; this.metadataCache = metadataCache; this.leaderSelector = leaderSelector; + this.readFromCache = readFromCache; if (leaderSelector.isLeader()) { this.startTerm = leaderSelector.localTerm(); @@ -142,19 +155,19 @@ public void close() @Override public Set findExistingSegmentIds(Set segments) { - return metadataCache.findExistingSegmentIds(segments); + return performReadAction(reader -> reader.findExistingSegmentIds(segments)); } @Override public Set findUsedSegmentIdsOverlapping(Interval interval) { - return metadataCache.findUsedSegmentIdsOverlapping(interval); + return performReadAction(reader -> reader.findUsedSegmentIdsOverlapping(interval)); } @Override public SegmentId findHighestUnusedSegmentId(Interval interval, String version) { - return metadataCache.findHighestUnusedSegmentId(interval, version); + return performReadAction(reader -> reader.findHighestUnusedSegmentId(interval, version)); } @Override @@ -174,19 +187,19 @@ public List findSegmentsWithSchema(Set segmentIds) @Override public Set findUsedSegmentsOverlappingAnyOf(List intervals) { - return metadataCache.findUsedSegmentsOverlappingAnyOf(intervals); + return performReadAction(reader -> reader.findUsedSegmentsOverlappingAnyOf(intervals)); } @Override public List findUsedSegments(Set segmentIds) { - return metadataCache.findUsedSegments(segmentIds); + return performReadAction(reader -> reader.findUsedSegments(segmentIds)); } @Override public Set findUsedSegmentsPlusOverlappingAnyOf(List intervals) { - return metadataCache.findUsedSegmentsPlusOverlappingAnyOf(intervals); + return performReadAction(reader -> reader.findUsedSegmentsPlusOverlappingAnyOf(intervals)); } @Override @@ -211,7 +224,7 @@ public DataSegment findSegment(SegmentId segmentId) @Override public DataSegment findUsedSegment(SegmentId segmentId) { - return metadataCache.findUsedSegment(segmentId); + return performReadAction(reader -> reader.findUsedSegment(segmentId)); } @Override @@ -220,7 +233,7 @@ public List findPendingSegmentIds( String sequencePreviousId ) { - return metadataCache.findPendingSegmentIds(sequenceName, sequencePreviousId); + return performReadAction(reader -> reader.findPendingSegmentIds(sequenceName, sequencePreviousId)); } @Override @@ -229,25 +242,25 @@ public List findPendingSegmentIdsWithExactInterval( Interval interval ) { - return metadataCache.findPendingSegmentIdsWithExactInterval(sequenceName, interval); + return performReadAction(reader -> reader.findPendingSegmentIdsWithExactInterval(sequenceName, interval)); } @Override public List findPendingSegmentsOverlapping(Interval interval) { - return metadataCache.findPendingSegmentsOverlapping(interval); + return performReadAction(reader -> reader.findPendingSegmentsOverlapping(interval)); } @Override public List findPendingSegmentsWithExactInterval(Interval interval) { - return metadataCache.findPendingSegmentsWithExactInterval(interval); + return performReadAction(reader -> reader.findPendingSegmentsWithExactInterval(interval)); } @Override public List findPendingSegments(String taskAllocatorId) { - return metadataCache.findPendingSegments(taskAllocatorId); + return performReadAction(reader -> reader.findPendingSegments(taskAllocatorId)); } // WRITE METHODS @@ -339,6 +352,19 @@ public int deletePendingSegmentsCreatedIn(Interval interval) ); } + /** + * Performs a read from cache only if {@link #readFromCache} is true. + * Otherwise, reads directly from the metadata store. + */ + private T performReadAction(Function action) + { + if (readFromCache) { + return action.apply(metadataCache); + } else { + return action.apply(delegate); + } + } + private T performWriteAction(Function action) { if (isClosed) { diff --git a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java index f4b7f743c981..1814fd106220 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java @@ -23,10 +23,15 @@ import com.google.inject.Inject; import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.discovery.DruidLeaderSelector; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SQLMetadataConnector; import org.apache.druid.metadata.segment.cache.DatasourceSegmentCache; +import org.apache.druid.metadata.segment.cache.Metric; import org.apache.druid.metadata.segment.cache.SegmentMetadataCache; +import org.apache.druid.query.DruidMetrics; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.TransactionStatus; @@ -40,6 +45,8 @@ */ public class SqlSegmentMetadataTransactionFactory implements SegmentMetadataTransactionFactory { + private static final Logger log = new Logger(SqlSegmentMetadataTransactionFactory.class); + private static final int QUIET_RETRIES = 2; private static final int MAX_RETRIES = 3; @@ -48,6 +55,7 @@ public class SqlSegmentMetadataTransactionFactory implements SegmentMetadataTran private final SQLMetadataConnector connector; private final DruidLeaderSelector leaderSelector; private final SegmentMetadataCache segmentMetadataCache; + private final ServiceEmitter emitter; @Inject public SqlSegmentMetadataTransactionFactory( @@ -55,7 +63,8 @@ public SqlSegmentMetadataTransactionFactory( MetadataStorageTablesConfig tablesConfig, SQLMetadataConnector connector, @IndexingService DruidLeaderSelector leaderSelector, - SegmentMetadataCache segmentMetadataCache + SegmentMetadataCache segmentMetadataCache, + ServiceEmitter emitter ) { this.jsonMapper = jsonMapper; @@ -63,6 +72,7 @@ public SqlSegmentMetadataTransactionFactory( this.connector = connector; this.leaderSelector = leaderSelector; this.segmentMetadataCache = segmentMetadataCache; + this.emitter = emitter; } public int getMaxRetries() @@ -81,12 +91,14 @@ public T inReadOnlyDatasourceTransaction( final SegmentMetadataTransaction sqlTransaction = createSqlTransaction(dataSource, handle, status); - if (segmentMetadataCache.isReady()) { + // For read-only transactions, use cache only if it is already synced + if (segmentMetadataCache.isSyncedForRead()) { final DatasourceSegmentCache datasourceCache = segmentMetadataCache.getDatasource(dataSource); final SegmentMetadataReadTransaction cachedTransaction - = new CachedSegmentMetadataTransaction(sqlTransaction, datasourceCache, leaderSelector); + = new CachedSegmentMetadataTransaction(sqlTransaction, datasourceCache, leaderSelector, true); + emitTransactionCount(Metric.READ_ONLY_TRANSACTIONS, dataSource); return datasourceCache.read(() -> executeReadAndClose(cachedTransaction, callback)); } else { return executeReadAndClose(createSqlTransaction(dataSource, handle, status), callback); @@ -108,11 +120,27 @@ public T inReadWriteDatasourceTransaction( final SegmentMetadataTransaction sqlTransaction = createSqlTransaction(dataSource, handle, status); - if (segmentMetadataCache.isReady()) { + if (segmentMetadataCache.isEnabled()) { + final boolean isCacheReadyForRead = segmentMetadataCache.isSyncedForRead(); final DatasourceSegmentCache datasourceCache = segmentMetadataCache.getDatasource(dataSource); - final SegmentMetadataTransaction cachedTransaction - = new CachedSegmentMetadataTransaction(sqlTransaction, datasourceCache, leaderSelector); + final SegmentMetadataTransaction cachedTransaction = new CachedSegmentMetadataTransaction( + sqlTransaction, + datasourceCache, + leaderSelector, + isCacheReadyForRead + ); + + if (isCacheReadyForRead) { + emitTransactionCount(Metric.READ_WRITE_TRANSACTIONS, dataSource); + } else { + log.warn( + "Starting read-write transaction for datasource[%s]. Reads will be done" + + " directly from metadata store since cache is not synced yet.", + dataSource + ); + emitTransactionCount(Metric.WRITE_ONLY_TRANSACTIONS, dataSource); + } return datasourceCache.write(() -> executeWriteAndClose(cachedTransaction, callback)); } else { @@ -163,4 +191,13 @@ private T executeReadAndClose( } } + private void emitTransactionCount(String metricName, String datasource) + { + emitter.emit( + ServiceMetricEvent.builder() + .setDimension(DruidMetrics.DATASOURCE, datasource) + .setMetric(metricName, 1L) + ); + } + } diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java index 7e9023116f9c..b5a0282f1540 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java @@ -68,6 +68,11 @@ /** * In-memory implementation of {@link SegmentMetadataCache}. + *

+ * Reads can be done from the cache only after it has synced with the metadata store. + * Writes can happen even when the sync is in progress. This is because the + * {@link #syncWithMetadataStore()} is able to reconcile differences based on + * the update time of a transaction. */ @ThreadSafe public class HeapMemorySegmentMetadataCache implements SegmentMetadataCache @@ -77,10 +82,6 @@ public class HeapMemorySegmentMetadataCache implements SegmentMetadataCache private static final int SQL_MAX_RETRIES = 3; private static final int SQL_QUIET_RETRIES = 2; - /** - * Maximum time to wait for cache to be ready. - */ - private static final int READY_TIMEOUT_MILLIS = 5 * 60_000; private static final int MIN_SYNC_DELAY_MILLIS = 1000; private static final int MAX_IMMEDIATE_SYNC_RETRIES = 3; @@ -201,7 +202,7 @@ public boolean isEnabled() } @Override - public boolean isReady() + public boolean isSyncedForRead() { return isCacheEnabled && isCacheReady.get(); } @@ -210,7 +211,6 @@ public boolean isReady() public DatasourceSegmentCache getDatasource(String dataSource) { verifyCacheIsUsable(); - emitMetric(dataSource, Metric.TRANSACTION_COUNT, 1); return getCacheForDatasource(dataSource); } @@ -238,9 +238,9 @@ private void verifyCacheIsUsable() throw InternalServerError.exception("Not leader yet. Segment metadata cache is not usable."); case LEADER_FIRST_SYNC_PENDING: case LEADER_FIRST_SYNC_STARTED: - throw InternalServerError.exception("Segment metadata cache is not synced yet."); + // Cache is now ready for writes case LEADER_READY: - // Cache is now ready for use + // Cache is now ready for both reads and writes } } } @@ -251,12 +251,7 @@ private void updateCacheState(CacheState targetState, String message) currentCacheState = targetState; log.info("%s. Cache is now in state[%s].", message, currentCacheState); - // Notify threads waiting for cache to be ready - if (currentCacheState == CacheState.LEADER_READY) { - isCacheReady.set(true); - } else { - isCacheReady.set(false); - } + isCacheReady.set(currentCacheState == CacheState.LEADER_READY); } } diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java index 9a3c2aeebf5f..19e51317a915 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java @@ -48,9 +48,19 @@ private Metric() private static final String METRIC_NAME_PREFIX = "segment/metadataCache/"; /** - * Number of transactions performed on the cache for a datasource. + * Number of read-write transactions performed on the cache for a datasource. */ - public static final String TRANSACTION_COUNT = METRIC_NAME_PREFIX + "transactions"; + public static final String READ_WRITE_TRANSACTIONS = METRIC_NAME_PREFIX + "transactions/readWrite"; + + /** + * Number of write-only transactions performed on the cache for a datasource. + */ + public static final String WRITE_ONLY_TRANSACTIONS = METRIC_NAME_PREFIX + "transactions/writeOnly"; + + /** + * Number of read-only transactions performed on the cache for a datasource. + */ + public static final String READ_ONLY_TRANSACTIONS = METRIC_NAME_PREFIX + "transactions/readOnly"; /** * Time taken in milliseconds for the latest sync with metadata store. diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java index 97370158a9db..88e2cf05e3b6 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java @@ -21,6 +21,13 @@ public class NoopSegmentMetadataCache implements SegmentMetadataCache { + private static final NoopSegmentMetadataCache INSTANCE = new NoopSegmentMetadataCache(); + + public static NoopSegmentMetadataCache instance() + { + return INSTANCE; + } + @Override public void start() { @@ -52,7 +59,7 @@ public boolean isEnabled() } @Override - public boolean isReady() + public boolean isSyncedForRead() { return false; } diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java index b1ead0b90a92..94073434274b 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java @@ -50,9 +50,11 @@ public interface SegmentMetadataCache boolean isEnabled(); /** - * @return true if the cache is enabled and ready to serve requests. + * @return true if the cache is enabled and synced with the metadata store. + * Reads can be done from the cache only if it is synced but writes can happen + * even before the sync has finished. */ - boolean isReady(); + boolean isSyncedForRead(); /** * Returns the cache for the given datasource. diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index e501ad24bfd6..f3dcd1bf6359 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -168,7 +168,8 @@ public void setUp() derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnector, leaderSelector, - segmentMetadataCache + segmentMetadataCache, + emitter ) { @Override @@ -3917,7 +3918,7 @@ public void testCachedTransaction_cannotReadWhatItWrites() } ); - emitter.verifyValue(Metric.TRANSACTION_COUNT, 1L); + emitter.verifyValue(Metric.READ_WRITE_TRANSACTIONS, 1L); } @Test @@ -3925,7 +3926,7 @@ public void testCacheIsUsed_ifReady() { Assume.assumeTrue(useSegmentCache); - Assert.assertTrue(segmentMetadataCache.isReady()); + Assert.assertTrue(segmentMetadataCache.isSyncedForRead()); insertUsedSegments(Set.of(defaultSegment), Map.of()); final Supplier> retrieveAction = @@ -3940,16 +3941,16 @@ public void testCacheIsUsed_ifReady() refreshCache(); Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); - emitter.verifyEmitted(Metric.TRANSACTION_COUNT, 2); + emitter.verifyEmitted(Metric.READ_WRITE_TRANSACTIONS, 2); } @Test - public void testCacheIsNotUsed_ifNotReady() + public void testReadOperation_doesNotUseCache_ifNotSynced() { Assume.assumeTrue(useSegmentCache); segmentMetadataCache.stopBeingLeader(); - Assert.assertFalse(segmentMetadataCache.isReady()); + Assert.assertFalse(segmentMetadataCache.isSyncedForRead()); final Supplier> retrieveAction = () -> coordinator.retrieveAllUsedSegments( @@ -3960,20 +3961,48 @@ public void testCacheIsNotUsed_ifNotReady() insertUsedSegments(Set.of(defaultSegment), Map.of()); Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); - emitter.verifyNotEmitted(Metric.TRANSACTION_COUNT); + emitter.verifyNotEmitted(Metric.READ_ONLY_TRANSACTIONS); // Become leader but cache will still not be used segmentMetadataCache.becomeLeader(); - Assert.assertFalse(segmentMetadataCache.isReady()); + Assert.assertFalse(segmentMetadataCache.isSyncedForRead()); Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); - emitter.verifyNotEmitted(Metric.TRANSACTION_COUNT); + emitter.verifyNotEmitted(Metric.READ_ONLY_TRANSACTIONS); // Sync the cache so that it becomes ready for use refreshCache(); refreshCache(); - Assert.assertTrue(segmentMetadataCache.isReady()); + Assert.assertTrue(segmentMetadataCache.isSyncedForRead()); Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); - emitter.verifyValue(Metric.TRANSACTION_COUNT, 1L); + emitter.verifyValue(Metric.READ_ONLY_TRANSACTIONS, 1L); + } + + @Test + public void testWriteOperation_usesCache_ifNotSynced() + { + Assume.assumeTrue(useSegmentCache); + + // Lose and regain leadership + segmentMetadataCache.stopBeingLeader(); + segmentMetadataCache.becomeLeader(); + + Assert.assertTrue(segmentMetadataCache.isEnabled()); + Assert.assertFalse(segmentMetadataCache.isSyncedForRead()); + + final Supplier> writeAction = + () -> coordinator.commitSegments(Set.of(defaultSegment), null); + + // Cache is not synced yet and will be used only for write operations + Assert.assertEquals(Set.of(defaultSegment), writeAction.get()); + emitter.verifyValue(Metric.WRITE_ONLY_TRANSACTIONS, 1L); + + // Sync the cache to use it for both read and write operations + refreshCache(); + refreshCache(); + Assert.assertTrue(segmentMetadataCache.isSyncedForRead()); + + Assert.assertTrue(writeAction.get().isEmpty()); + emitter.verifyValue(Metric.READ_WRITE_TRANSACTIONS, 1L); } private SegmentIdWithShardSpec allocatePendingSegment( diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java index 9bed46b2c3a9..19dae6c85f0e 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java @@ -41,6 +41,7 @@ import org.apache.druid.segment.metadata.SegmentSchemaManager; import org.apache.druid.segment.metadata.SegmentSchemaTestUtils; import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.LinearShardSpec; @@ -96,7 +97,8 @@ public void setUp() derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ); coordinator = new IndexerSQLMetadataStorageCoordinator( transactionFactory, diff --git a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java index fe61b1218fe0..3c7039fe5de7 100644 --- a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java @@ -32,7 +32,6 @@ import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.metadata.SegmentsMetadataManagerConfig; import org.apache.druid.metadata.TestDerbyConnector; -import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.TestDataSource; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -51,7 +50,6 @@ import org.junit.Test; import java.util.List; -import java.util.Map; import java.util.Set; public class HeapMemorySegmentMetadataCacheTest @@ -230,37 +228,23 @@ public void testGetDataSource_throwsException_ifCacheIsStoppedOrNotLeader() } @Test - public void testGetDataSource_throwsException_ifCacheIsNotSynced() + public void testIsSyncedForRead_returnsTrue_whenCacheIsLeaderAndSynced() { setupTargetWithCaching(true); - cache.start(); - cache.becomeLeader(); - - DruidExceptionMatcher.internalServerError().expectMessageIs( - "Segment metadata cache is not synced yet." - ).assertThrowsAndMatches( - () -> cache.getDatasource(TestDataSource.WIKI) - ); - } - - @Test - public void testIsReady_returnsTrue_whenCacheIsLeaderAndSynced() - { - setupTargetWithCaching(true); - Assert.assertFalse(cache.isReady()); + Assert.assertFalse(cache.isSyncedForRead()); cache.start(); - Assert.assertFalse(cache.isReady()); + Assert.assertFalse(cache.isSyncedForRead()); syncCache(); - Assert.assertFalse(cache.isReady()); + Assert.assertFalse(cache.isSyncedForRead()); cache.becomeLeader(); - Assert.assertFalse(cache.isReady()); + Assert.assertFalse(cache.isSyncedForRead()); syncCache(); syncCache(); - Assert.assertTrue(cache.isReady()); + Assert.assertTrue(cache.isSyncedForRead()); Assert.assertNotNull(cache.getDatasource(TestDataSource.WIKI)); } @@ -620,19 +604,6 @@ public void testSync_removesPendingSegment_ifNotPresentInMetadataStore() ); } - @Test - public void testGetDatasource_increasesTransactionCount() - { - setupAndSyncCache(); - cache.getDatasource(TestDataSource.WIKI); - cache.getDatasource(TestDataSource.WIKI); - serviceEmitter.verifyEmitted( - Metric.TRANSACTION_COUNT, - Map.of(DruidMetrics.DATASOURCE, TestDataSource.WIKI), - 2 - ); - } - private void insertSegmentsInMetadataStore(Set segments) { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); From e7075f1afd20e8fe3dc4957c10867698ae02fbcb Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 21 Mar 2025 12:29:37 +0530 Subject: [PATCH 03/13] Fix compilation --- .../druid/query/materializedview/DatasourceOptimizerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java index 78a8fb74f8be..606c6a529587 100644 --- a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java +++ b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java @@ -120,7 +120,8 @@ public void setUp() throws Exception derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - NoopSegmentMetadataCache.instance() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), jsonMapper, derbyConnectorRule.metadataTablesConfigSupplier().get(), From 4bc587f73a04164e0c1ecb94368d4496c955fa3f Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 21 Mar 2025 14:30:34 +0530 Subject: [PATCH 04/13] Fix checkstyle, test --- .../apache/druid/indexing/common/actions/TaskActionTestKit.java | 1 - .../metadata/IndexerSQLMetadataStorageCoordinatorTest.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index 3270e66f7086..05e5895520fc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -47,7 +47,6 @@ import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; import org.apache.druid.server.metrics.NoopServiceEmitter; -import org.checkerframework.checker.units.qual.N; import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.rules.ExternalResource; diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index f3dcd1bf6359..6f407680c130 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -3941,7 +3941,7 @@ public void testCacheIsUsed_ifReady() refreshCache(); Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); - emitter.verifyEmitted(Metric.READ_WRITE_TRANSACTIONS, 2); + emitter.verifyEmitted(Metric.READ_ONLY_TRANSACTIONS, 2); } @Test From 33a12312c2280aeb0cf7d63605c83bb8c911e427 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 27 Mar 2025 09:28:04 +0530 Subject: [PATCH 05/13] Revert some extra changes --- .../cache/HeapMemorySegmentMetadataCache.java | 60 ++++++++++------ .../HeapMemorySegmentMetadataCacheTest.java | 68 +++++++++++++++---- 2 files changed, 95 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java index b5a0282f1540..2d177509547b 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java @@ -62,17 +62,11 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; /** * In-memory implementation of {@link SegmentMetadataCache}. - *

- * Reads can be done from the cache only after it has synced with the metadata store. - * Writes can happen even when the sync is in progress. This is because the - * {@link #syncWithMetadataStore()} is able to reconcile differences based on - * the update time of a transaction. */ @ThreadSafe public class HeapMemorySegmentMetadataCache implements SegmentMetadataCache @@ -82,6 +76,10 @@ public class HeapMemorySegmentMetadataCache implements SegmentMetadataCache private static final int SQL_MAX_RETRIES = 3; private static final int SQL_QUIET_RETRIES = 2; + /** + * Maximum time to wait for cache to be ready. + */ + private static final int READY_TIMEOUT_MILLIS = 5 * 60_000; private static final int MIN_SYNC_DELAY_MILLIS = 1000; private static final int MAX_IMMEDIATE_SYNC_RETRIES = 3; @@ -107,8 +105,6 @@ private enum CacheState private ListenableFuture nextSyncFuture = null; private int consecutiveSyncFailures = 0; - private final AtomicBoolean isCacheReady = new AtomicBoolean(false); - private final ConcurrentHashMap datasourceToSegmentCache = new ConcurrentHashMap<>(); @@ -201,16 +197,11 @@ public boolean isEnabled() return isCacheEnabled; } - @Override - public boolean isSyncedForRead() - { - return isCacheEnabled && isCacheReady.get(); - } - @Override public DatasourceSegmentCache getDatasource(String dataSource) { - verifyCacheIsUsable(); + verifyCacheIsUsableAndAwaitSync(); + emitMetric(dataSource, Metric.TRANSACTION_COUNT, 1); return getCacheForDatasource(dataSource); } @@ -221,10 +212,11 @@ private HeapMemoryDatasourceSegmentCache getCacheForDatasource(String dataSource /** * Verifies that the cache is enabled, started and has become leader. + * Also waits for the cache to be synced with metadata store. * * @throws DruidException if the cache is disabled, stopped or not leader. */ - private void verifyCacheIsUsable() + private void verifyCacheIsUsableAndAwaitSync() { if (!isCacheEnabled) { throw DruidException.defensive("Segment metadata cache is not enabled."); @@ -238,20 +230,50 @@ private void verifyCacheIsUsable() throw InternalServerError.exception("Not leader yet. Segment metadata cache is not usable."); case LEADER_FIRST_SYNC_PENDING: case LEADER_FIRST_SYNC_STARTED: - // Cache is now ready for writes + waitForCacheToFinishSync(); + verifyCacheIsUsableAndAwaitSync(); case LEADER_READY: - // Cache is now ready for both reads and writes + // Cache is now ready for use } } } + /** + * Waits for cache to become ready if we are leader and current state is + * {@link CacheState#LEADER_FIRST_SYNC_PENDING} or + * {@link CacheState#LEADER_FIRST_SYNC_STARTED}. + */ + private void waitForCacheToFinishSync() + { + synchronized (cacheStateLock) { + log.info("Waiting for cache to finish sync with metadata store."); + while (currentCacheState == CacheState.LEADER_FIRST_SYNC_PENDING + || currentCacheState == CacheState.LEADER_FIRST_SYNC_STARTED) { + try { + cacheStateLock.wait(READY_TIMEOUT_MILLIS); + } + catch (InterruptedException e) { + log.noStackTrace().info(e, "Interrupted while waiting for cache to be ready"); + } + catch (Exception e) { + log.noStackTrace().error(e, "Error while waiting for cache to be ready"); + throw DruidException.defensive(e, "Error while waiting for cache to be ready"); + } + } + log.info("Wait complete. Cache is now in state[%s].", currentCacheState); + } + } + private void updateCacheState(CacheState targetState, String message) { synchronized (cacheStateLock) { currentCacheState = targetState; log.info("%s. Cache is now in state[%s].", message, currentCacheState); - isCacheReady.set(currentCacheState == CacheState.LEADER_READY); + // Notify threads waiting for cache to be ready + if (currentCacheState == CacheState.LEADER_READY) { + cacheStateLock.notifyAll(); + } } } diff --git a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java index 3c7039fe5de7..0b6cf606f44a 100644 --- a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java @@ -32,6 +32,7 @@ import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.metadata.SegmentsMetadataManagerConfig; import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.TestDataSource; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -49,7 +50,9 @@ import org.junit.Rule; import org.junit.Test; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; public class HeapMemorySegmentMetadataCacheTest @@ -227,26 +230,50 @@ public void testGetDataSource_throwsException_ifCacheIsStoppedOrNotLeader() ); } - @Test - public void testIsSyncedForRead_returnsTrue_whenCacheIsLeaderAndSynced() + @Test(timeout = 60_000) + public void testGetDataSource_waitsForOneSync_afterBecomingLeader() throws InterruptedException { setupTargetWithCaching(true); - Assert.assertFalse(cache.isSyncedForRead()); - cache.start(); - Assert.assertFalse(cache.isSyncedForRead()); - - syncCache(); - Assert.assertFalse(cache.isSyncedForRead()); - cache.becomeLeader(); - Assert.assertFalse(cache.isSyncedForRead()); - syncCache(); - syncCache(); - Assert.assertTrue(cache.isSyncedForRead()); + final List observedEventOrder = new ArrayList<>(); + + // Invoke getDatasource in Thread 1 + final Thread getDatasourceThread = new Thread(() -> { + cache.getDatasource(TestDataSource.WIKI); + observedEventOrder.add("getDatasource completed"); + }); + getDatasourceThread.start(); + + // Invoke syncCache in Thread 2 after a wait period + Thread.sleep(100); + final Thread syncCompleteThread = new Thread(() -> { + observedEventOrder.add("before first sync"); + syncCacheAfterBecomingLeader(); + }); + syncCompleteThread.start(); + + // Verify that the getDatasource call finishes only after the first sync + getDatasourceThread.join(); + syncCompleteThread.join(); + Assert.assertEquals( + List.of("before first sync", "getDatasource completed"), + observedEventOrder + ); - Assert.assertNotNull(cache.getDatasource(TestDataSource.WIKI)); + // Verify that subsequent calls to getDatasource do not wait + final Thread getDatasourceThread2 = new Thread(() -> { + cache.getDatasource(TestDataSource.WIKI); + observedEventOrder.add("getDatasource 2 completed"); + }); + getDatasourceThread2.start(); + getDatasourceThread2.join(); + + Assert.assertEquals( + List.of("before first sync", "getDatasource completed", "getDatasource 2 completed"), + observedEventOrder + ); } @Test @@ -604,6 +631,19 @@ public void testSync_removesPendingSegment_ifNotPresentInMetadataStore() ); } + @Test + public void testGetDatasource_increasesTransactionCount() + { + setupAndSyncCache(); + cache.getDatasource(TestDataSource.WIKI); + cache.getDatasource(TestDataSource.WIKI); + serviceEmitter.verifyEmitted( + Metric.TRANSACTION_COUNT, + Map.of(DruidMetrics.DATASOURCE, TestDataSource.WIKI), + 2 + ); + } + private void insertSegmentsInMetadataStore(Set segments) { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); From bfbca0998123948276cf4fa3210002c67fd4d4f9 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 27 Mar 2025 14:03:46 +0530 Subject: [PATCH 06/13] Add 3 modes of cache usage --- .../actions/RetrieveSegmentsActionsTest.java | 1 - .../common/actions/TaskActionTestKit.java | 10 ++- .../common/task/IngestionTestBase.java | 6 +- .../SegmentsMetadataManagerConfig.java | 34 ++++++++-- .../cache/HeapMemorySegmentMetadataCache.java | 67 ++++++++++++++----- ...exerSQLMetadataStorageCoordinatorTest.java | 7 +- ...SegmentsMetadataManagerSchemaPollTest.java | 6 +- .../SqlSegmentsMetadataManagerTest.java | 4 +- .../HeapMemorySegmentMetadataCacheTest.java | 44 +++++------- .../duty/KillUnusedSegmentsTest.java | 2 +- 10 files changed, 118 insertions(+), 63 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index 5800e3b1ec15..8ef97352921f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -99,7 +99,6 @@ private static DataSegment createSegment(Interval interval, String version) @Test public void testRetrieveUsedSegmentsAction() { - actionTestKit.syncSegmentMetadataCache(); final RetrieveUsedSegmentsAction action = new RetrieveUsedSegmentsAction(task.getDataSource(), ImmutableList.of(INTERVAL)); final Set observedUsedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index 05e5895520fc..7c55f4f9fdc5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -66,7 +66,7 @@ public class TaskActionTestKit extends ExternalResource private SegmentMetadataCache segmentMetadataCache; private BlockingExecutorService metadataCachePollExec; - private boolean useSegmentMetadataCache = new SegmentsMetadataManagerConfig(null, null).isUseCache(); + private boolean useSegmentMetadataCache = false; private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO(); public TaskLockbox getTaskLockbox() @@ -182,9 +182,13 @@ public boolean isBatchAllocationReduceMetadataIO() private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMapper objectMapper) { metadataCachePollExec = new BlockingExecutorService("test-cache-poll-exec"); + SegmentsMetadataManagerConfig.UseCache cacheUsage + = useSegmentMetadataCache + ? SegmentsMetadataManagerConfig.UseCache.ALWAYS + : SegmentsMetadataManagerConfig.UseCache.NEVER; segmentMetadataCache = new HeapMemorySegmentMetadataCache( objectMapper, - Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), useSegmentMetadataCache)), + Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheUsage)), Suppliers.ofInstance(metadataStorageTablesConfig), testDerbyConnector, (poolSize, name) -> new WrappingScheduledExecutorService(name, metadataCachePollExec, false), @@ -223,6 +227,6 @@ public void after() taskActionToolbox = null; segmentMetadataCache.stopBeingLeader(); segmentMetadataCache.stop(); - useSegmentMetadataCache = new SegmentsMetadataManagerConfig(null, null).isUseCache(); + useSegmentMetadataCache = false; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index a66e5605889d..b509c9183306 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -315,9 +315,13 @@ public TaskToolbox createTaskToolbox(TaskConfig config, Task task, SupervisorMan private SqlSegmentMetadataTransactionFactory createTransactionFactory() { + final SegmentsMetadataManagerConfig.UseCache cacheUsage + = useSegmentMetadataCache + ? SegmentsMetadataManagerConfig.UseCache.ALWAYS + : SegmentsMetadataManagerConfig.UseCache.NEVER; segmentMetadataCache = new HeapMemorySegmentMetadataCache( objectMapper, - Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), useSegmentMetadataCache)), + Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), cacheUsage)), derbyConnectorRule.metadataTablesConfigSupplier(), derbyConnectorRule.getConnector(), ScheduledExecutors::fixed, diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java index fc65de4af532..6075c1c9319b 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java @@ -32,23 +32,49 @@ public class SegmentsMetadataManagerConfig { public static final String CONFIG_PREFIX = "druid.manager.segments"; + /** + * Cache usage modes. + */ + public enum UseCache + { + /** + * Always read from the cache. Service start-up may be blocked until cache + * has synced with the metadata store at least once. Transactions may block + * until cache has synced with the metadata store at least once after + * becoming leader. + */ + ALWAYS, + + /** + * Cache is disabled. + */ + NEVER, + + /** + * Read from the cache only if it is already synced with the metadata store. + * Does not block service start-up or transactions. Writes may still go to + * cache to reduce sync times. + */ + IF_SYNCED + } + @JsonProperty private final Period pollDuration; @JsonProperty - private final boolean useCache; + private final UseCache useCache; @JsonCreator public SegmentsMetadataManagerConfig( @JsonProperty("pollDuration") Period pollDuration, - @JsonProperty("useCache") Boolean useCache + @JsonProperty("useCache") UseCache useCache ) { this.pollDuration = Configs.valueOrDefault(pollDuration, Period.minutes(1)); - this.useCache = Configs.valueOrDefault(useCache, false); + this.useCache = Configs.valueOrDefault(useCache, UseCache.NEVER); } - public boolean isUseCache() + public UseCache getUseCache() { return useCache; } diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java index 0ba5d8c8837a..94344d1f428a 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java @@ -77,6 +77,8 @@ * Non-leader Overlords also keep polling the metadata store to keep the cache * up-to-date in case leadership changes. *

+ * Cache usage modes: {@link SegmentsMetadataManagerConfig.UseCache}: + *

* The map {@link #datasourceToSegmentCache} contains the cache for each datasource. * Items are only added to this map and never removed. This is to avoid handling * race conditions where a thread has invoked {@link #getDatasource} but hasn't @@ -106,7 +108,7 @@ private enum CacheState private final ObjectMapper jsonMapper; private final Duration pollDuration; - private final boolean isCacheEnabled; + private final SegmentsMetadataManagerConfig.UseCache useCache; private final MetadataStorageTablesConfig tablesConfig; private final SQLMetadataConnector connector; @@ -114,6 +116,12 @@ private enum CacheState private final ServiceEmitter emitter; private final Object cacheStateLock = new Object(); + + /** + * Denotes that the cache is in state {@link CacheState#LEADER_READY}. + * Maintained as a separate variable to avoid acquiring the {@link #cacheStateLock} + * whenever {@link #isSyncedForRead()} is checked in a transaction. + */ private final AtomicBoolean isCacheReady = new AtomicBoolean(false); @GuardedBy("cacheStateLock") @@ -139,11 +147,11 @@ public HeapMemorySegmentMetadataCache( ) { this.jsonMapper = jsonMapper; - this.isCacheEnabled = config.get().isUseCache(); + this.useCache = config.get().getUseCache(); this.pollDuration = config.get().getPollDuration().toStandardDuration(); this.tablesConfig = tablesConfig.get(); this.connector = connector; - this.pollExecutor = isCacheEnabled + this.pollExecutor = isEnabled() ? MoreExecutors.listeningDecorator(executorFactory.create(1, "SegmentMetadataCache-%s")) : null; this.emitter = emitter; @@ -153,7 +161,7 @@ public HeapMemorySegmentMetadataCache( @Override public void start() { - if (!isCacheEnabled) { + if (!isEnabled()) { log.info("Segment metadata cache is not enabled."); return; } @@ -161,8 +169,28 @@ public void start() synchronized (cacheStateLock) { if (currentCacheState == CacheState.STOPPED) { updateCacheState(CacheState.FOLLOWER, "Scheduling sync with metadata store"); - scheduleSyncWithMetadataStore(pollDuration.getMillis()); } + + if (useCache == SegmentsMetadataManagerConfig.UseCache.ALWAYS) { + // Cache must always be used, do not finish startup until cache has synced + performFirstSync(); + } + + scheduleSyncWithMetadataStore(pollDuration.getMillis()); + } + } + + private void performFirstSync() + { + try { + log.info("Cache is in usage mode[%s]. Starting first sync with metadata store.", useCache); + + final long syncDurationMillis = syncWithMetadataStore(); + emitMetric(Metric.SYNC_DURATION_MILLIS, syncDurationMillis); + log.info("Finished first sync of cache with metadata store in [%d] millis.", syncDurationMillis); + } + catch (Throwable t) { + throw InternalServerError.exception(t, "Could not sync segment metadata cache with metadata store"); } } @@ -170,7 +198,7 @@ public void start() public void stop() { synchronized (cacheStateLock) { - if (isCacheEnabled) { + if (isEnabled()) { pollExecutor.shutdownNow(); datasourceToSegmentCache.forEach((datasource, cache) -> cache.stop()); datasourceToSegmentCache.clear(); @@ -185,7 +213,7 @@ public void stop() public void becomeLeader() { synchronized (cacheStateLock) { - if (isCacheEnabled) { + if (isEnabled()) { if (currentCacheState == CacheState.STOPPED) { throw DruidException.defensive("Cache has not been started yet"); } else { @@ -204,7 +232,7 @@ public void becomeLeader() public void stopBeingLeader() { synchronized (cacheStateLock) { - if (isCacheEnabled) { + if (isEnabled()) { updateCacheState(CacheState.FOLLOWER, "Not leader anymore"); } } @@ -213,13 +241,13 @@ public void stopBeingLeader() @Override public boolean isEnabled() { - return isCacheEnabled; + return useCache != SegmentsMetadataManagerConfig.UseCache.NEVER; } @Override public boolean isSyncedForRead() { - return isCacheEnabled && isCacheReady.get(); + return isEnabled() && isCacheReady.get(); } @Override @@ -236,13 +264,14 @@ private HeapMemoryDatasourceSegmentCache getCacheForDatasource(String dataSource /** * Verifies that the cache is enabled, started and has become leader. - * Also waits for the cache to be synced with metadata store. + * Also waits for the cache to be synced with the metadata store after becoming + * leader if {@link #useCache} is set to {@link SegmentsMetadataManagerConfig.UseCache#ALWAYS}. * * @throws DruidException if the cache is disabled, stopped or not leader. */ private void verifyCacheIsUsableAndAwaitSync() { - if (!isCacheEnabled) { + if (!isEnabled()) { throw DruidException.defensive("Segment metadata cache is not enabled."); } @@ -254,8 +283,10 @@ private void verifyCacheIsUsableAndAwaitSync() throw InternalServerError.exception("Not leader yet. Segment metadata cache is not usable."); case LEADER_FIRST_SYNC_PENDING: case LEADER_FIRST_SYNC_STARTED: - waitForCacheToFinishSync(); - verifyCacheIsUsableAndAwaitSync(); + if (useCache == SegmentsMetadataManagerConfig.UseCache.ALWAYS) { + waitForCacheToFinishSync(); + verifyCacheIsUsableAndAwaitSync(); + } case LEADER_READY: // Cache is now ready for use } @@ -294,10 +325,10 @@ private void updateCacheState(CacheState targetState, String message) currentCacheState = targetState; log.info("%s. Cache is now in state[%s].", message, currentCacheState); - // Notify threads waiting for cache to be ready - if (currentCacheState == CacheState.LEADER_READY) { - cacheStateLock.notifyAll(); - } + isCacheReady.set(currentCacheState == CacheState.LEADER_READY); + + // Notify threads waiting for cache to change state + cacheStateLock.notifyAll(); } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 3dd5d22d9f1b..9ad4d6d17dce 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -140,9 +140,14 @@ public void setUp() leaderSelector = new TestDruidLeaderSelector(); cachePollExecutor = new BlockingExecutorService("test-cache-poll-exec"); + + final SegmentsMetadataManagerConfig.UseCache cacheUsage + = useSegmentCache + ? SegmentsMetadataManagerConfig.UseCache.ALWAYS + : SegmentsMetadataManagerConfig.UseCache.NEVER; segmentMetadataCache = new HeapMemorySegmentMetadataCache( mapper, - () -> new SegmentsMetadataManagerConfig(null, useSegmentCache), + () -> new SegmentsMetadataManagerConfig(null, cacheUsage), derbyConnectorRule.metadataTablesConfigSupplier(), derbyConnector, (corePoolSize, nameFormat) -> new WrappingScheduledExecutorService( diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java index 834553655570..788de1fa0ffb 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java @@ -54,7 +54,7 @@ public class SqlSegmentsMetadataManagerSchemaPollTest extends SqlSegmentsMetadat public void setUp() throws Exception { connector = derbyConnectorRule.getConnector(); - SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(3), false); + SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(3), null); segmentSchemaCache = new SegmentSchemaCache(new NoopServiceEmitter()); segmentSchemaManager = new SegmentSchemaManager( @@ -129,7 +129,7 @@ public void testPollSegmentAndSchema() CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig = new CentralizedDatasourceSchemaConfig(); centralizedDatasourceSchemaConfig.setEnabled(true); - config = new SegmentsMetadataManagerConfig(Period.seconds(3), false); + config = new SegmentsMetadataManagerConfig(Period.seconds(3), null); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( jsonMapper, Suppliers.ofInstance(config), @@ -217,7 +217,7 @@ public void testPollOnlyNewSchemaVersion() CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig = new CentralizedDatasourceSchemaConfig(); centralizedDatasourceSchemaConfig.setEnabled(true); - config = new SegmentsMetadataManagerConfig(Period.seconds(3), false); + config = new SegmentsMetadataManagerConfig(Period.seconds(3), null); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( jsonMapper, Suppliers.ofInstance(config), diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index 1f71a3513a9e..c1f9a0d544f4 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -110,7 +110,7 @@ private void publishWikiSegments() public void setUp() { connector = derbyConnectorRule.getConnector(); - SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(3), false); + SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(3), null); storageConfig = derbyConnectorRule.metadataTablesConfigSupplier().get(); segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance()); @@ -1330,7 +1330,7 @@ public void testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval() thr final Interval theInterval = Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000"); // Re-create SqlSegmentsMetadataManager with a higher poll duration - final SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(1), false); + final SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(1), null); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( jsonMapper, Suppliers.ofInstance(config), diff --git a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java index 2d3b69325398..e4ef43e06514 100644 --- a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java @@ -32,7 +32,6 @@ import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.metadata.SegmentsMetadataManagerConfig; import org.apache.druid.metadata.TestDerbyConnector; -import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.TestDataSource; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -53,7 +52,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Set; public class HeapMemorySegmentMetadataCacheTest @@ -95,13 +93,13 @@ public void tearDown() /** * Creates the target {@link #cache} to be tested in the current test. */ - private void setupTargetWithCaching(boolean enabled) + private void setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache useCache) { if (cache != null) { throw new ISE("Test target has already been initialized with caching[%s]", cache.isEnabled()); } final SegmentsMetadataManagerConfig metadataManagerConfig - = new SegmentsMetadataManagerConfig(null, enabled); + = new SegmentsMetadataManagerConfig(null, useCache); cache = new HeapMemorySegmentMetadataCache( TestHelper.JSON_MAPPER, () -> metadataManagerConfig, @@ -114,7 +112,7 @@ private void setupTargetWithCaching(boolean enabled) private void setupAndSyncCache() { - setupTargetWithCaching(true); + setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.ALWAYS); cache.start(); cache.becomeLeader(); syncCacheAfterBecomingLeader(); @@ -134,13 +132,14 @@ private void syncCacheAfterBecomingLeader() */ private void syncCache() { + serviceEmitter.flush(); pollExecutor.finishNextPendingTasks(2); } @Test public void testStart_schedulesDbPoll_ifCacheIsEnabled() { - setupTargetWithCaching(true); + setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.ALWAYS); Assert.assertTrue(cache.isEnabled()); cache.start(); @@ -155,7 +154,7 @@ public void testStart_schedulesDbPoll_ifCacheIsEnabled() @Test public void testStart_doesNotScheduleDbPoll_ifCacheIsDisabled() { - setupTargetWithCaching(false); + setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.NEVER); Assert.assertFalse(cache.isEnabled()); cache.start(); @@ -166,7 +165,7 @@ public void testStart_doesNotScheduleDbPoll_ifCacheIsDisabled() @Test public void testStop_stopsDbPoll_ifCacheIsEnabled() { - setupTargetWithCaching(true); + setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.ALWAYS); Assert.assertTrue(cache.isEnabled()); cache.start(); @@ -180,7 +179,7 @@ public void testStop_stopsDbPoll_ifCacheIsEnabled() @Test public void testBecomeLeader_isNoop_ifCacheIsDisabled() { - setupTargetWithCaching(false); + setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.NEVER); cache.start(); Assert.assertFalse(pollExecutor.hasPendingTasks()); @@ -192,7 +191,7 @@ public void testBecomeLeader_isNoop_ifCacheIsDisabled() @Test public void testBecomeLeader_throwsException_ifCacheIsStopped() { - setupTargetWithCaching(true); + setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.ALWAYS); DruidExceptionMatcher.defensive().expectMessageIs( "Cache has not been started yet" ).assertThrowsAndMatches( @@ -203,7 +202,7 @@ public void testBecomeLeader_throwsException_ifCacheIsStopped() @Test public void testGetDataSource_throwsException_ifCacheIsDisabled() { - setupTargetWithCaching(false); + setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.NEVER); DruidExceptionMatcher.defensive().expectMessageIs( "Segment metadata cache is not enabled." ).assertThrowsAndMatches( @@ -214,7 +213,7 @@ public void testGetDataSource_throwsException_ifCacheIsDisabled() @Test public void testGetDataSource_throwsException_ifCacheIsStoppedOrNotLeader() { - setupTargetWithCaching(true); + setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.ALWAYS); Assert.assertTrue(cache.isEnabled()); DruidExceptionMatcher.internalServerError().expectMessageIs( @@ -234,7 +233,7 @@ public void testGetDataSource_throwsException_ifCacheIsStoppedOrNotLeader() @Test(timeout = 60_000) public void testGetDataSource_waitsForOneSync_afterBecomingLeader() throws InterruptedException { - setupTargetWithCaching(true); + setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.ALWAYS); cache.start(); cache.becomeLeader(); @@ -338,7 +337,7 @@ public void testSync_emitsAlert_ifErrorOccurs() ); // Verify that sync duration is not emitted again - serviceEmitter.verifyEmitted(Metric.SYNC_DURATION_MILLIS, 1); + serviceEmitter.verifyNotEmitted(Metric.SYNC_DURATION_MILLIS); } @Test @@ -361,7 +360,7 @@ public void testSync_doesNotFail_ifSegmentRecordIsBad() // Verify that sync completes successfully and updates the valid segment setupAndSyncCache(); serviceEmitter.verifyEmitted(Metric.SYNC_DURATION_MILLIS, 1); - serviceEmitter.verifyValue(Metric.UPDATED_USED_SEGMENTS, 1L); + serviceEmitter.verifyValue(Metric.CACHED_USED_SEGMENTS, 1L); serviceEmitter.verifyValue(Metric.SKIPPED_SEGMENTS, 1L); final DatasourceSegmentCache wikiCache = cache.getDatasource(TestDataSource.WIKI); @@ -409,7 +408,7 @@ public void testSync_doesNotFail_ifPendingSegmentRecordIsBad() setupAndSyncCache(); serviceEmitter.verifyEmitted(Metric.SYNC_DURATION_MILLIS, 1); serviceEmitter.verifyValue(Metric.SKIPPED_PENDING_SEGMENTS, 1L); - serviceEmitter.verifyValue(Metric.UPDATED_USED_SEGMENTS, 1L); + serviceEmitter.verifyValue(Metric.CACHED_USED_SEGMENTS, 1L); serviceEmitter.verifyValue(Metric.PERSISTED_USED_SEGMENTS, 1L); serviceEmitter.verifyValue(Metric.PERSISTED_PENDING_SEGMENTS, 0L); @@ -598,19 +597,6 @@ public void testSync_removesPendingSegment_ifNotPresentInMetadataStore() ); } - @Test - public void testGetDatasource_increasesTransactionCount() - { - setupAndSyncCache(); - cache.getDatasource(TestDataSource.WIKI); - cache.getDatasource(TestDataSource.WIKI); - serviceEmitter.verifyEmitted( - Metric.READ_WRITE_TRANSACTIONS, - Map.of(DruidMetrics.DATASOURCE, TestDataSource.WIKI), - 2 - ); - } - private void insertSegmentsInMetadataStore(Set segments) { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 272ed1887ce7..1f02a5e9db97 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -111,7 +111,7 @@ public class KillUnusedSegmentsTest public void setup() { connector = derbyConnectorRule.getConnector(); - SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.millis(1), false); + SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.millis(1), null); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( TestHelper.makeJsonMapper(), Suppliers.ofInstance(config), From edc4d8b9f73db43df770367c79a81450aac786bb Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 27 Mar 2025 14:11:27 +0530 Subject: [PATCH 07/13] Move enum to SegmentMetadataCache --- .../common/actions/TaskActionTestKit.java | 8 ++-- .../common/task/IngestionTestBase.java | 8 ++-- .../SegmentsMetadataManagerConfig.java | 37 +++---------------- .../cache/HeapMemorySegmentMetadataCache.java | 16 ++++---- .../segment/cache/SegmentMetadataCache.java | 25 +++++++++++++ ...exerSQLMetadataStorageCoordinatorTest.java | 8 ++-- .../HeapMemorySegmentMetadataCacheTest.java | 22 +++++------ 7 files changed, 62 insertions(+), 62 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index 7c55f4f9fdc5..ed30db142a95 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -182,13 +182,13 @@ public boolean isBatchAllocationReduceMetadataIO() private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMapper objectMapper) { metadataCachePollExec = new BlockingExecutorService("test-cache-poll-exec"); - SegmentsMetadataManagerConfig.UseCache cacheUsage + SegmentMetadataCache.UsageMode cacheMode = useSegmentMetadataCache - ? SegmentsMetadataManagerConfig.UseCache.ALWAYS - : SegmentsMetadataManagerConfig.UseCache.NEVER; + ? SegmentMetadataCache.UsageMode.ALWAYS + : SegmentMetadataCache.UsageMode.NEVER; segmentMetadataCache = new HeapMemorySegmentMetadataCache( objectMapper, - Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheUsage)), + Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)), Suppliers.ofInstance(metadataStorageTablesConfig), testDerbyConnector, (poolSize, name) -> new WrappingScheduledExecutorService(name, metadataCachePollExec, false), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index b509c9183306..6e7b30b4be27 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -315,13 +315,13 @@ public TaskToolbox createTaskToolbox(TaskConfig config, Task task, SupervisorMan private SqlSegmentMetadataTransactionFactory createTransactionFactory() { - final SegmentsMetadataManagerConfig.UseCache cacheUsage + final SegmentMetadataCache.UsageMode cacheMode = useSegmentMetadataCache - ? SegmentsMetadataManagerConfig.UseCache.ALWAYS - : SegmentsMetadataManagerConfig.UseCache.NEVER; + ? SegmentMetadataCache.UsageMode.ALWAYS + : SegmentMetadataCache.UsageMode.NEVER; segmentMetadataCache = new HeapMemorySegmentMetadataCache( objectMapper, - Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), cacheUsage)), + Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), cacheMode)), derbyConnectorRule.metadataTablesConfigSupplier(), derbyConnectorRule.getConnector(), ScheduledExecutors::fixed, diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java index 6075c1c9319b..55a34e842c86 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.common.config.Configs; +import org.apache.druid.metadata.segment.cache.SegmentMetadataCache; import org.joda.time.Period; /** @@ -32,51 +33,25 @@ public class SegmentsMetadataManagerConfig { public static final String CONFIG_PREFIX = "druid.manager.segments"; - /** - * Cache usage modes. - */ - public enum UseCache - { - /** - * Always read from the cache. Service start-up may be blocked until cache - * has synced with the metadata store at least once. Transactions may block - * until cache has synced with the metadata store at least once after - * becoming leader. - */ - ALWAYS, - - /** - * Cache is disabled. - */ - NEVER, - - /** - * Read from the cache only if it is already synced with the metadata store. - * Does not block service start-up or transactions. Writes may still go to - * cache to reduce sync times. - */ - IF_SYNCED - } - @JsonProperty private final Period pollDuration; @JsonProperty - private final UseCache useCache; + private final SegmentMetadataCache.UsageMode cacheMode; @JsonCreator public SegmentsMetadataManagerConfig( @JsonProperty("pollDuration") Period pollDuration, - @JsonProperty("useCache") UseCache useCache + @JsonProperty("useCache") SegmentMetadataCache.UsageMode cacheMode ) { this.pollDuration = Configs.valueOrDefault(pollDuration, Period.minutes(1)); - this.useCache = Configs.valueOrDefault(useCache, UseCache.NEVER); + this.cacheMode = Configs.valueOrDefault(cacheMode, SegmentMetadataCache.UsageMode.NEVER); } - public UseCache getUseCache() + public SegmentMetadataCache.UsageMode getCacheMode() { - return useCache; + return cacheMode; } public Period getPollDuration() diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java index 94344d1f428a..bebc5d520dd9 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java @@ -77,7 +77,7 @@ * Non-leader Overlords also keep polling the metadata store to keep the cache * up-to-date in case leadership changes. *

- * Cache usage modes: {@link SegmentsMetadataManagerConfig.UseCache}: + * Cache usage modes: {@link UsageMode}: *

* The map {@link #datasourceToSegmentCache} contains the cache for each datasource. * Items are only added to this map and never removed. This is to avoid handling @@ -108,7 +108,7 @@ private enum CacheState private final ObjectMapper jsonMapper; private final Duration pollDuration; - private final SegmentsMetadataManagerConfig.UseCache useCache; + private final UsageMode cacheMode; private final MetadataStorageTablesConfig tablesConfig; private final SQLMetadataConnector connector; @@ -147,7 +147,7 @@ public HeapMemorySegmentMetadataCache( ) { this.jsonMapper = jsonMapper; - this.useCache = config.get().getUseCache(); + this.cacheMode = config.get().getCacheMode(); this.pollDuration = config.get().getPollDuration().toStandardDuration(); this.tablesConfig = tablesConfig.get(); this.connector = connector; @@ -171,7 +171,7 @@ public void start() updateCacheState(CacheState.FOLLOWER, "Scheduling sync with metadata store"); } - if (useCache == SegmentsMetadataManagerConfig.UseCache.ALWAYS) { + if (cacheMode == UsageMode.ALWAYS) { // Cache must always be used, do not finish startup until cache has synced performFirstSync(); } @@ -183,7 +183,7 @@ public void start() private void performFirstSync() { try { - log.info("Cache is in usage mode[%s]. Starting first sync with metadata store.", useCache); + log.info("Cache is in usage mode[%s]. Starting first sync with metadata store.", cacheMode); final long syncDurationMillis = syncWithMetadataStore(); emitMetric(Metric.SYNC_DURATION_MILLIS, syncDurationMillis); @@ -241,7 +241,7 @@ public void stopBeingLeader() @Override public boolean isEnabled() { - return useCache != SegmentsMetadataManagerConfig.UseCache.NEVER; + return cacheMode != UsageMode.NEVER; } @Override @@ -265,7 +265,7 @@ private HeapMemoryDatasourceSegmentCache getCacheForDatasource(String dataSource /** * Verifies that the cache is enabled, started and has become leader. * Also waits for the cache to be synced with the metadata store after becoming - * leader if {@link #useCache} is set to {@link SegmentsMetadataManagerConfig.UseCache#ALWAYS}. + * leader if {@link #cacheMode} is set to {@link UsageMode#ALWAYS}. * * @throws DruidException if the cache is disabled, stopped or not leader. */ @@ -283,7 +283,7 @@ private void verifyCacheIsUsableAndAwaitSync() throw InternalServerError.exception("Not leader yet. Segment metadata cache is not usable."); case LEADER_FIRST_SYNC_PENDING: case LEADER_FIRST_SYNC_STARTED: - if (useCache == SegmentsMetadataManagerConfig.UseCache.ALWAYS) { + if (cacheMode == UsageMode.ALWAYS) { waitForCacheToFinishSync(); verifyCacheIsUsableAndAwaitSync(); } diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java index 78c9b35015ac..29458cca2b87 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java @@ -66,4 +66,29 @@ public interface SegmentMetadataCache */ DatasourceSegmentCache getDatasource(String dataSource); + /** + * Cache usage modes. + */ + enum UsageMode + { + /** + * Always read from the cache. Service start-up may be blocked until cache + * has synced with the metadata store at least once. Transactions may block + * until cache has synced with the metadata store at least once after + * becoming leader. + */ + ALWAYS, + + /** + * Cache is disabled. + */ + NEVER, + + /** + * Read from the cache only if it is already synced with the metadata store. + * Does not block service start-up or transactions. Writes may still go to + * cache to reduce sync times. + */ + IF_SYNCED + } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 9ad4d6d17dce..b34e395a18fe 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -141,13 +141,13 @@ public void setUp() cachePollExecutor = new BlockingExecutorService("test-cache-poll-exec"); - final SegmentsMetadataManagerConfig.UseCache cacheUsage + final SegmentMetadataCache.UsageMode cacheMode = useSegmentCache - ? SegmentsMetadataManagerConfig.UseCache.ALWAYS - : SegmentsMetadataManagerConfig.UseCache.NEVER; + ? SegmentMetadataCache.UsageMode.ALWAYS + : SegmentMetadataCache.UsageMode.NEVER; segmentMetadataCache = new HeapMemorySegmentMetadataCache( mapper, - () -> new SegmentsMetadataManagerConfig(null, cacheUsage), + () -> new SegmentsMetadataManagerConfig(null, cacheMode), derbyConnectorRule.metadataTablesConfigSupplier(), derbyConnector, (corePoolSize, nameFormat) -> new WrappingScheduledExecutorService( diff --git a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java index e4ef43e06514..89ad8139dea3 100644 --- a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java @@ -93,13 +93,13 @@ public void tearDown() /** * Creates the target {@link #cache} to be tested in the current test. */ - private void setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache useCache) + private void setupTargetWithCaching(SegmentMetadataCache.UsageMode cacheMode) { if (cache != null) { throw new ISE("Test target has already been initialized with caching[%s]", cache.isEnabled()); } final SegmentsMetadataManagerConfig metadataManagerConfig - = new SegmentsMetadataManagerConfig(null, useCache); + = new SegmentsMetadataManagerConfig(null, cacheMode); cache = new HeapMemorySegmentMetadataCache( TestHelper.JSON_MAPPER, () -> metadataManagerConfig, @@ -112,7 +112,7 @@ private void setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache useCa private void setupAndSyncCache() { - setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.ALWAYS); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS); cache.start(); cache.becomeLeader(); syncCacheAfterBecomingLeader(); @@ -139,7 +139,7 @@ private void syncCache() @Test public void testStart_schedulesDbPoll_ifCacheIsEnabled() { - setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.ALWAYS); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS); Assert.assertTrue(cache.isEnabled()); cache.start(); @@ -154,7 +154,7 @@ public void testStart_schedulesDbPoll_ifCacheIsEnabled() @Test public void testStart_doesNotScheduleDbPoll_ifCacheIsDisabled() { - setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.NEVER); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.NEVER); Assert.assertFalse(cache.isEnabled()); cache.start(); @@ -165,7 +165,7 @@ public void testStart_doesNotScheduleDbPoll_ifCacheIsDisabled() @Test public void testStop_stopsDbPoll_ifCacheIsEnabled() { - setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.ALWAYS); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS); Assert.assertTrue(cache.isEnabled()); cache.start(); @@ -179,7 +179,7 @@ public void testStop_stopsDbPoll_ifCacheIsEnabled() @Test public void testBecomeLeader_isNoop_ifCacheIsDisabled() { - setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.NEVER); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.NEVER); cache.start(); Assert.assertFalse(pollExecutor.hasPendingTasks()); @@ -191,7 +191,7 @@ public void testBecomeLeader_isNoop_ifCacheIsDisabled() @Test public void testBecomeLeader_throwsException_ifCacheIsStopped() { - setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.ALWAYS); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS); DruidExceptionMatcher.defensive().expectMessageIs( "Cache has not been started yet" ).assertThrowsAndMatches( @@ -202,7 +202,7 @@ public void testBecomeLeader_throwsException_ifCacheIsStopped() @Test public void testGetDataSource_throwsException_ifCacheIsDisabled() { - setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.NEVER); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.NEVER); DruidExceptionMatcher.defensive().expectMessageIs( "Segment metadata cache is not enabled." ).assertThrowsAndMatches( @@ -213,7 +213,7 @@ public void testGetDataSource_throwsException_ifCacheIsDisabled() @Test public void testGetDataSource_throwsException_ifCacheIsStoppedOrNotLeader() { - setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.ALWAYS); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS); Assert.assertTrue(cache.isEnabled()); DruidExceptionMatcher.internalServerError().expectMessageIs( @@ -233,7 +233,7 @@ public void testGetDataSource_throwsException_ifCacheIsStoppedOrNotLeader() @Test(timeout = 60_000) public void testGetDataSource_waitsForOneSync_afterBecomingLeader() throws InterruptedException { - setupTargetWithCaching(SegmentsMetadataManagerConfig.UseCache.ALWAYS); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS); cache.start(); cache.becomeLeader(); From f0e94d2107dcaf32637b25e660ed8b7136c346b7 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 27 Mar 2025 14:34:53 +0530 Subject: [PATCH 08/13] Run tests in all 3 cache modes --- ...exerSQLMetadataStorageCoordinatorTest.java | 52 +++++++++++-------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index b34e395a18fe..6671fe4e1332 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -105,17 +105,21 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata private SqlSegmentMetadataTransactionFactory transactionFactory; private BlockingExecutorService cachePollExecutor; - private final boolean useSegmentCache; + private final SegmentMetadataCache.UsageMode cacheMode; - @Parameterized.Parameters(name = "useSegmentCache = {0}") + @Parameterized.Parameters(name = "cacheMode = {0}") public static Object[][] testParameters() { - return new Object[][]{{true}, {false}}; + return new Object[][]{ + {SegmentMetadataCache.UsageMode.ALWAYS}, + {SegmentMetadataCache.UsageMode.NEVER}, + {SegmentMetadataCache.UsageMode.IF_SYNCED} + }; } - public IndexerSQLMetadataStorageCoordinatorTest(boolean useSegmentCache) + public IndexerSQLMetadataStorageCoordinatorTest(SegmentMetadataCache.UsageMode cacheMode) { - this.useSegmentCache = useSegmentCache; + this.cacheMode = cacheMode; } @Before @@ -141,10 +145,6 @@ public void setUp() cachePollExecutor = new BlockingExecutorService("test-cache-poll-exec"); - final SegmentMetadataCache.UsageMode cacheMode - = useSegmentCache - ? SegmentMetadataCache.UsageMode.ALWAYS - : SegmentMetadataCache.UsageMode.NEVER; segmentMetadataCache = new HeapMemorySegmentMetadataCache( mapper, () -> new SegmentsMetadataManagerConfig(null, cacheMode), @@ -161,10 +161,11 @@ public void setUp() leaderSelector.becomeLeader(); // Get the cache ready if required - if (useSegmentCache) { + if (isCacheEnabled()) { segmentMetadataCache.start(); segmentMetadataCache.becomeLeader(); - cachePollExecutor.finishNextPendingTasks(4); + refreshCache(); + refreshCache(); } transactionFactory = new SqlSegmentMetadataTransactionFactory( @@ -214,12 +215,17 @@ public void tearDown() leaderSelector.stopBeingLeader(); } - void refreshCache() + private void refreshCache() { - if (useSegmentCache) { + if (isCacheEnabled()) { cachePollExecutor.finishNextPendingTasks(2); } } + + private boolean isCacheEnabled() + { + return cacheMode != SegmentMetadataCache.UsageMode.NEVER; + } @Test public void testCommitAppendSegments() @@ -937,7 +943,7 @@ public void testRetrieveSegmentForId() @Test public void testCleanUpgradeSegmentsTableForTask() { - Assume.assumeFalse(useSegmentCache); + Assume.assumeFalse(isCacheEnabled()); final String taskToClean = "taskToClean"; final ReplaceTaskLock replaceLockToClean = new ReplaceTaskLock( @@ -2182,7 +2188,7 @@ public void testSecondHalfEternitySegmentWithStringComparison() public void testLargeIntervalWithStringComparison() { // Known Issue when not using cache: https://github.com/apache/druid/issues/12860 - Assume.assumeTrue(useSegmentCache); + Assume.assumeTrue(isCacheEnabled()); coordinator.commitSegments( ImmutableSet.of( @@ -2283,7 +2289,7 @@ public void testDeleteSegmentsInMetaDataStorage() @Test public void testUpdateSegmentsInMetaDataStorage() { - Assume.assumeFalse(useSegmentCache); + Assume.assumeFalse(isCacheEnabled()); // Published segments to MetaDataStorage coordinator.commitSegments(SEGMENTS, new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)); @@ -3714,7 +3720,7 @@ public void testRetrieveUpgradedFromSegmentIds() @Test public void testRetrieveUpgradedFromSegmentIdsInBatches() { - Assume.assumeFalse(useSegmentCache); + Assume.assumeFalse(isCacheEnabled()); final int size = 500; final int batchSize = 100; @@ -3914,7 +3920,7 @@ public void testRetrieveUsedSegmentsForSegmentAllocation() @Test public void testCachedTransaction_cannotReadWhatItWrites() { - Assume.assumeTrue(useSegmentCache); + Assume.assumeTrue(isCacheEnabled()); transactionFactory.inReadWriteDatasourceTransaction( TestDataSource.WIKI, @@ -3940,9 +3946,9 @@ public void testCachedTransaction_cannotReadWhatItWrites() } @Test - public void testCacheIsUsed_ifReady() + public void testReadOperation_usesCache_ifSynced() { - Assume.assumeTrue(useSegmentCache); + Assume.assumeTrue(isCacheEnabled()); Assert.assertTrue(segmentMetadataCache.isSyncedForRead()); @@ -3965,7 +3971,7 @@ public void testCacheIsUsed_ifReady() @Test public void testReadOperation_doesNotUseCache_ifNotSynced() { - Assume.assumeTrue(useSegmentCache); + Assume.assumeTrue(isCacheEnabled()); segmentMetadataCache.stopBeingLeader(); Assert.assertFalse(segmentMetadataCache.isSyncedForRead()); @@ -3996,9 +4002,9 @@ public void testReadOperation_doesNotUseCache_ifNotSynced() } @Test - public void testWriteOperation_usesCache_ifNotSynced() + public void testWriteOperation_alwaysUsesCache_inModeIfSynced() { - Assume.assumeTrue(useSegmentCache); + Assume.assumeTrue(cacheMode == SegmentMetadataCache.UsageMode.IF_SYNCED); // Lose and regain leadership segmentMetadataCache.stopBeingLeader(); From c9038c07de1b5dc3688cee79ae3dc9f07a7d5cf0 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 27 Mar 2025 14:42:03 +0530 Subject: [PATCH 09/13] Fix docs and IT configs --- docs/configuration/index.md | 4 ++-- .../cases/cluster/Common/environment-configs/overlord.env | 2 +- integration-tests/docker/environment-configs/overlord | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 687e1d688a67..aa30a584bc47 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1175,8 +1175,8 @@ The following properties pertain to segment metadata caching on the Overlord tha |Property|Description|Default| |--------|-----------|-------| -|`druid.manager.segments.useCache`|If `true`, segment metadata is cached on the Overlord to speed up metadata operations.|false| -|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when `druid.manager.segments.useCache` is `true`.|`PT1M` (1 minute)| +|`druid.manager.segments.useCache`|Denotes the usage mode of the segment metadata cache. Possible modes are: (a) `NEVER`: Cache is disable (b) `ALWAYS`: Reads are always done from the cache. Service start-up may be blocked until cache has synced with the metadata store at least once. Transactions may block until cache has synced with the metadata store at least once after becoming leader. (c) `IF_SYNCED`: Reads are done from the cache only if it has already synced with the metadata store. This mode does not block service start-up or transactions.|`NEVER`| +|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when `druid.manager.segments.useCache` is set to `ALWAYS` or `IF_SYNCED`.|`PT1M` (1 minute)| #### Overlord dynamic configuration diff --git a/integration-tests-ex/cases/cluster/Common/environment-configs/overlord.env b/integration-tests-ex/cases/cluster/Common/environment-configs/overlord.env index 0d92fe9e09d4..412b62c0105d 100644 --- a/integration-tests-ex/cases/cluster/Common/environment-configs/overlord.env +++ b/integration-tests-ex/cases/cluster/Common/environment-configs/overlord.env @@ -30,4 +30,4 @@ druid_auth_basic_common_cacheDirectory=/tmp/authCache/overlord druid_server_https_crlPath=/tls/revocations.crl druid_segments_manager_pollDuration=PT5S -druid_segments_manager_useCache=true +druid_segments_manager_useCache=ALWAYS diff --git a/integration-tests/docker/environment-configs/overlord b/integration-tests/docker/environment-configs/overlord index 1f3e7fcb602e..987b42aaab11 100644 --- a/integration-tests/docker/environment-configs/overlord +++ b/integration-tests/docker/environment-configs/overlord @@ -34,4 +34,4 @@ druid_indexer_runner_type=remote druid_auth_basic_common_cacheDirectory=/tmp/authCache/overlord druid_server_https_crlPath=/tls/revocations.crl druid_segments_manager_pollDuration=PT2S -druid_segments_manager_useCache=true +druid_segments_manager_useCache=ALWAYS From 2d17e9ebbc92a3f3d6842bf7f4e72bc3ff56b72b Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 27 Mar 2025 15:40:06 +0530 Subject: [PATCH 10/13] Fix config binding --- .../druid/metadata/SegmentsMetadataManagerConfig.java | 8 ++++---- .../metadata/segment/cache/SegmentMetadataCache.java | 10 +++++++++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java index 55a34e842c86..033a703247fa 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java @@ -37,21 +37,21 @@ public class SegmentsMetadataManagerConfig private final Period pollDuration; @JsonProperty - private final SegmentMetadataCache.UsageMode cacheMode; + private final SegmentMetadataCache.UsageMode useCache; @JsonCreator public SegmentsMetadataManagerConfig( @JsonProperty("pollDuration") Period pollDuration, - @JsonProperty("useCache") SegmentMetadataCache.UsageMode cacheMode + @JsonProperty("useCache") SegmentMetadataCache.UsageMode useCache ) { this.pollDuration = Configs.valueOrDefault(pollDuration, Period.minutes(1)); - this.cacheMode = Configs.valueOrDefault(cacheMode, SegmentMetadataCache.UsageMode.NEVER); + this.useCache = Configs.valueOrDefault(useCache, SegmentMetadataCache.UsageMode.NEVER); } public SegmentMetadataCache.UsageMode getCacheMode() { - return cacheMode; + return useCache; } public Period getPollDuration() diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java index 29458cca2b87..c9502f8a3c96 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java @@ -19,6 +19,8 @@ package org.apache.druid.metadata.segment.cache; +import com.fasterxml.jackson.annotation.JsonCreator; + /** * Cache for metadata of pending segments and committed segments maintained by * the Overlord to improve performance of segment allocation and other task actions. @@ -89,6 +91,12 @@ enum UsageMode * Does not block service start-up or transactions. Writes may still go to * cache to reduce sync times. */ - IF_SYNCED + IF_SYNCED; + + @JsonCreator + public static UsageMode fromString(String value) + { + return UsageMode.valueOf(value.toUpperCase()); + } } } From 4513d78b96e422b4fff792d3a3d6d9790d1e15c3 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 27 Mar 2025 15:50:23 +0530 Subject: [PATCH 11/13] Remove forbidden api --- .../druid/metadata/segment/cache/SegmentMetadataCache.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java index c9502f8a3c96..fcb4d366f62e 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java @@ -21,6 +21,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; +import java.util.Locale; + /** * Cache for metadata of pending segments and committed segments maintained by * the Overlord to improve performance of segment allocation and other task actions. @@ -96,7 +98,7 @@ enum UsageMode @JsonCreator public static UsageMode fromString(String value) { - return UsageMode.valueOf(value.toUpperCase()); + return UsageMode.valueOf(value.toUpperCase(Locale.ROOT)); } } } From 6a0bdd2f2ceb2401123a499b6f850e8ee97c862e Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 28 Mar 2025 01:56:39 +0530 Subject: [PATCH 12/13] Fix typos, docs and enum casing --- docs/configuration/index.md | 2 +- .../Common/environment-configs/overlord.env | 4 +-- .../docker/environment-configs/overlord | 4 +-- .../IndexerSQLMetadataStorageCoordinator.java | 5 ++++ .../cache/HeapMemorySegmentMetadataCache.java | 4 +-- .../segment/cache/SegmentMetadataCache.java | 30 +++++++++++++++---- 6 files changed, 36 insertions(+), 13 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index aa30a584bc47..d35b976d9b14 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1175,7 +1175,7 @@ The following properties pertain to segment metadata caching on the Overlord tha |Property|Description|Default| |--------|-----------|-------| -|`druid.manager.segments.useCache`|Denotes the usage mode of the segment metadata cache. Possible modes are: (a) `NEVER`: Cache is disable (b) `ALWAYS`: Reads are always done from the cache. Service start-up may be blocked until cache has synced with the metadata store at least once. Transactions may block until cache has synced with the metadata store at least once after becoming leader. (c) `IF_SYNCED`: Reads are done from the cache only if it has already synced with the metadata store. This mode does not block service start-up or transactions.|`NEVER`| +|`druid.manager.segments.useCache`|Denotes the usage mode of the segment metadata cache. Possible modes are: (a) `never`: Cache is disabled. (b) `always`: Reads are always done from the cache. Service start-up will be blocked until cache has synced with the metadata store at least once. Transactions will block until cache has synced with the metadata store at least once after becoming leader. (c) `ifSynced`: Reads are done from the cache only if it has already synced with the metadata store. This mode does not block service start-up or transactions.|`never`| |`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when `druid.manager.segments.useCache` is set to `ALWAYS` or `IF_SYNCED`.|`PT1M` (1 minute)| #### Overlord dynamic configuration diff --git a/integration-tests-ex/cases/cluster/Common/environment-configs/overlord.env b/integration-tests-ex/cases/cluster/Common/environment-configs/overlord.env index 412b62c0105d..5e8b6a809cc9 100644 --- a/integration-tests-ex/cases/cluster/Common/environment-configs/overlord.env +++ b/integration-tests-ex/cases/cluster/Common/environment-configs/overlord.env @@ -29,5 +29,5 @@ druid_indexer_runner_type=remote druid_auth_basic_common_cacheDirectory=/tmp/authCache/overlord druid_server_https_crlPath=/tls/revocations.crl -druid_segments_manager_pollDuration=PT5S -druid_segments_manager_useCache=ALWAYS +druid_manager_segments_pollDuration=PT5S +druid_manager_segments_useCache=always diff --git a/integration-tests/docker/environment-configs/overlord b/integration-tests/docker/environment-configs/overlord index 987b42aaab11..df351c73d278 100644 --- a/integration-tests/docker/environment-configs/overlord +++ b/integration-tests/docker/environment-configs/overlord @@ -33,5 +33,5 @@ druid_indexer_storage_type=metadata druid_indexer_runner_type=remote druid_auth_basic_common_cacheDirectory=/tmp/authCache/overlord druid_server_https_crlPath=/tls/revocations.crl -druid_segments_manager_pollDuration=PT2S -druid_segments_manager_useCache=ALWAYS +druid_manager_segments_pollDuration=PT5S +druid_manager_segments_useCache=always diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 16dec0431448..2f35c9f45d47 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1500,6 +1500,11 @@ private SegmentIdWithShardSpec getTrueAllocatedId( allocatedId.getInterval(), allocatedId.getVersion() ); + log.info( + "Allocated SegmentId[%s] is already in use. Using next ID after max[%s].", + allocatedId.asSegmentId(), unusedMaxId + ); + // No unused segment. Just return the allocated id if (unusedMaxId == null) { return allocatedId; diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java index bebc5d520dd9..d9210b782afd 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java @@ -71,13 +71,13 @@ /** * In-memory implementation of {@link SegmentMetadataCache}. *

- * Only used segments (excluding num_rows and schema_fingerpring) and + * Only used segments (excluding num_rows and schema_fingerprint) and * pending segments are cached. Unused segments are not cached. *

* Non-leader Overlords also keep polling the metadata store to keep the cache * up-to-date in case leadership changes. *

- * Cache usage modes: {@link UsageMode}: + * For cache usage modes, see {@link UsageMode}. *

* The map {@link #datasourceToSegmentCache} contains the cache for each datasource. * Items are only added to this map and never removed. This is to avoid handling diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java index fcb4d366f62e..2fa501c7a123 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java @@ -20,8 +20,7 @@ package org.apache.druid.metadata.segment.cache; import com.fasterxml.jackson.annotation.JsonCreator; - -import java.util.Locale; +import org.apache.druid.error.InvalidInput; /** * Cache for metadata of pending segments and committed segments maintained by @@ -81,24 +80,43 @@ enum UsageMode * until cache has synced with the metadata store at least once after * becoming leader. */ - ALWAYS, + ALWAYS("always"), /** * Cache is disabled. */ - NEVER, + NEVER("never"), /** * Read from the cache only if it is already synced with the metadata store. * Does not block service start-up or transactions. Writes may still go to * cache to reduce sync times. */ - IF_SYNCED; + IF_SYNCED("ifSynced"); + + private final String name; + + UsageMode(String name) + { + this.name = name; + } @JsonCreator public static UsageMode fromString(String value) { - return UsageMode.valueOf(value.toUpperCase(Locale.ROOT)); + for (UsageMode mode : values()) { + if (mode.toString().equals(value)) { + return mode; + } + } + + throw InvalidInput.exception("No such cache usage mode[%s]", value); + } + + @Override + public String toString() + { + return name; } } } From d9c35ca18fa198ba93a0e77eea38fc796418f9ef Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 28 Mar 2025 02:02:59 +0530 Subject: [PATCH 13/13] Fix doc --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index d35b976d9b14..b3f4f38d5ca4 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1176,7 +1176,7 @@ The following properties pertain to segment metadata caching on the Overlord tha |Property|Description|Default| |--------|-----------|-------| |`druid.manager.segments.useCache`|Denotes the usage mode of the segment metadata cache. Possible modes are: (a) `never`: Cache is disabled. (b) `always`: Reads are always done from the cache. Service start-up will be blocked until cache has synced with the metadata store at least once. Transactions will block until cache has synced with the metadata store at least once after becoming leader. (c) `ifSynced`: Reads are done from the cache only if it has already synced with the metadata store. This mode does not block service start-up or transactions.|`never`| -|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when `druid.manager.segments.useCache` is set to `ALWAYS` or `IF_SYNCED`.|`PT1M` (1 minute)| +|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when `druid.manager.segments.useCache` is set to `always` or `ifSynced`.|`PT1M` (1 minute)| #### Overlord dynamic configuration