diff --git a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
index 71974872a237..a894cfc301a4 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/SqlSegmentMetadataTransactionFactory.java
@@ -28,7 +28,6 @@
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.SQLMetadataConnector;
-import org.apache.druid.metadata.segment.cache.DatasourceSegmentCache;
 import org.apache.druid.metadata.segment.cache.Metric;
 import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
 import org.apache.druid.query.DruidMetrics;
@@ -93,13 +92,13 @@ public <T> T inReadOnlyDatasourceTransaction(
 
       // For read-only transactions, use cache only if it is already synced
       if (segmentMetadataCache.isSyncedForRead()) {
-        final DatasourceSegmentCache datasourceCache
-            = segmentMetadataCache.getDatasource(dataSource);
-        final SegmentMetadataReadTransaction cachedTransaction
-            = new CachedSegmentMetadataTransaction(sqlTransaction, datasourceCache, leaderSelector, true);
-        emitTransactionCount(Metric.READ_ONLY_TRANSACTIONS, dataSource);
-        return datasourceCache.read(() -> executeReadAndClose(cachedTransaction, callback));
+        emitTransactionCount(Metric.READ_ONLY_TRANSACTIONS, dataSource);
+        return segmentMetadataCache.readCacheForDataSource(dataSource, dataSourceCache -> {
+          final SegmentMetadataReadTransaction cachedTransaction
+              = new CachedSegmentMetadataTransaction(sqlTransaction, dataSourceCache, leaderSelector, true);
+          return executeReadAndClose(cachedTransaction, callback);
+        });
       } else {
         return executeReadAndClose(sqlTransaction, callback);
       }
 
@@ -121,17 +120,9 @@ public <T> T inReadWriteDatasourceTransaction(
           = createSqlTransaction(dataSource, handle, status);
 
       if (segmentMetadataCache.isEnabled()) {
-        final boolean isCacheReadyForRead = segmentMetadataCache.isSyncedForRead();
-        final DatasourceSegmentCache datasourceCache
-            = segmentMetadataCache.getDatasource(dataSource);
-        final SegmentMetadataTransaction cachedTransaction = new CachedSegmentMetadataTransaction(
-            sqlTransaction,
-            datasourceCache,
-            leaderSelector,
-            isCacheReadyForRead
-        );
-
-        if (isCacheReadyForRead) {
+        final boolean isSynced = segmentMetadataCache.isSyncedForRead();
+
+        if (isSynced) {
           emitTransactionCount(Metric.READ_WRITE_TRANSACTIONS, dataSource);
         } else {
           log.warn(
@@ -142,7 +133,11 @@ public <T> T inReadWriteDatasourceTransaction(
           emitTransactionCount(Metric.WRITE_ONLY_TRANSACTIONS, dataSource);
         }
 
-        return datasourceCache.write(() -> executeWriteAndClose(cachedTransaction, callback));
+        return segmentMetadataCache.writeCacheForDataSource(dataSource, dataSourceCache -> {
+          final SegmentMetadataTransaction cachedTransaction =
+              new CachedSegmentMetadataTransaction(sqlTransaction, dataSourceCache, leaderSelector, isSynced);
+          return executeWriteAndClose(cachedTransaction, callback);
+        });
       } else {
         return executeWriteAndClose(sqlTransaction, callback);
       }
diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/DatasourceSegmentCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/DatasourceSegmentCache.java
index 58ebefe4927a..9003b5a27de5 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/cache/DatasourceSegmentCache.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/DatasourceSegmentCache.java
@@ -27,26 +27,5 @@
  */
 public interface DatasourceSegmentCache
     extends DatasourceSegmentMetadataWriter, DatasourceSegmentMetadataReader
 {
-  /**
-   * Performs a thread-safe read action on the cache.
-   * Read actions can be concurrent with other reads but are mutually exclusive
-   * from other write actions.
-   */
-  <T> T read(Action<T> action) throws Exception;
-  /**
-   * Performs a thread-safe write action on the cache.
-   * Write actions are mutually exclusive from other writes or reads.
-   */
-  <T> T write(Action<T> action) throws Exception;
-
-  /**
-   * Represents a thread-safe read or write action performed on the cache within
-   * required locks.
-   */
-  @FunctionalInterface
-  interface Action<T>
-  {
-    T perform() throws Exception;
-  }
 }
diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemoryDatasourceSegmentCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemoryDatasourceSegmentCache.java
index 4a3b81505f24..8b9258a32a5f 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemoryDatasourceSegmentCache.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemoryDatasourceSegmentCache.java
@@ -38,6 +38,7 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -45,7 +46,7 @@
 /**
  * In-memory cache for segments and pending segments of a single datasource.
  */
-class HeapMemoryDatasourceSegmentCache extends ReadWriteCache
+class HeapMemoryDatasourceSegmentCache extends ReadWriteCache implements AutoCloseable
 {
   private final String dataSource;
 
@@ -60,6 +61,13 @@ class HeapMemoryDatasourceSegmentCache extends ReadWriteCache
   private final TreeMap<Interval, SegmentsInInterval> intervalToSegments
       = new TreeMap<>(Comparators.intervalsByEndThenStart());
 
+  /**
+   * Number of transactions currently using this cache. This field needs no
+   * explicit lock on the cache: it is atomic, and all increments happen inside
+   * a ConcurrentHashMap.compute(), which is atomic for a given datasource.
+   */
+  private final AtomicInteger references = new AtomicInteger(0);
+
   HeapMemoryDatasourceSegmentCache(String dataSource)
   {
     super(true);
@@ -84,6 +92,30 @@ boolean isEmpty()
     return withReadLock(intervalToSegments::isEmpty);
   }
 
+  /**
+   * Acquires a reference to this cache, which must be released via {@link #close()}
+   * after the transaction holding this reference has completed.
+   */
+  void acquireReference()
+  {
+    references.incrementAndGet();
+  }
+
+  @Override
+  public void close()
+  {
+    references.decrementAndGet();
+  }
+
+  /**
+   * @return true if this cache is currently being used by a transaction,
+   * i.e. the number of {@link #references} is non-zero.
+   */
+  boolean isBeingUsedByTransaction()
+  {
+    return references.get() > 0;
+  }
+
   /**
    * Checks if a record in the cache needs to be updated.
   *
diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
index d9210b782afd..c21399addc26 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCache.java
@@ -21,6 +21,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Supplier;
+import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -80,11 +81,7 @@
  * For cache usage modes, see {@link UsageMode}.
 * <p>
 * The map {@link #datasourceToSegmentCache} contains the cache for each datasource.
- * Items are only added to this map and never removed. This is to avoid handling
- * race conditions where a thread has invoked {@link #getDatasource} but hasn't
- * acquired a lock on the returned cache yet while another thread sees this cache
- * as empty and cleans it up. The first thread would then end up using a stopped
- * cache, resulting in errors.
+ * If the cache for a datasource is empty, the sync thread removes it from the map.
  */
 @ThreadSafe
 public class HeapMemorySegmentMetadataCache implements SegmentMetadataCache
@@ -251,12 +248,66 @@ public boolean isSyncedForRead()
   }
 
   @Override
-  public DatasourceSegmentCache getDatasource(String dataSource)
+  public <T> T readCacheForDataSource(String dataSource, Action<T> readAction)
   {
     verifyCacheIsUsableAndAwaitSync();
-    return getCacheForDatasource(dataSource);
+    try (final HeapMemoryDatasourceSegmentCache datasourceCache = getCacheWithReference(dataSource)) {
+      return datasourceCache.withReadLock(
+          () -> {
+            try {
+              return readAction.perform(datasourceCache);
+            }
+            catch (Exception e) {
+              Throwables.throwIfUnchecked(e);
+              throw new RuntimeException(e);
+            }
+          }
+      );
+    }
   }
 
+  @Override
+  public <T> T writeCacheForDataSource(String dataSource, Action<T> writeAction)
+  {
+    verifyCacheIsUsableAndAwaitSync();
+    try (final HeapMemoryDatasourceSegmentCache datasourceCache = getCacheWithReference(dataSource)) {
+      return datasourceCache.withWriteLock(
+          () -> {
+            try {
+              return writeAction.perform(datasourceCache);
+            }
+            catch (Exception e) {
+              Throwables.throwIfUnchecked(e);
+              throw new RuntimeException(e);
+            }
+          }
+      );
+    }
+  }
+
+  /**
+   * Returns the (existing or new) cache instance for the given datasource and
+   * acquires a single reference to it, which must be released via close() after
+   * the cache has been read or updated.
+   */
+  private HeapMemoryDatasourceSegmentCache getCacheWithReference(String dataSource)
+  {
+    return datasourceToSegmentCache.compute(
+        dataSource,
+        (ds, existingCache) -> {
+          final HeapMemoryDatasourceSegmentCache newCache
+              = existingCache == null ? new HeapMemoryDatasourceSegmentCache(ds) : existingCache;
+          newCache.acquireReference();
+          return newCache;
+        }
+    );
+  }
+
+  /**
+   * Returns the (existing or new) cache instance for the given datasource.
+   * Similar to {@link #getCacheWithReference}, but does not acquire a reference
+   * that needs to be released. This method should be called only by the sync thread.
+   */
   private HeapMemoryDatasourceSegmentCache getCacheForDatasource(String dataSource)
   {
     return datasourceToSegmentCache.computeIfAbsent(dataSource, HeapMemoryDatasourceSegmentCache::new);
   }
@@ -460,16 +511,36 @@ private long syncWithMetadataStore()
    */
   private void markCacheSynced()
   {
-    datasourceToSegmentCache.forEach((dataSource, cache) -> {
+    final Set<String> cachedDatasources = Set.copyOf(datasourceToSegmentCache.keySet());
+
+    for (String dataSource : cachedDatasources) {
+      final HeapMemoryDatasourceSegmentCache cache = datasourceToSegmentCache.getOrDefault(
+          dataSource,
+          new HeapMemoryDatasourceSegmentCache(dataSource)
+      );
       final CacheStats stats = cache.markCacheSynced();
-      if (!cache.isEmpty()) {
+      if (cache.isEmpty()) {
+        // If the cache is empty and not currently in use, remove it from the map
+        datasourceToSegmentCache.compute(
+            dataSource,
+            (ds, existingCache) -> {
+              if (existingCache != null && existingCache.isEmpty()
+                  && !existingCache.isBeingUsedByTransaction()) {
+                emitMetric(dataSource, Metric.DELETED_DATASOURCES, 1L);
+                return null;
+              } else {
+                return existingCache;
+              }
+            }
+        );
+      } else {
         emitMetric(dataSource, Metric.CACHED_INTERVALS, stats.getNumIntervals());
         emitMetric(dataSource, Metric.CACHED_USED_SEGMENTS, stats.getNumUsedSegments());
         emitMetric(dataSource, Metric.CACHED_UNUSED_SEGMENTS, stats.getNumUnusedSegments());
         emitMetric(dataSource, Metric.CACHED_PENDING_SEGMENTS, stats.getNumPendingSegments());
       }
-    });
+    }
   }
 
   /**
diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java
index ea22a5fb7116..24f4fd8efa5d 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/Metric.java
@@ -108,6 +108,11 @@ private Metric()
 
   // CACHE UPDATE METRICS
 
+  /**
+   * Total number of empty datasource caches removed in the latest sync.
+   */
+  public static final String DELETED_DATASOURCES = METRIC_NAME_PREFIX + "dataSource/deleted";
+
   /**
    * Number of segments which are now stale in the cache and need to be refreshed.
   */
diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java
index 88e2cf05e3b6..7dc7c080ea6a 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/NoopSegmentMetadataCache.java
@@ -65,7 +65,13 @@ public boolean isSyncedForRead()
   }
 
   @Override
-  public DatasourceSegmentCache getDatasource(String dataSource)
+  public <T> T readCacheForDataSource(String dataSource, Action<T> readAction)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> T writeCacheForDataSource(String dataSource, Action<T> writeAction)
   {
     throw new UnsupportedOperationException();
   }
diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/ReadWriteCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/ReadWriteCache.java
index d7ca22590107..168ddd90cbe7 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/cache/ReadWriteCache.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/ReadWriteCache.java
@@ -80,32 +80,6 @@ public <T> T withReadLock(Supplier<T> action)
     }
   }
 
-  @Override
-  public <T> T read(DatasourceSegmentCache.Action<T> action) throws Exception
-  {
-    stateLock.readLock().lock();
-    try {
-      verifyCacheIsNotStopped();
-      return action.perform();
-    }
-    finally {
-      stateLock.readLock().unlock();
-    }
-  }
-
-  @Override
-  public <T> T write(DatasourceSegmentCache.Action<T> action) throws Exception
-  {
-    stateLock.writeLock().lock();
-    try {
-      verifyCacheIsNotStopped();
-      return action.perform();
-    }
-    finally {
-      stateLock.writeLock().unlock();
-    }
-  }
-
   private void verifyCacheIsNotStopped()
   {
     if (isStopped) {
diff --git a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java
index 2fa501c7a123..671aba0c9c54 100644
--- a/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/metadata/segment/cache/SegmentMetadataCache.java
@@ -65,9 +65,28 @@ public interface SegmentMetadataCache
   boolean isSyncedForRead();
 
   /**
-   * Returns the cache for the given datasource.
+   * Performs a thread-safe read action on the cache for the given datasource.
+   * Read actions can be concurrent with other reads but are mutually exclusive
+   * with write actions on the same datasource.
    */
-  DatasourceSegmentCache getDatasource(String dataSource);
+  <T> T readCacheForDataSource(String dataSource, Action<T> readAction);
+
+  /**
+   * Performs a thread-safe write action on the cache for the given datasource.
+   * Write actions are mutually exclusive with other writes or reads on the same
+   * datasource.
+   */
+  <T> T writeCacheForDataSource(String dataSource, Action<T> writeAction);
+
+  /**
+   * Represents a thread-safe read or write action performed on the cache while
+   * holding the required lock.
+   */
+  @FunctionalInterface
+  interface Action<T>
+  {
+    T perform(DatasourceSegmentCache dataSourceCache) throws Exception;
+  }
 
   /**
   * Cache usage modes.
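Taken together, the main-source changes above replace direct handles to a per-datasource cache with reference-counted, lock-scoped access: a transaction acquires a reference inside `ConcurrentHashMap.compute()`, and the sync thread removes an empty cache only inside the same `compute()`, after checking that no transaction still holds a reference. The sketch below distills that pattern; it is illustrative only, and `RefCountedCache`, `CacheRegistry`, `acquire`, and `cleanUpIfIdle` are hypothetical names, not classes or methods from this patch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for HeapMemoryDatasourceSegmentCache, reduced to the
// parts relevant to reference counting.
class RefCountedCache implements AutoCloseable
{
  private final AtomicInteger references = new AtomicInteger(0);

  void acquireReference()
  {
    references.incrementAndGet();
  }

  @Override
  public void close()
  {
    // Releases the reference taken in CacheRegistry.acquire() below
    references.decrementAndGet();
  }

  boolean isBeingUsedByTransaction()
  {
    return references.get() > 0;
  }

  boolean isEmpty()
  {
    return true; // stand-in: a real cache would check its contents
  }
}

class CacheRegistry
{
  private final ConcurrentHashMap<String, RefCountedCache> caches = new ConcurrentHashMap<>();

  // Transaction path: create-if-absent and acquire a reference in a single
  // atomic step, so the removal below can never interleave with acquisition.
  RefCountedCache acquire(String dataSource)
  {
    return caches.compute(dataSource, (ds, existing) -> {
      final RefCountedCache cache = existing == null ? new RefCountedCache() : existing;
      cache.acquireReference();
      return cache;
    });
  }

  // Sync-thread path: drop the mapping only if the cache is empty and no
  // transaction holds a reference; compute() makes this check-and-remove
  // atomic with respect to acquire().
  void cleanUpIfIdle(String dataSource)
  {
    caches.compute(dataSource, (ds, existing) ->
        existing != null && existing.isEmpty() && !existing.isBeingUsedByTransaction()
            ? null // returning null removes the entry
            : existing
    );
  }
}
```

A caller would hold the reference in try-with-resources, e.g. `try (RefCountedCache c = registry.acquire("wiki")) { ... }`, so it is released even if the action throws; this mirrors how `readCacheForDataSource` and `writeCacheForDataSource` scope their reference above.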
diff --git a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
index 89ad8139dea3..b5a2fe6b0d79 100644
--- a/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/segment/cache/HeapMemorySegmentMetadataCacheTest.java
@@ -200,18 +200,18 @@ public void testBecomeLeader_throwsException_ifCacheIsStopped()
   }
 
   @Test
-  public void testGetDataSource_throwsException_ifCacheIsDisabled()
+  public void testReadCacheForDataSource_throwsException_ifCacheIsDisabled()
   {
     setupTargetWithCaching(SegmentMetadataCache.UsageMode.NEVER);
     DruidExceptionMatcher.defensive().expectMessageIs(
         "Segment metadata cache is not enabled."
     ).assertThrowsAndMatches(
-        () -> cache.getDatasource(TestDataSource.WIKI)
+        () -> cache.readCacheForDataSource(TestDataSource.WIKI, d -> 0)
     );
   }
 
   @Test
-  public void testGetDataSource_throwsException_ifCacheIsStoppedOrNotLeader()
+  public void testReadCacheForDataSource_throwsException_ifCacheIsStoppedOrNotLeader()
   {
     setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS);
     Assert.assertTrue(cache.isEnabled());
@@ -219,19 +219,19 @@ public void testGetDataSource_throwsException_ifCacheIsStoppedOrNotLeader()
     DruidExceptionMatcher.internalServerError().expectMessageIs(
         "Segment metadata cache has not been started yet."
     ).assertThrowsAndMatches(
-        () -> cache.getDatasource(TestDataSource.WIKI)
+        () -> cache.readCacheForDataSource(TestDataSource.WIKI, d -> 0)
     );
 
     cache.start();
     DruidExceptionMatcher.internalServerError().expectMessageIs(
         "Not leader yet. Segment metadata cache is not usable."
     ).assertThrowsAndMatches(
-        () -> cache.getDatasource(TestDataSource.WIKI)
+        () -> cache.readCacheForDataSource(TestDataSource.WIKI, d -> 0)
     );
   }
 
   @Test(timeout = 60_000)
-  public void testGetDataSource_waitsForOneSync_afterBecomingLeader() throws InterruptedException
+  public void testReadCacheForDataSource_waitsForOneSync_afterBecomingLeader() throws InterruptedException
   {
     setupTargetWithCaching(SegmentMetadataCache.UsageMode.ALWAYS);
     cache.start();
@@ -241,7 +241,7 @@ public void testGetDataSource_waitsForOneSync_afterBecomingLeader() throws Inter
 
     // Invoke getDatasource in Thread 1
     final Thread getDatasourceThread = new Thread(() -> {
-      cache.getDatasource(TestDataSource.WIKI);
+      cache.readCacheForDataSource(TestDataSource.WIKI, d -> 0);
       observedEventOrder.add("getDatasource completed");
     });
     getDatasourceThread.start();
@@ -264,7 +264,7 @@ public void testGetDataSource_waitsForOneSync_afterBecomingLeader() throws Inter
 
     // Verify that subsequent calls to getDatasource do not wait
     final Thread getDatasourceThread2 = new Thread(() -> {
-      cache.getDatasource(TestDataSource.WIKI);
+      cache.readCacheForDataSource(TestDataSource.WIKI, d -> 0);
       observedEventOrder.add("getDatasource 2 completed");
     });
     getDatasourceThread2.start();
@@ -281,16 +281,29 @@ public void testAddSegmentsToCache()
   {
     setupAndSyncCache();
 
-    final DatasourceSegmentCache wikiCache = cache.getDatasource(TestDataSource.WIKI);
-
     final DataSegmentPlus segment = CreateDataSegments.ofDatasource(TestDataSource.WIKI)
                                                       .markUsed().asPlus();
     final SegmentId segmentId = segment.getDataSegment().getId();
-    Assert.assertNull(wikiCache.findUsedSegment(segmentId));
+    Assert.assertNull(
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findUsedSegment(segmentId)
+        )
+    );
 
-    Assert.assertEquals(1, wikiCache.insertSegments(Set.of(segment)));
-    Assert.assertEquals(segment.getDataSegment(), wikiCache.findUsedSegment(segmentId));
+    final int numInsertedSegments = cache.writeCacheForDataSource(
+        TestDataSource.WIKI,
+        wikiCache -> wikiCache.insertSegments(Set.of(segment))
+    );
+    Assert.assertEquals(1, numInsertedSegments);
+    Assert.assertEquals(
+        segment.getDataSegment(),
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findUsedSegment(segmentId)
+        )
+    );
   }
 
   @Test
@@ -302,9 +315,11 @@ public void testSync_addsUsedSegment_ifNotPresentInCache()
         = CreateDataSegments.ofDatasource(TestDataSource.WIKI).updatedNow().markUsed().asPlus();
     insertSegmentsInMetadataStore(Set.of(usedSegmentPlus));
 
-    final DatasourceSegmentCache wikiCache = cache.getDatasource(TestDataSource.WIKI);
     Assert.assertTrue(
-        wikiCache.findUsedSegmentsPlusOverlappingAnyOf(List.of()).isEmpty()
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findUsedSegmentsPlusOverlappingAnyOf(List.of())
+        ).isEmpty()
     );
 
     syncCache();
@@ -314,7 +329,10 @@ public void testSync_addsUsedSegment_ifNotPresentInCache()
 
     Assert.assertEquals(
         usedSegmentPlus.getDataSegment(),
-        wikiCache.findUsedSegment(usedSegmentPlus.getDataSegment().getId())
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findUsedSegment(usedSegmentPlus.getDataSegment().getId())
+        )
     );
   }
 
@@ -363,11 +381,18 @@ public void testSync_doesNotFail_ifSegmentRecordIsBad()
     serviceEmitter.verifyValue(Metric.CACHED_USED_SEGMENTS, 1L);
     serviceEmitter.verifyValue(Metric.SKIPPED_SEGMENTS, 1L);
 
-    final DatasourceSegmentCache wikiCache = cache.getDatasource(TestDataSource.WIKI);
-    Assert.assertNull(wikiCache.findUsedSegment(invalidSegment.getDataSegment().getId()));
+    Assert.assertNull(
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findUsedSegment(invalidSegment.getDataSegment().getId())
+        )
+    );
     Assert.assertEquals(
         validSegment.getDataSegment(),
-        wikiCache.findUsedSegment(validSegment.getDataSegment().getId())
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findUsedSegment(validSegment.getDataSegment().getId())
+        )
     );
   }
 
@@ -412,11 +437,18 @@ public void testSync_doesNotFail_ifPendingSegmentRecordIsBad()
     serviceEmitter.verifyValue(Metric.PERSISTED_USED_SEGMENTS, 1L);
     serviceEmitter.verifyValue(Metric.PERSISTED_PENDING_SEGMENTS, 0L);
 
-    final DatasourceSegmentCache wikiCache = cache.getDatasource(TestDataSource.WIKI);
-    Assert.assertEquals(segment.getDataSegment(), wikiCache.findUsedSegment(segment.getDataSegment().getId()));
+    Assert.assertEquals(
+        segment.getDataSegment(),
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findUsedSegment(segment.getDataSegment().getId())
+        )
+    );
     Assert.assertTrue(
-        wikiCache.findPendingSegmentsOverlapping(Intervals.ETERNITY)
-                 .isEmpty()
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findPendingSegmentsOverlapping(Intervals.ETERNITY)
+        ).isEmpty()
     );
   }
 
@@ -424,7 +456,6 @@ public void testSync_doesNotFail_ifPendingSegmentRecordIsBad()
   public void testSync_updatesUsedSegment_ifCacheHasOlderEntry()
   {
     setupAndSyncCache();
-    final DatasourceSegmentCache wikiCache = cache.getDatasource(TestDataSource.WIKI);
 
     // Add a used segment to both metadata store and cache
     final DateTime updateTime = DateTimes.nowUtc();
@@ -432,11 +463,17 @@ public void testSync_updatesUsedSegment_ifCacheHasOlderEntry()
         CreateDataSegments.ofDatasource(TestDataSource.WIKI).markUsed()
                           .lastUpdatedOn(updateTime).asPlus();
     insertSegmentsInMetadataStore(Set.of(usedSegmentPlus));
-    wikiCache.insertSegments(Set.of(usedSegmentPlus));
+    cache.writeCacheForDataSource(
+        TestDataSource.WIKI,
+        wikiCache -> wikiCache.insertSegments(Set.of(usedSegmentPlus))
+    );
 
     Assert.assertEquals(
         Set.of(usedSegmentPlus),
-        wikiCache.findUsedSegmentsPlusOverlappingAnyOf(List.of())
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findUsedSegmentsPlusOverlappingAnyOf(List.of())
+        )
     );
 
     // Add a newer version of segment to metadata store
@@ -458,7 +495,10 @@ public void testSync_updatesUsedSegment_ifCacheHasOlderEntry()
 
     Assert.assertEquals(
         Set.of(updatedSegment),
-        wikiCache.findUsedSegmentsPlusOverlappingAnyOf(List.of())
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findUsedSegmentsPlusOverlappingAnyOf(List.of())
+        )
     );
   }
 
@@ -466,16 +506,21 @@ public void testSync_updatesUsedSegment_ifCacheHasOlderEntry()
   public void testSync_removesUsedSegment_ifNotPresentInMetadataStore()
   {
     setupAndSyncCache();
-    final DatasourceSegmentCache wikiCache = cache.getDatasource(TestDataSource.WIKI);
 
     final DataSegmentPlus unpersistedSegmentPlus =
         CreateDataSegments.ofDatasource(TestDataSource.WIKI).markUsed().asPlus();
-    wikiCache.insertSegments(Set.of(unpersistedSegmentPlus));
+    cache.writeCacheForDataSource(
+        TestDataSource.WIKI,
+        wikiCache -> wikiCache.insertSegments(Set.of(unpersistedSegmentPlus))
+    );
 
     final DataSegment unpersistedSegment = unpersistedSegmentPlus.getDataSegment();
     Assert.assertEquals(
         unpersistedSegment,
-        wikiCache.findUsedSegment(unpersistedSegment.getId())
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findUsedSegment(unpersistedSegment.getId())
+        )
     );
 
     syncCache();
@@ -483,7 +528,10 @@ public void testSync_removesUsedSegment_ifNotPresentInMetadataStore()
     serviceEmitter.verifyNotEmitted(Metric.PERSISTED_USED_SEGMENTS);
 
     Assert.assertNull(
-        wikiCache.findUsedSegment(unpersistedSegment.getId())
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findUsedSegment(unpersistedSegment.getId())
+        )
     );
   }
 
@@ -491,13 +539,21 @@ public void testSync_removesUsedSegment_ifNotPresentInMetadataStore()
   public void testSync_removesUnusedSegment_ifCacheHasOlderEntry()
   {
     setupAndSyncCache();
-    final DatasourceSegmentCache wikiCache = cache.getDatasource(TestDataSource.WIKI);
 
     final DateTime now = DateTimes.nowUtc();
     final DataSegmentPlus unpersistedSegmentPlus =
         CreateDataSegments.ofDatasource(TestDataSource.WIKI).markUsed().asPlus();
-    wikiCache.insertSegments(Set.of(unpersistedSegmentPlus));
-    wikiCache.markSegmentAsUnused(unpersistedSegmentPlus.getDataSegment().getId(), now.minusMinutes(1));
+    cache.writeCacheForDataSource(
+        TestDataSource.WIKI,
+        wikiCache -> wikiCache.insertSegments(Set.of(unpersistedSegmentPlus))
+    );
+    cache.writeCacheForDataSource(
+        TestDataSource.WIKI,
+        wikiCache -> wikiCache.markSegmentAsUnused(
+            unpersistedSegmentPlus.getDataSegment().getId(),
+            now.minusMinutes(1)
+        )
+    );
 
     syncCache();
     serviceEmitter.verifyValue(Metric.DELETED_SEGMENTS, 1L);
@@ -515,9 +571,14 @@ public void testSync_doesNotRemoveIntervalWithOnlyUnusedSegments()
         CreateDataSegments.ofDatasource(TestDataSource.WIKI).updatedNow().markUsed().asPlus();
 
     final DateTime now = DateTimes.nowUtc();
-    final DatasourceSegmentCache wikiCache = cache.getDatasource(TestDataSource.WIKI);
-    wikiCache.insertSegments(Set.of(usedSegment));
-    wikiCache.markSegmentAsUnused(usedSegment.getDataSegment().getId(), now.plusMinutes(1));
+    cache.writeCacheForDataSource(
+        TestDataSource.WIKI,
+        wikiCache -> wikiCache.insertSegments(Set.of(usedSegment))
+    );
+    cache.writeCacheForDataSource(
+        TestDataSource.WIKI,
+        wikiCache -> wikiCache.markSegmentAsUnused(usedSegment.getDataSegment().getId(), now.plusMinutes(1))
+    );
 
     syncCache();
     serviceEmitter.verifyValue(Metric.CACHED_UNUSED_SEGMENTS, 1L);
@@ -544,11 +605,13 @@ public void testSync_addsPendingSegment_ifNotPresentInCache()
     );
     final SegmentIdWithShardSpec segmentId = pendingSegment.getId();
 
-    final DatasourceSegmentCache wikiCache = cache.getDatasource(TestDataSource.WIKI);
     Assert.assertTrue(
-        wikiCache.findPendingSegmentIdsWithExactInterval(
-            pendingSegment.getSequenceName(),
-            segmentId.getInterval()
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findPendingSegmentIdsWithExactInterval(
+                pendingSegment.getSequenceName(),
+                segmentId.getInterval()
+            )
         ).isEmpty()
     );
 
@@ -558,9 +621,12 @@ public void testSync_addsPendingSegment_ifNotPresentInCache()
 
     Assert.assertEquals(
         List.of(segmentId),
-        wikiCache.findPendingSegmentIdsWithExactInterval(
-            pendingSegment.getSequenceName(),
-            segmentId.getInterval()
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findPendingSegmentIdsWithExactInterval(
+                pendingSegment.getSequenceName(),
+                segmentId.getInterval()
+            )
         )
     );
   }
@@ -572,15 +638,20 @@ public void testSync_removesPendingSegment_ifNotPresentInMetadataStore()
 
     // Create a pending segment and add it only to the cache
     final PendingSegmentRecord pendingSegment = createPendingSegment(DateTimes.nowUtc().minusHours(1));
-    final DatasourceSegmentCache wikiCache = cache.getDatasource(TestDataSource.WIKI);
-    wikiCache.insertPendingSegment(pendingSegment, false);
+    cache.writeCacheForDataSource(
+        TestDataSource.WIKI,
+        wikiCache -> wikiCache.insertPendingSegment(pendingSegment, false)
+    );
 
     final SegmentIdWithShardSpec segmentId = pendingSegment.getId();
     Assert.assertEquals(
         List.of(segmentId),
-        wikiCache.findPendingSegmentIdsWithExactInterval(
-            pendingSegment.getSequenceName(),
-            segmentId.getInterval()
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findPendingSegmentIdsWithExactInterval(
+                pendingSegment.getSequenceName(),
+                segmentId.getInterval()
+            )
         )
     );
 
@@ -590,13 +661,37 @@ public void testSync_removesPendingSegment_ifNotPresentInMetadataStore()
     serviceEmitter.verifyValue(Metric.DELETED_PENDING_SEGMENTS, 1L);
 
     Assert.assertTrue(
-        wikiCache.findPendingSegmentIdsWithExactInterval(
-            pendingSegment.getSequenceName(),
-            segmentId.getInterval()
+        cache.readCacheForDataSource(
+            TestDataSource.WIKI,
+            wikiCache -> wikiCache.findPendingSegmentIdsWithExactInterval(
+                pendingSegment.getSequenceName(),
+                segmentId.getInterval()
+            )
         ).isEmpty()
     );
   }
 
+  @Test
+  public void testSync_cleansUpDataSourceCache_ifEmptyAndNotInUse()
+  {
+    setupAndSyncCache();
+
+    final DateTime now = DateTimes.nowUtc();
+    final DataSegmentPlus segment = CreateDataSegments.ofDatasource(TestDataSource.WIKI)
+                                                      .markUsed().lastUpdatedOn(now.minusHours(1))
+                                                      .asPlus();
+    final int numInsertedSegments = cache.writeCacheForDataSource(
+        TestDataSource.WIKI,
+        wikiCache -> wikiCache.insertSegments(Set.of(segment))
+    );
+    Assert.assertEquals(1, numInsertedSegments);
+
+    // Verify that sync removes the extra entry from the cache and also cleans it up
+    syncCache();
+    serviceEmitter.verifyValue(Metric.DELETED_SEGMENTS, 1L);
+    serviceEmitter.verifyValue(Metric.DELETED_DATASOURCES, 1L);
+  }
+
   private void insertSegmentsInMetadataStore(Set<DataSegmentPlus> segments)
   {
     final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
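At a call site, the new API takes the caller's work as an `Action`, so the datasource cache is never visible outside the locked scope. A minimal sketch, assuming a started and synced `SegmentMetadataCache` named `cache` and a `DataSegmentPlus` named `segment` in scope, as in the tests above:

```java
// Insert through the write path; the cache takes the datasource write lock
// (and a reference) around the lambda.
final int inserted = cache.writeCacheForDataSource(
    TestDataSource.WIKI,
    wikiCache -> wikiCache.insertSegments(Set.of(segment))
);

// Look the segment back up through the read path.
final DataSegment found = cache.readCacheForDataSource(
    TestDataSource.WIKI,
    wikiCache -> wikiCache.findUsedSegment(segment.getDataSegment().getId())
);
```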