diff --git a/processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java b/processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java index f5762a165746..1a24d41adf81 100644 --- a/processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java +++ b/processing/src/test/java/org/apache/druid/segment/TestSegmentUtils.java @@ -126,6 +126,12 @@ public List getOrdering() { return Cursors.ascendingTimeOrder(); } + + @Override + public int getNumRows() + { + return 1234; + } }; public static class SegmentForTesting extends QueryableIndexSegment @@ -178,6 +184,8 @@ public T as(@Nonnull Class clazz) return (T) INDEX; } else if (clazz.equals(CursorFactory.class)) { return (T) new QueryableIndexCursorFactory(INDEX); + } else if (clazz.equals(PhysicalSegmentInspector.class)) { + return (T) new QueryableIndexPhysicalSegmentInspector(INDEX); } return null; } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java index 3cdf2f1bf641..bc7f2d8d2478 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java @@ -648,6 +648,25 @@ boolean isSegmentCached(final DataSegment segment) return false; } + /** + * Testing use only please, any callers that want to do stuff with segments should use + * {@link #acquireCachedSegment(DataSegment)} or {@link #acquireSegment(DataSegment)} instead. Does not hold locks + * and so is not really safe to use while the cache manager is active + */ + @VisibleForTesting + @Nullable + public ReferenceCountedSegmentProvider getSegmentReferenceProvider(DataSegment segment) + { + final SegmentCacheEntry cacheEntry = new SegmentCacheEntry(segment); + for (StorageLocation location : locations) { + final SegmentCacheEntry entry = location.getCacheEntry(cacheEntry.id); + if (entry != null) { + return entry.referenceProvider; + } + } + return null; + } + /** * Returns the effective segment info directory based on the configuration settings. * The directory is selected based on the following configurations injected into this class: diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index cf8314088eca..82b4d44a4fea 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -380,11 +380,11 @@ public void dropSegment(final DataSegment dataSegment) try (final Closer closer = Closer.create()) { final Optional oldSegment = cacheManager.acquireCachedSegment(oldSegmentRef); long numberOfRows = oldSegment.map(segment -> { + closer.register(segment); final PhysicalSegmentInspector countInspector = segment.as(PhysicalSegmentInspector.class); if (countInspector != null) { return countInspector.getNumRows(); } - CloseableUtils.closeAndWrapExceptions(segment); return 0; }).orElse(0); diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 3e05eb043965..214b52bad95c 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -35,6 +35,7 @@ import org.apache.druid.query.expression.TestExprMacroTable; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.ReferenceCountedSegmentProvider; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.SegmentMapFunction; import org.apache.druid.segment.TestHelper; @@ -45,7 +46,6 @@ import org.apache.druid.segment.loading.LeastBytesUsedStorageLocationSelectorStrategy; import org.apache.druid.segment.loading.LocalDataSegmentPuller; import org.apache.druid.segment.loading.LocalLoadSpec; -import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.loading.SegmentLocalCacheManager; @@ -90,7 +90,9 @@ public class SegmentManagerTest extends InitializedNullHandlingTest ); private ExecutorService executor; + private SegmentLocalCacheManager cacheManager; private SegmentManager segmentManager; + private SegmentLocalCacheManager virtualCacheManager; private SegmentManager virtualSegmentManager; @Rule @@ -160,7 +162,7 @@ public boolean isVirtualStorage() ); final List storageLocations = loaderConfig.toStorageLocations(); - final SegmentLocalCacheManager cacheManager = new SegmentLocalCacheManager( + cacheManager = new SegmentLocalCacheManager( storageLocations, loaderConfig, new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations), @@ -170,7 +172,7 @@ public boolean isVirtualStorage() segmentManager = new SegmentManager(cacheManager); final List virtualStorageLocations = virtualLoaderConfig.toStorageLocations(); - final SegmentCacheManager virtualCacheManager = new SegmentLocalCacheManager( + virtualCacheManager = new SegmentLocalCacheManager( virtualStorageLocations, virtualLoaderConfig, new LeastBytesUsedStorageLocationSelectorStrategy(virtualStorageLocations), @@ -238,8 +240,12 @@ public void testLoadBootstrapSegment() throws ExecutionException, InterruptedExc @Test public void testDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException, IOException { + List referenceProviders = new ArrayList<>(); for (DataSegment eachSegment : SEGMENTS) { segmentManager.loadSegment(eachSegment); + ReferenceCountedSegmentProvider refProvider = cacheManager.getSegmentReferenceProvider(eachSegment); + referenceProviders.add(refProvider); + Assert.assertFalse(refProvider.isClosed()); } final List> futures = ImmutableList.of(SEGMENTS.get(0), SEGMENTS.get(2)).stream() @@ -260,6 +266,14 @@ public void testDropSegment() throws SegmentLoadingException, ExecutionException assertResult( ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(3), SEGMENTS.get(4)) ); + for (int i = 0; i < SEGMENTS.size(); i++) { + Assert.assertEquals(0, referenceProviders.get(i).getNumReferences()); + if (i == 0 || i == 2) { + Assert.assertTrue(referenceProviders.get(i).isClosed()); + } else { + Assert.assertFalse(referenceProviders.get(i).isClosed()); + } + } } private Void loadSegmentOrFail(DataSegment segment)