diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 687e1d688a67..b3f4f38d5ca4 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1175,8 +1175,8 @@ The following properties pertain to segment metadata caching on the Overlord tha |Property|Description|Default| |--------|-----------|-------| -|`druid.manager.segments.useCache`|If `true`, segment metadata is cached on the Overlord to speed up metadata operations.|false| -|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when `druid.manager.segments.useCache` is `true`.|`PT1M` (1 minute)| +|`druid.manager.segments.useCache`|Denotes the usage mode of the segment metadata cache. Possible modes are: (a) `never`: Cache is disabled. (b) `always`: Reads are always done from the cache. Service start-up will be blocked until cache has synced with the metadata store at least once. Transactions will block until cache has synced with the metadata store at least once after becoming leader. (c) `ifSynced`: Reads are done from the cache only if it has already synced with the metadata store. This mode does not block service start-up or transactions.|`never`| +|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. 
This property is used only when `druid.manager.segments.useCache` is set to `always` or `ifSynced`.|`PT1M` (1 minute)| #### Overlord dynamic configuration diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java index 6c39d21c1f59..444eb7cb61b9 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java @@ -56,6 +56,7 @@ import org.apache.druid.segment.metadata.SegmentSchemaManager; import org.apache.druid.segment.realtime.ChatHandlerProvider; import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; @@ -113,7 +114,8 @@ public void setUp() derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), objectMapper, derbyConnectorRule.metadataTablesConfigSupplier().get(), diff --git a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java index 999a38ffd3a0..606c6a529587 100644 --- 
a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java +++ b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java @@ -120,7 +120,8 @@ public void setUp() throws Exception derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), jsonMapper, derbyConnectorRule.metadataTablesConfigSupplier().get(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index 5800e3b1ec15..8ef97352921f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -99,7 +99,6 @@ private static DataSegment createSegment(Interval interval, String version) @Test public void testRetrieveUsedSegmentsAction() { - actionTestKit.syncSegmentMetadataCache(); final RetrieveUsedSegmentsAction action = new RetrieveUsedSegmentsAction(task.getDataSource(), ImmutableList.of(INTERVAL)); final Set observedUsedSegments = new HashSet<>(action.perform(task, actionTestKit.getTaskActionToolbox())); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index 352b062fe947..ed30db142a95 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -66,7 +66,7 @@ public class 
TaskActionTestKit extends ExternalResource private SegmentMetadataCache segmentMetadataCache; private BlockingExecutorService metadataCachePollExec; - private boolean useSegmentMetadataCache = new SegmentsMetadataManagerConfig(null, null).isUseCache(); + private boolean useSegmentMetadataCache = false; private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isBatchAllocationReduceMetadataIO(); public TaskLockbox getTaskLockbox() @@ -182,13 +182,17 @@ public boolean isBatchAllocationReduceMetadataIO() private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMapper objectMapper) { metadataCachePollExec = new BlockingExecutorService("test-cache-poll-exec"); + SegmentMetadataCache.UsageMode cacheMode + = useSegmentMetadataCache + ? SegmentMetadataCache.UsageMode.ALWAYS + : SegmentMetadataCache.UsageMode.NEVER; segmentMetadataCache = new HeapMemorySegmentMetadataCache( objectMapper, - Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), useSegmentMetadataCache)), + Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.seconds(1), cacheMode)), Suppliers.ofInstance(metadataStorageTablesConfig), testDerbyConnector, (poolSize, name) -> new WrappingScheduledExecutorService(name, metadataCachePollExec, false), - new NoopServiceEmitter() + NoopServiceEmitter.instance() ); final TestDruidLeaderSelector leaderSelector = new TestDruidLeaderSelector(); @@ -199,7 +203,8 @@ private SqlSegmentMetadataTransactionFactory setupTransactionFactory(ObjectMappe metadataStorageTablesConfig, testDerbyConnector, leaderSelector, - segmentMetadataCache + segmentMetadataCache, + NoopServiceEmitter.instance() ) { @Override @@ -222,6 +227,6 @@ public void after() taskActionToolbox = null; segmentMetadataCache.stopBeingLeader(); segmentMetadataCache.stop(); - useSegmentMetadataCache = new SegmentsMetadataManagerConfig(null, null).isUseCache(); + useSegmentMetadataCache = false; } } diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 57a918cd9e50..6e7b30b4be27 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -315,9 +315,13 @@ public TaskToolbox createTaskToolbox(TaskConfig config, Task task, SupervisorMan private SqlSegmentMetadataTransactionFactory createTransactionFactory() { + final SegmentMetadataCache.UsageMode cacheMode + = useSegmentMetadataCache + ? SegmentMetadataCache.UsageMode.ALWAYS + : SegmentMetadataCache.UsageMode.NEVER; segmentMetadataCache = new HeapMemorySegmentMetadataCache( objectMapper, - Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), useSegmentMetadataCache)), + Suppliers.ofInstance(new SegmentsMetadataManagerConfig(Period.millis(10), cacheMode)), derbyConnectorRule.metadataTablesConfigSupplier(), derbyConnectorRule.getConnector(), ScheduledExecutors::fixed, @@ -332,7 +336,8 @@ private SqlSegmentMetadataTransactionFactory createTransactionFactory() derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnectorRule.getConnector(), leaderSelector, - segmentMetadataCache + segmentMetadataCache, + NoopServiceEmitter.instance() ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java index 6436ad3d35ca..a55117aad12d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java @@ -38,6 +38,7 @@ import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; import 
org.apache.druid.segment.metadata.SegmentSchemaManager; import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; @@ -88,7 +89,8 @@ public void setup() derby.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), objectMapper, derby.metadataTablesConfigSupplier().get(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 8e72d21fda81..48c5b5c4e94a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -63,6 +63,7 @@ import org.apache.druid.segment.metadata.SegmentSchemaManager; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.NumberedOverwritePartialShardSpec; @@ -136,7 +137,8 @@ public void setup() tablesConfig, derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), objectMapper, tablesConfig, @@ -477,7 +479,8 @@ public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() derby.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + 
NoopServiceEmitter.instance() ), loadedMapper, derby.metadataTablesConfigSupplier().get(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java index 80026f137269..f54ea7fe5860 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -111,7 +111,8 @@ public void setUp() derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnectorRule.getConnector(), new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), jsonMapper, derbyConnectorRule.metadataTablesConfigSupplier().get(), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 214c024b4548..9e82196f8117 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -123,6 +123,7 @@ import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.security.AuthTestUtils; import org.apache.druid.timeline.DataSegment; import org.apache.druid.utils.CompressionUtils; @@ -595,7 +596,8 @@ protected void makeToolboxFactory(TestUtils testUtils, ServiceEmitter emitter, b derby.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - new 
NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ), objectMapper, derby.metadataTablesConfigSupplier().get(), diff --git a/integration-tests-ex/cases/cluster/Common/environment-configs/overlord.env b/integration-tests-ex/cases/cluster/Common/environment-configs/overlord.env index 0d92fe9e09d4..5e8b6a809cc9 100644 --- a/integration-tests-ex/cases/cluster/Common/environment-configs/overlord.env +++ b/integration-tests-ex/cases/cluster/Common/environment-configs/overlord.env @@ -29,5 +29,5 @@ druid_indexer_runner_type=remote druid_auth_basic_common_cacheDirectory=/tmp/authCache/overlord druid_server_https_crlPath=/tls/revocations.crl -druid_segments_manager_pollDuration=PT5S -druid_segments_manager_useCache=true +druid_manager_segments_pollDuration=PT5S +druid_manager_segments_useCache=always diff --git a/integration-tests/docker/environment-configs/overlord b/integration-tests/docker/environment-configs/overlord index 1f3e7fcb602e..df351c73d278 100644 --- a/integration-tests/docker/environment-configs/overlord +++ b/integration-tests/docker/environment-configs/overlord @@ -33,5 +33,5 @@ druid_indexer_storage_type=metadata druid_indexer_runner_type=remote druid_auth_basic_common_cacheDirectory=/tmp/authCache/overlord druid_server_https_crlPath=/tls/revocations.crl -druid_segments_manager_pollDuration=PT2S -druid_segments_manager_useCache=true +druid_manager_segments_pollDuration=PT5S +druid_manager_segments_useCache=always diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index b4682e2be5a3..2f35c9f45d47 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -174,7 +174,7 @@ private Set doRetrieveUsedSegments( final Segments visibility ) 
{ - return inReadWriteDatasourceTransaction( + return inReadOnlyDatasourceTransaction( dataSource, transaction -> { if (visibility == Segments.ONLY_VISIBLE) { @@ -283,7 +283,7 @@ public int markAllSegmentsAsUnused(String dataSource) } private SegmentTimeline getTimelineForIntervals( - final SegmentMetadataTransaction transaction, + final SegmentMetadataReadTransaction transaction, final List intervals ) { @@ -1500,6 +1500,11 @@ private SegmentIdWithShardSpec getTrueAllocatedId( allocatedId.getInterval(), allocatedId.getVersion() ); + log.info( + "Allocated SegmentId[%s] is already in use. Using next ID after max[%s].", + allocatedId.asSegmentId(), unusedMaxId + ); + // No unused segment. Just return the allocated id if (unusedMaxId == null) { return allocatedId; diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java index fc65de4af532..033a703247fa 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.common.config.Configs; +import org.apache.druid.metadata.segment.cache.SegmentMetadataCache; import org.joda.time.Period; /** @@ -36,19 +37,19 @@ public class SegmentsMetadataManagerConfig private final Period pollDuration; @JsonProperty - private final boolean useCache; + private final SegmentMetadataCache.UsageMode useCache; @JsonCreator public SegmentsMetadataManagerConfig( @JsonProperty("pollDuration") Period pollDuration, - @JsonProperty("useCache") Boolean useCache + @JsonProperty("useCache") SegmentMetadataCache.UsageMode useCache ) { this.pollDuration = Configs.valueOrDefault(pollDuration, Period.minutes(1)); - this.useCache = Configs.valueOrDefault(useCache, 
false); + this.useCache = Configs.valueOrDefault(useCache, SegmentMetadataCache.UsageMode.NEVER); } - public boolean isUseCache() + public SegmentMetadataCache.UsageMode getCacheMode() { return useCache; } diff --git a/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java b/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java index b6f21b301f09..3d19d7c7365e 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/CachedSegmentMetadataTransaction.java @@ -60,21 +60,34 @@ class CachedSegmentMetadataTransaction implements SegmentMetadataTransaction private final DruidLeaderSelector leaderSelector; private final int startTerm; + private final boolean readFromCache; private boolean isRollingBack = false; private boolean isClosed = false; private final List> pendingCacheWrites = new ArrayList<>(); + /** + * Creates a transaction that may read or write from the cache or delegate to + * the metadata store if necessary. Writes are always done to the cache unless + * the update does not affect the cache contents. + * + * @param leaderSelector Used to ensure that leadership does not change during + * the course of the transaction. + * @param readFromCache If true, reads are done from cache. Otherwise, + * reads are done directly from the metadata store. 
+ */ CachedSegmentMetadataTransaction( SegmentMetadataTransaction delegate, DatasourceSegmentCache metadataCache, - DruidLeaderSelector leaderSelector + DruidLeaderSelector leaderSelector, + boolean readFromCache ) { this.delegate = delegate; this.metadataCache = metadataCache; this.leaderSelector = leaderSelector; + this.readFromCache = readFromCache; if (leaderSelector.isLeader()) { this.startTerm = leaderSelector.localTerm(); @@ -147,11 +160,16 @@ public Set findExistingSegmentIds(Set segmentIds) final Set remainingIdsToFind = new HashSet<>(segmentIds); // Try to find IDs in cache - final Set foundIds = metadataCache.findUsedSegments(remainingIdsToFind) - .stream() - .map(segment -> segment.getDataSegment().getId().toString()) - .collect(Collectors.toCollection(HashSet::new)); - remainingIdsToFind.removeIf(id -> foundIds.contains(id.toString())); + final Set foundIds = new HashSet<>(); + if (readFromCache) { + foundIds.addAll( + metadataCache.findUsedSegments(remainingIdsToFind) + .stream() + .map(segment -> segment.getDataSegment().getId().toString()) + .collect(Collectors.toCollection(HashSet::new)) + ); + remainingIdsToFind.removeIf(id -> foundIds.contains(id.toString())); + } // Find remaining IDs in metadata store if (!remainingIdsToFind.isEmpty()) { @@ -166,7 +184,7 @@ public Set findExistingSegmentIds(Set segmentIds) @Override public Set findUsedSegmentIdsOverlapping(Interval interval) { - return metadataCache.findUsedSegmentIdsOverlapping(interval); + return performReadAction(reader -> reader.findUsedSegmentIdsOverlapping(interval)); } @Override @@ -182,15 +200,20 @@ public List findSegments(Set segmentIds) final Set remainingIdsToFind = new HashSet<>(segmentIds); // Try to find segments in cache - final List foundSegments = new ArrayList<>( - metadataCache.findUsedSegments(remainingIdsToFind) - ); - foundSegments.forEach(segment -> remainingIdsToFind.remove(segment.getDataSegment().getId())); + final List foundSegments = new ArrayList<>(); + if 
(readFromCache) { + foundSegments.addAll( + metadataCache.findUsedSegments(remainingIdsToFind) + ); + foundSegments.forEach(segment -> remainingIdsToFind.remove(segment.getDataSegment().getId())); + } // Find remaining segments in metadata store - foundSegments.addAll( - delegate.findSegments(remainingIdsToFind) - ); + if (!remainingIdsToFind.isEmpty()) { + foundSegments.addAll( + delegate.findSegments(remainingIdsToFind) + ); + } return List.copyOf(foundSegments); } @@ -205,19 +228,19 @@ public List findSegmentsWithSchema(Set segmentIds) @Override public Set findUsedSegmentsOverlappingAnyOf(List intervals) { - return metadataCache.findUsedSegmentsOverlappingAnyOf(intervals); + return performReadAction(reader -> reader.findUsedSegmentsOverlappingAnyOf(intervals)); } @Override public List findUsedSegments(Set segmentIds) { - return metadataCache.findUsedSegments(segmentIds); + return performReadAction(reader -> reader.findUsedSegments(segmentIds)); } @Override public Set findUsedSegmentsPlusOverlappingAnyOf(List intervals) { - return metadataCache.findUsedSegmentsPlusOverlappingAnyOf(intervals); + return performReadAction(reader -> reader.findUsedSegmentsPlusOverlappingAnyOf(intervals)); } @Override @@ -248,7 +271,7 @@ public DataSegment findSegment(SegmentId segmentId) @Override public DataSegment findUsedSegment(SegmentId segmentId) { - return metadataCache.findUsedSegment(segmentId); + return performReadAction(reader -> reader.findUsedSegment(segmentId)); } @Override @@ -257,7 +280,7 @@ public List findPendingSegmentIds( String sequencePreviousId ) { - return metadataCache.findPendingSegmentIds(sequenceName, sequencePreviousId); + return performReadAction(reader -> reader.findPendingSegmentIds(sequenceName, sequencePreviousId)); } @Override @@ -266,25 +289,25 @@ public List findPendingSegmentIdsWithExactInterval( Interval interval ) { - return metadataCache.findPendingSegmentIdsWithExactInterval(sequenceName, interval); + return performReadAction(reader -> 
reader.findPendingSegmentIdsWithExactInterval(sequenceName, interval)); } @Override public List findPendingSegmentsOverlapping(Interval interval) { - return metadataCache.findPendingSegmentsOverlapping(interval); + return performReadAction(reader -> reader.findPendingSegmentsOverlapping(interval)); } @Override public List findPendingSegmentsWithExactInterval(Interval interval) { - return metadataCache.findPendingSegmentsWithExactInterval(interval); + return performReadAction(reader -> reader.findPendingSegmentsWithExactInterval(interval)); } @Override public List findPendingSegments(String taskAllocatorId) { - return metadataCache.findPendingSegments(taskAllocatorId); + return performReadAction(reader -> reader.findPendingSegments(taskAllocatorId)); } // WRITE METHODS @@ -404,6 +427,19 @@ public int deletePendingSegmentsCreatedIn(Interval interval) ); } + /** + * Performs a read from cache only if {@link #readFromCache} is true. + * Otherwise, reads directly from the metadata store. + */ + private T performReadAction(Function action) + { + if (readFromCache) { + return action.apply(metadataCache); + } else { + return action.apply(delegate); + } + } + private T performWriteAction(Function action) { if (isClosed) { diff --git a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java index 0d8b707bded5..71974872a237 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java @@ -23,10 +23,15 @@ import com.google.inject.Inject; import org.apache.druid.client.indexing.IndexingService; import org.apache.druid.discovery.DruidLeaderSelector; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import 
org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SQLMetadataConnector; import org.apache.druid.metadata.segment.cache.DatasourceSegmentCache; +import org.apache.druid.metadata.segment.cache.Metric; import org.apache.druid.metadata.segment.cache.SegmentMetadataCache; +import org.apache.druid.query.DruidMetrics; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.TransactionStatus; @@ -40,6 +45,8 @@ */ public class SqlSegmentMetadataTransactionFactory implements SegmentMetadataTransactionFactory { + private static final Logger log = new Logger(SqlSegmentMetadataTransactionFactory.class); + private static final int QUIET_RETRIES = 2; private static final int MAX_RETRIES = 3; @@ -48,6 +55,7 @@ public class SqlSegmentMetadataTransactionFactory implements SegmentMetadataTran private final SQLMetadataConnector connector; private final DruidLeaderSelector leaderSelector; private final SegmentMetadataCache segmentMetadataCache; + private final ServiceEmitter emitter; @Inject public SqlSegmentMetadataTransactionFactory( @@ -55,7 +63,8 @@ public SqlSegmentMetadataTransactionFactory( MetadataStorageTablesConfig tablesConfig, SQLMetadataConnector connector, @IndexingService DruidLeaderSelector leaderSelector, - SegmentMetadataCache segmentMetadataCache + SegmentMetadataCache segmentMetadataCache, + ServiceEmitter emitter ) { this.jsonMapper = jsonMapper; @@ -63,6 +72,7 @@ public SqlSegmentMetadataTransactionFactory( this.connector = connector; this.leaderSelector = leaderSelector; this.segmentMetadataCache = segmentMetadataCache; + this.emitter = emitter; } public int getMaxRetries() @@ -81,12 +91,14 @@ public T inReadOnlyDatasourceTransaction( final SegmentMetadataTransaction sqlTransaction = createSqlTransaction(dataSource, handle, status); - if (segmentMetadataCache.isEnabled()) { + // For read-only transactions, use cache only if it is already synced + if 
(segmentMetadataCache.isSyncedForRead()) { final DatasourceSegmentCache datasourceCache = segmentMetadataCache.getDatasource(dataSource); final SegmentMetadataReadTransaction cachedTransaction - = new CachedSegmentMetadataTransaction(sqlTransaction, datasourceCache, leaderSelector); + = new CachedSegmentMetadataTransaction(sqlTransaction, datasourceCache, leaderSelector, true); + emitTransactionCount(Metric.READ_ONLY_TRANSACTIONS, dataSource); return datasourceCache.read(() -> executeReadAndClose(cachedTransaction, callback)); } else { return executeReadAndClose(sqlTransaction, callback); @@ -109,10 +121,26 @@ public T inReadWriteDatasourceTransaction( = createSqlTransaction(dataSource, handle, status); if (segmentMetadataCache.isEnabled()) { + final boolean isCacheReadyForRead = segmentMetadataCache.isSyncedForRead(); final DatasourceSegmentCache datasourceCache = segmentMetadataCache.getDatasource(dataSource); - final SegmentMetadataTransaction cachedTransaction - = new CachedSegmentMetadataTransaction(sqlTransaction, datasourceCache, leaderSelector); + final SegmentMetadataTransaction cachedTransaction = new CachedSegmentMetadataTransaction( + sqlTransaction, + datasourceCache, + leaderSelector, + isCacheReadyForRead + ); + + if (isCacheReadyForRead) { + emitTransactionCount(Metric.READ_WRITE_TRANSACTIONS, dataSource); + } else { + log.warn( + "Starting read-write transaction for datasource[%s]. 
Reads will be done" + + " directly from metadata store since cache is not synced yet.", + dataSource + ); + emitTransactionCount(Metric.WRITE_ONLY_TRANSACTIONS, dataSource); + } return datasourceCache.write(() -> executeWriteAndClose(cachedTransaction, callback)); } else { @@ -163,4 +191,13 @@ private T executeReadAndClose( } } + private void emitTransactionCount(String metricName, String datasource) + { + emitter.emit( + ServiceMetricEvent.builder() + .setDimension(DruidMetrics.DATASOURCE, datasource) + .setMetric(metricName, 1L) + ); + } + } diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java index b51f1e026a0a..d9210b782afd 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java @@ -64,18 +64,21 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; /** * In-memory implementation of {@link SegmentMetadataCache}. *

- * Only used segments (excluding num_rows and schema_fingerpring) and + * Only used segments (excluding num_rows and schema_fingerprint) and * pending segments are cached. Unused segments are not cached. *

* Non-leader Overlords also keep polling the metadata store to keep the cache * up-to-date in case leadership changes. *

+ * For cache usage modes, see {@link UsageMode}. + *

* The map {@link #datasourceToSegmentCache} contains the cache for each datasource. * Items are only added to this map and never removed. This is to avoid handling * race conditions where a thread has invoked {@link #getDatasource} but hasn't @@ -105,7 +108,7 @@ private enum CacheState private final ObjectMapper jsonMapper; private final Duration pollDuration; - private final boolean isCacheEnabled; + private final UsageMode cacheMode; private final MetadataStorageTablesConfig tablesConfig; private final SQLMetadataConnector connector; @@ -114,6 +117,13 @@ private enum CacheState private final Object cacheStateLock = new Object(); + /** + * Denotes that the cache is in state {@link CacheState#LEADER_READY}. + * Maintained as a separate variable to avoid acquiring the {@link #cacheStateLock} + * whenever {@link #isSyncedForRead()} is checked in a transaction. + */ + private final AtomicBoolean isCacheReady = new AtomicBoolean(false); + @GuardedBy("cacheStateLock") private CacheState currentCacheState = CacheState.STOPPED; @GuardedBy("cacheStateLock") @@ -137,11 +147,11 @@ public HeapMemorySegmentMetadataCache( ) { this.jsonMapper = jsonMapper; - this.isCacheEnabled = config.get().isUseCache(); + this.cacheMode = config.get().getCacheMode(); this.pollDuration = config.get().getPollDuration().toStandardDuration(); this.tablesConfig = tablesConfig.get(); this.connector = connector; - this.pollExecutor = isCacheEnabled + this.pollExecutor = isEnabled() ? 
MoreExecutors.listeningDecorator(executorFactory.create(1, "SegmentMetadataCache-%s")) : null; this.emitter = emitter; @@ -151,7 +161,7 @@ public HeapMemorySegmentMetadataCache( @Override public void start() { - if (!isCacheEnabled) { + if (!isEnabled()) { log.info("Segment metadata cache is not enabled."); return; } @@ -159,8 +169,28 @@ public void start() synchronized (cacheStateLock) { if (currentCacheState == CacheState.STOPPED) { updateCacheState(CacheState.FOLLOWER, "Scheduling sync with metadata store"); - scheduleSyncWithMetadataStore(pollDuration.getMillis()); } + + if (cacheMode == UsageMode.ALWAYS) { + // Cache must always be used, do not finish startup until cache has synced + performFirstSync(); + } + + scheduleSyncWithMetadataStore(pollDuration.getMillis()); + } + } + + private void performFirstSync() + { + try { + log.info("Cache is in usage mode[%s]. Starting first sync with metadata store.", cacheMode); + + final long syncDurationMillis = syncWithMetadataStore(); + emitMetric(Metric.SYNC_DURATION_MILLIS, syncDurationMillis); + log.info("Finished first sync of cache with metadata store in [%d] millis.", syncDurationMillis); + } + catch (Throwable t) { + throw InternalServerError.exception(t, "Could not sync segment metadata cache with metadata store"); } } @@ -168,7 +198,7 @@ public void start() public void stop() { synchronized (cacheStateLock) { - if (isCacheEnabled) { + if (isEnabled()) { pollExecutor.shutdownNow(); datasourceToSegmentCache.forEach((datasource, cache) -> cache.stop()); datasourceToSegmentCache.clear(); @@ -183,7 +213,7 @@ public void stop() public void becomeLeader() { synchronized (cacheStateLock) { - if (isCacheEnabled) { + if (isEnabled()) { if (currentCacheState == CacheState.STOPPED) { throw DruidException.defensive("Cache has not been started yet"); } else { @@ -202,7 +232,7 @@ public void becomeLeader() public void stopBeingLeader() { synchronized (cacheStateLock) { - if (isCacheEnabled) { + if (isEnabled()) { 
updateCacheState(CacheState.FOLLOWER, "Not leader anymore"); } } @@ -211,14 +241,19 @@ public void stopBeingLeader() @Override public boolean isEnabled() { - return isCacheEnabled; + return cacheMode != UsageMode.NEVER; + } + + @Override + public boolean isSyncedForRead() + { + return isEnabled() && isCacheReady.get(); } @Override public DatasourceSegmentCache getDatasource(String dataSource) { verifyCacheIsUsableAndAwaitSync(); - emitMetric(dataSource, Metric.TRANSACTION_COUNT, 1); return getCacheForDatasource(dataSource); } @@ -229,13 +264,14 @@ private HeapMemoryDatasourceSegmentCache getCacheForDatasource(String dataSource /** * Verifies that the cache is enabled, started and has become leader. - * Also waits for the cache to be synced with metadata store. + * Also waits for the cache to be synced with the metadata store after becoming + * leader if {@link #cacheMode} is set to {@link UsageMode#ALWAYS}. * * @throws DruidException if the cache is disabled, stopped or not leader. */ private void verifyCacheIsUsableAndAwaitSync() { - if (!isCacheEnabled) { + if (!isEnabled()) { throw DruidException.defensive("Segment metadata cache is not enabled."); } @@ -247,8 +283,10 @@ private void verifyCacheIsUsableAndAwaitSync() throw InternalServerError.exception("Not leader yet. Segment metadata cache is not usable."); case LEADER_FIRST_SYNC_PENDING: case LEADER_FIRST_SYNC_STARTED: - waitForCacheToFinishSync(); - verifyCacheIsUsableAndAwaitSync(); + if (cacheMode == UsageMode.ALWAYS) { + waitForCacheToFinishSync(); + verifyCacheIsUsableAndAwaitSync(); + } case LEADER_READY: // Cache is now ready for use } @@ -287,10 +325,10 @@ private void updateCacheState(CacheState targetState, String message) currentCacheState = targetState; log.info("%s. 
Cache is now in state[%s].", message, currentCacheState); - // Notify threads waiting for cache to be ready - if (currentCacheState == CacheState.LEADER_READY) { - cacheStateLock.notifyAll(); - } + isCacheReady.set(currentCacheState == CacheState.LEADER_READY); + + // Notify threads waiting for cache to change state + cacheStateLock.notifyAll(); } } diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java index 7e5e5485c5d8..ea22a5fb7116 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java @@ -43,9 +43,19 @@ private Metric() private static final String METRIC_NAME_PREFIX = "segment/metadataCache/"; /** - * Number of transactions performed on the cache for a datasource. + * Number of read-write transactions performed on the cache for a datasource. */ - public static final String TRANSACTION_COUNT = METRIC_NAME_PREFIX + "transactions"; + public static final String READ_WRITE_TRANSACTIONS = METRIC_NAME_PREFIX + "transactions/readWrite"; + + /** + * Number of write-only transactions performed on the cache for a datasource. + */ + public static final String WRITE_ONLY_TRANSACTIONS = METRIC_NAME_PREFIX + "transactions/writeOnly"; + + /** + * Number of read-only transactions performed on the cache for a datasource. + */ + public static final String READ_ONLY_TRANSACTIONS = METRIC_NAME_PREFIX + "transactions/readOnly"; /** * Total number of distinct intervals currently present in the cache. 
diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java index 080464b44605..88e2cf05e3b6 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java @@ -21,6 +21,13 @@ public class NoopSegmentMetadataCache implements SegmentMetadataCache { + private static final NoopSegmentMetadataCache INSTANCE = new NoopSegmentMetadataCache(); + + public static NoopSegmentMetadataCache instance() + { + return INSTANCE; + } + @Override public void start() { @@ -51,6 +58,12 @@ public boolean isEnabled() return false; } + @Override + public boolean isSyncedForRead() + { + return false; + } + @Override public DatasourceSegmentCache getDatasource(String dataSource) { diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java index af0b17f696f2..2fa501c7a123 100644 --- a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java @@ -19,6 +19,9 @@ package org.apache.druid.metadata.segment.cache; +import com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.druid.error.InvalidInput; + /** * Cache for metadata of pending segments and committed segments maintained by * the Overlord to improve performance of segment allocation and other task actions. @@ -54,9 +57,66 @@ public interface SegmentMetadataCache */ boolean isEnabled(); + /** + * @return true if the cache is enabled and synced with the metadata store. + * Reads can be done from the cache only if it is synced but writes can happen + * even before the sync has finished. 
+ */ + boolean isSyncedForRead(); + /** * Returns the cache for the given datasource. */ DatasourceSegmentCache getDatasource(String dataSource); + /** + * Cache usage modes. + */ + enum UsageMode + { + /** + * Always read from the cache. Service start-up may be blocked until cache + * has synced with the metadata store at least once. Transactions may block + * until cache has synced with the metadata store at least once after + * becoming leader. + */ + ALWAYS("always"), + + /** + * Cache is disabled. + */ + NEVER("never"), + + /** + * Read from the cache only if it is already synced with the metadata store. + * Does not block service start-up or transactions. Writes may still go to + * cache to reduce sync times. + */ + IF_SYNCED("ifSynced"); + + private final String name; + + UsageMode(String name) + { + this.name = name; + } + + @JsonCreator + public static UsageMode fromString(String value) + { + for (UsageMode mode : values()) { + if (mode.toString().equals(value)) { + return mode; + } + } + + throw InvalidInput.exception("No such cache usage mode[%s]", value); + } + + @Override + public String toString() + { + return name; + } + } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 71e92fc31787..6671fe4e1332 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -37,7 +37,7 @@ import org.apache.druid.metadata.segment.SegmentMetadataTransaction; import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory; import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache; -import org.apache.druid.metadata.segment.cache.NoopSegmentMetadataCache; +import org.apache.druid.metadata.segment.cache.Metric; import 
org.apache.druid.metadata.segment.cache.SegmentMetadataCache; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.TestDataSource; @@ -90,6 +90,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import java.util.stream.Collectors; @RunWith(Parameterized.class) @@ -104,17 +105,21 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata private SqlSegmentMetadataTransactionFactory transactionFactory; private BlockingExecutorService cachePollExecutor; - private final boolean useSegmentCache; + private final SegmentMetadataCache.UsageMode cacheMode; - @Parameterized.Parameters(name = "useSegmentCache = {0}") + @Parameterized.Parameters(name = "cacheMode = {0}") public static Object[][] testParameters() { - return new Object[][]{{true}, {false}}; + return new Object[][]{ + {SegmentMetadataCache.UsageMode.ALWAYS}, + {SegmentMetadataCache.UsageMode.NEVER}, + {SegmentMetadataCache.UsageMode.IF_SYNCED} + }; } - public IndexerSQLMetadataStorageCoordinatorTest(boolean useSegmentCache) + public IndexerSQLMetadataStorageCoordinatorTest(SegmentMetadataCache.UsageMode cacheMode) { - this.useSegmentCache = useSegmentCache; + this.cacheMode = cacheMode; } @Before @@ -139,9 +144,10 @@ public void setUp() leaderSelector = new TestDruidLeaderSelector(); cachePollExecutor = new BlockingExecutorService("test-cache-poll-exec"); + segmentMetadataCache = new HeapMemorySegmentMetadataCache( mapper, - () -> new SegmentsMetadataManagerConfig(null, useSegmentCache), + () -> new SegmentsMetadataManagerConfig(null, cacheMode), derbyConnectorRule.metadataTablesConfigSupplier(), derbyConnector, (corePoolSize, nameFormat) -> new WrappingScheduledExecutorService( @@ -155,10 +161,11 @@ public void setUp() leaderSelector.becomeLeader(); // Get the cache ready if required - if (useSegmentCache) { + if (isCacheEnabled()) { segmentMetadataCache.start(); 
segmentMetadataCache.becomeLeader(); - cachePollExecutor.finishNextPendingTasks(4); + refreshCache(); + refreshCache(); } transactionFactory = new SqlSegmentMetadataTransactionFactory( @@ -166,7 +173,8 @@ public void setUp() derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnector, leaderSelector, - segmentMetadataCache + segmentMetadataCache, + emitter ) { @Override @@ -207,12 +215,17 @@ public void tearDown() leaderSelector.stopBeingLeader(); } - void refreshCache() + private void refreshCache() { - if (useSegmentCache) { + if (isCacheEnabled()) { cachePollExecutor.finishNextPendingTasks(2); } } + + private boolean isCacheEnabled() + { + return cacheMode != SegmentMetadataCache.UsageMode.NEVER; + } @Test public void testCommitAppendSegments() @@ -764,13 +777,7 @@ public void testTransactionalAnnounceRetryAndSuccess() throws IOException final AtomicLong attemptCounter = new AtomicLong(); final IndexerSQLMetadataStorageCoordinator failOnceCoordinator = new IndexerSQLMetadataStorageCoordinator( - new SqlSegmentMetadataTransactionFactory( - mapper, - derbyConnectorRule.metadataTablesConfigSupplier().get(), - derbyConnector, - new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() - ), + transactionFactory, mapper, derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnector, @@ -936,6 +943,8 @@ public void testRetrieveSegmentForId() @Test public void testCleanUpgradeSegmentsTableForTask() { + Assume.assumeFalse(isCacheEnabled()); + final String taskToClean = "taskToClean"; final ReplaceTaskLock replaceLockToClean = new ReplaceTaskLock( taskToClean, @@ -2179,7 +2188,7 @@ public void testSecondHalfEternitySegmentWithStringComparison() public void testLargeIntervalWithStringComparison() { // Known Issue when not using cache: https://github.com/apache/druid/issues/12860 - Assume.assumeTrue(useSegmentCache); + Assume.assumeTrue(isCacheEnabled()); coordinator.commitSegments( ImmutableSet.of( @@ -2280,7 +2289,7 @@ public void 
testDeleteSegmentsInMetaDataStorage() @Test public void testUpdateSegmentsInMetaDataStorage() { - Assume.assumeFalse(useSegmentCache); + Assume.assumeFalse(isCacheEnabled()); // Published segments to MetaDataStorage coordinator.commitSegments(SEGMENTS, new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION)); @@ -3711,6 +3720,8 @@ public void testRetrieveUpgradedFromSegmentIds() @Test public void testRetrieveUpgradedFromSegmentIdsInBatches() { + Assume.assumeFalse(isCacheEnabled()); + final int size = 500; final int batchSize = 100; @@ -3909,7 +3920,7 @@ public void testRetrieveUsedSegmentsForSegmentAllocation() @Test public void testCachedTransaction_cannotReadWhatItWrites() { - Assume.assumeTrue(useSegmentCache); + Assume.assumeTrue(isCacheEnabled()); transactionFactory.inReadWriteDatasourceTransaction( TestDataSource.WIKI, @@ -3930,6 +3941,92 @@ public void testCachedTransaction_cannotReadWhatItWrites() return 0; } ); + + emitter.verifyValue(Metric.READ_WRITE_TRANSACTIONS, 1L); + } + + @Test + public void testReadOperation_usesCache_ifSynced() + { + Assume.assumeTrue(isCacheEnabled()); + + Assert.assertTrue(segmentMetadataCache.isSyncedForRead()); + + insertUsedSegments(Set.of(defaultSegment), Map.of()); + final Supplier> retrieveAction = + () -> coordinator.retrieveAllUsedSegments( + defaultSegment.getDataSource(), + Segments.INCLUDING_OVERSHADOWED + ); + + // Retrieve returns empty since the cache has not yet polled the newly inserted segment + Assert.assertTrue(retrieveAction.get().isEmpty()); + + refreshCache(); + Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); + + emitter.verifyEmitted(Metric.READ_ONLY_TRANSACTIONS, 2); + } + + @Test + public void testReadOperation_doesNotUseCache_ifNotSynced() + { + Assume.assumeTrue(isCacheEnabled()); + + segmentMetadataCache.stopBeingLeader(); + Assert.assertFalse(segmentMetadataCache.isSyncedForRead()); + + final Supplier> retrieveAction = + () -> coordinator.retrieveAllUsedSegments(
defaultSegment.getDataSource(), + Segments.INCLUDING_OVERSHADOWED + ); + + insertUsedSegments(Set.of(defaultSegment), Map.of()); + + Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); + emitter.verifyNotEmitted(Metric.READ_ONLY_TRANSACTIONS); + + // Become leader but cache will still not be used + segmentMetadataCache.becomeLeader(); + Assert.assertFalse(segmentMetadataCache.isSyncedForRead()); + Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); + emitter.verifyNotEmitted(Metric.READ_ONLY_TRANSACTIONS); + + // Sync the cache so that it becomes ready for use + refreshCache(); + refreshCache(); + Assert.assertTrue(segmentMetadataCache.isSyncedForRead()); + Assert.assertEquals(Set.of(defaultSegment), retrieveAction.get()); + emitter.verifyValue(Metric.READ_ONLY_TRANSACTIONS, 1L); + } + + @Test + public void testWriteOperation_alwaysUsesCache_inModeIfSynced() + { + Assume.assumeTrue(cacheMode == SegmentMetadataCache.UsageMode.IF_SYNCED); + + // Lose and regain leadership + segmentMetadataCache.stopBeingLeader(); + segmentMetadataCache.becomeLeader(); + + Assert.assertTrue(segmentMetadataCache.isEnabled()); + Assert.assertFalse(segmentMetadataCache.isSyncedForRead()); + + final Supplier> writeAction = + () -> coordinator.commitSegments(Set.of(defaultSegment), null); + + // Cache is not synced yet and will be used only for write operations + Assert.assertEquals(Set.of(defaultSegment), writeAction.get()); + emitter.verifyValue(Metric.WRITE_ONLY_TRANSACTIONS, 1L); + + // Sync the cache to use it for both read and write operations + refreshCache(); + refreshCache(); + Assert.assertTrue(segmentMetadataCache.isSyncedForRead()); + + Assert.assertTrue(writeAction.get().isEmpty()); + emitter.verifyValue(Metric.READ_WRITE_TRANSACTIONS, 1L); } private SegmentIdWithShardSpec allocatePendingSegment( diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java 
b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java index c72e1a75afee..c075b7ae8902 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java @@ -41,6 +41,7 @@ import org.apache.druid.segment.metadata.SegmentSchemaManager; import org.apache.druid.segment.metadata.SegmentSchemaTestUtils; import org.apache.druid.server.coordinator.simulate.TestDruidLeaderSelector; +import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; import org.apache.druid.timeline.partition.LinearShardSpec; @@ -96,7 +97,8 @@ public void setUp() derbyConnectorRule.metadataTablesConfigSupplier().get(), derbyConnector, new TestDruidLeaderSelector(), - new NoopSegmentMetadataCache() + NoopSegmentMetadataCache.instance(), + NoopServiceEmitter.instance() ); coordinator = new IndexerSQLMetadataStorageCoordinator( transactionFactory, diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java index 834553655570..788de1fa0ffb 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerSchemaPollTest.java @@ -54,7 +54,7 @@ public class SqlSegmentsMetadataManagerSchemaPollTest extends SqlSegmentsMetadat public void setUp() throws Exception { connector = derbyConnectorRule.getConnector(); - SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(3), false); + SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(3), null); 
segmentSchemaCache = new SegmentSchemaCache(new NoopServiceEmitter()); segmentSchemaManager = new SegmentSchemaManager( @@ -129,7 +129,7 @@ public void testPollSegmentAndSchema() CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig = new CentralizedDatasourceSchemaConfig(); centralizedDatasourceSchemaConfig.setEnabled(true); - config = new SegmentsMetadataManagerConfig(Period.seconds(3), false); + config = new SegmentsMetadataManagerConfig(Period.seconds(3), null); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( jsonMapper, Suppliers.ofInstance(config), @@ -217,7 +217,7 @@ public void testPollOnlyNewSchemaVersion() CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig = new CentralizedDatasourceSchemaConfig(); centralizedDatasourceSchemaConfig.setEnabled(true); - config = new SegmentsMetadataManagerConfig(Period.seconds(3), false); + config = new SegmentsMetadataManagerConfig(Period.seconds(3), null); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( jsonMapper, Suppliers.ofInstance(config), diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index 1f71a3513a9e..c1f9a0d544f4 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -110,7 +110,7 @@ private void publishWikiSegments() public void setUp() { connector = derbyConnectorRule.getConnector(); - SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(3), false); + SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(3), null); storageConfig = derbyConnectorRule.metadataTablesConfigSupplier().get(); segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance()); @@ -1330,7 +1330,7 @@ public void 
testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval() thr final Interval theInterval = Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000"); // Re-create SqlSegmentsMetadataManager with a higher poll duration - final SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(1), false); + final SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.seconds(1), null); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( jsonMapper, Suppliers.ofInstance(config), diff --git a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java index 16710fef591f..89ad8139dea3 100644 --- a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java @@ -32,7 +32,6 @@ import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.metadata.SegmentsMetadataManagerConfig; import org.apache.druid.metadata.TestDerbyConnector; -import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.TestDataSource; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -53,7 +52,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Set; public class HeapMemorySegmentMetadataCacheTest @@ -95,13 +93,13 @@ public void tearDown() /** * Creates the target {@link #cache} to be tested in the current test. 
*/ - private void setupTargetWithCaching(boolean enabled) + private void setupTargetWithCaching(SegmentMetadataCache.UsageMode cacheMode) { if (cache != null) { throw new ISE("Test target has already been initialized with caching[%s]", cache.isEnabled()); } final SegmentsMetadataManagerConfig metadataManagerConfig - = new SegmentsMetadataManagerConfig(null, enabled); + = new SegmentsMetadataManagerConfig(null, cacheMode); cache = new HeapMemorySegmentMetadataCache( TestHelper.JSON_MAPPER, () -> metadataManagerConfig, @@ -114,7 +112,7 @@ private void setupTargetWithCaching(boolean enabled) private void setupAndSyncCache() { - setupTargetWithCaching(true); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS); cache.start(); cache.becomeLeader(); syncCacheAfterBecomingLeader(); @@ -134,13 +132,14 @@ private void syncCacheAfterBecomingLeader() */ private void syncCache() { + serviceEmitter.flush(); pollExecutor.finishNextPendingTasks(2); } @Test public void testStart_schedulesDbPoll_ifCacheIsEnabled() { - setupTargetWithCaching(true); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS); Assert.assertTrue(cache.isEnabled()); cache.start(); @@ -155,7 +154,7 @@ public void testStart_schedulesDbPoll_ifCacheIsEnabled() @Test public void testStart_doesNotScheduleDbPoll_ifCacheIsDisabled() { - setupTargetWithCaching(false); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.NEVER); Assert.assertFalse(cache.isEnabled()); cache.start(); @@ -166,7 +165,7 @@ public void testStart_doesNotScheduleDbPoll_ifCacheIsDisabled() @Test public void testStop_stopsDbPoll_ifCacheIsEnabled() { - setupTargetWithCaching(true); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS); Assert.assertTrue(cache.isEnabled()); cache.start(); @@ -180,7 +179,7 @@ public void testStop_stopsDbPoll_ifCacheIsEnabled() @Test public void testBecomeLeader_isNoop_ifCacheIsDisabled() { - setupTargetWithCaching(false); + 
setupTargetWithCaching(SegmentMetadataCache.UsageMode.NEVER); cache.start(); Assert.assertFalse(pollExecutor.hasPendingTasks()); @@ -192,7 +191,7 @@ public void testBecomeLeader_isNoop_ifCacheIsDisabled() @Test public void testBecomeLeader_throwsException_ifCacheIsStopped() { - setupTargetWithCaching(true); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS); DruidExceptionMatcher.defensive().expectMessageIs( "Cache has not been started yet" ).assertThrowsAndMatches( @@ -203,7 +202,7 @@ public void testBecomeLeader_throwsException_ifCacheIsStopped() @Test public void testGetDataSource_throwsException_ifCacheIsDisabled() { - setupTargetWithCaching(false); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.NEVER); DruidExceptionMatcher.defensive().expectMessageIs( "Segment metadata cache is not enabled." ).assertThrowsAndMatches( @@ -214,7 +213,7 @@ public void testGetDataSource_throwsException_ifCacheIsDisabled() @Test public void testGetDataSource_throwsException_ifCacheIsStoppedOrNotLeader() { - setupTargetWithCaching(true); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS); Assert.assertTrue(cache.isEnabled()); DruidExceptionMatcher.internalServerError().expectMessageIs( @@ -234,7 +233,7 @@ public void testGetDataSource_throwsException_ifCacheIsStoppedOrNotLeader() @Test(timeout = 60_000) public void testGetDataSource_waitsForOneSync_afterBecomingLeader() throws InterruptedException { - setupTargetWithCaching(true); + setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS); cache.start(); cache.becomeLeader(); @@ -338,7 +337,7 @@ public void testSync_emitsAlert_ifErrorOccurs() ); // Verify that sync duration is not emitted again - serviceEmitter.verifyEmitted(Metric.SYNC_DURATION_MILLIS, 1); + serviceEmitter.verifyNotEmitted(Metric.SYNC_DURATION_MILLIS); } @Test @@ -361,7 +360,7 @@ public void testSync_doesNotFail_ifSegmentRecordIsBad() // Verify that sync completes successfully and updates the valid segment 
setupAndSyncCache(); serviceEmitter.verifyEmitted(Metric.SYNC_DURATION_MILLIS, 1); - serviceEmitter.verifyValue(Metric.UPDATED_USED_SEGMENTS, 1L); + serviceEmitter.verifyValue(Metric.CACHED_USED_SEGMENTS, 1L); serviceEmitter.verifyValue(Metric.SKIPPED_SEGMENTS, 1L); final DatasourceSegmentCache wikiCache = cache.getDatasource(TestDataSource.WIKI); @@ -409,7 +408,7 @@ public void testSync_doesNotFail_ifPendingSegmentRecordIsBad() setupAndSyncCache(); serviceEmitter.verifyEmitted(Metric.SYNC_DURATION_MILLIS, 1); serviceEmitter.verifyValue(Metric.SKIPPED_PENDING_SEGMENTS, 1L); - serviceEmitter.verifyValue(Metric.UPDATED_USED_SEGMENTS, 1L); + serviceEmitter.verifyValue(Metric.CACHED_USED_SEGMENTS, 1L); serviceEmitter.verifyValue(Metric.PERSISTED_USED_SEGMENTS, 1L); serviceEmitter.verifyValue(Metric.PERSISTED_PENDING_SEGMENTS, 0L); @@ -598,19 +597,6 @@ public void testSync_removesPendingSegment_ifNotPresentInMetadataStore() ); } - @Test - public void testGetDatasource_increasesTransactionCount() - { - setupAndSyncCache(); - cache.getDatasource(TestDataSource.WIKI); - cache.getDatasource(TestDataSource.WIKI); - serviceEmitter.verifyEmitted( - Metric.TRANSACTION_COUNT, - Map.of(DruidMetrics.DATASOURCE, TestDataSource.WIKI), - 2 - ); - } - private void insertSegmentsInMetadataStore(Set segments) { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java index 272ed1887ce7..1f02a5e9db97 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/KillUnusedSegmentsTest.java @@ -111,7 +111,7 @@ public class KillUnusedSegmentsTest public void setup() { connector = derbyConnectorRule.getConnector(); - SegmentsMetadataManagerConfig config 
= new SegmentsMetadataManagerConfig(Period.millis(1), false); + SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig(Period.millis(1), null); sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager( TestHelper.makeJsonMapper(), Suppliers.ofInstance(config),