diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java index 3886f3dd1454..4e3e8b2bd378 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/DruidSegmentReaderTest.java @@ -606,6 +606,18 @@ public void cleanup(DataSegment segment) { throw new UnsupportedOperationException("unused"); } + + @Override + public boolean reserve(DataSegment segment) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release(DataSegment segment) + { + throw new UnsupportedOperationException(); + } }, DataSegment.builder() .dataSource("ds") diff --git a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java index 5b531316cd5a..55e390463b5f 100644 --- a/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/ReferenceCountingSegment.java @@ -33,6 +33,9 @@ * {@link Segment} that is also a {@link ReferenceCountingSegment}, allowing query engines that operate directly on * segments to track references so that dropping a {@link Segment} can be done safely to ensure there are no in-flight * queries. + * + * Extensions can extend this class for populating {@link org.apache.druid.timeline.VersionedIntervalTimeline} with + * a custom implementation through SegmentLoader. */ public class ReferenceCountingSegment extends ReferenceCountingCloseableObject implements SegmentReference, Overshadowable @@ -67,7 +70,7 @@ public static ReferenceCountingSegment wrapSegment( ); } - private ReferenceCountingSegment( + protected ReferenceCountingSegment( Segment baseSegment, int startRootPartitionId, int endRootPartitionId, @@ -172,4 +175,13 @@ public Optional acquireReferences() { return incrementReferenceAndDecrementOnceCloseable(); } + + @Override + public T as(Class clazz) + { + if (isClosed()) { + return null; + } + return baseObject.as(clazz); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java b/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java index 6566592e9d5c..b1f65cfc8d1d 100644 --- a/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/segment/ReferenceCountingSegmentTest.java @@ -20,6 +20,7 @@ package org.apache.druid.segment; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.segment.join.table.IndexedTable; import org.apache.druid.timeline.SegmentId; import org.easymock.EasyMock; import org.joda.time.Days; @@ -43,6 +44,7 @@ public class ReferenceCountingSegmentTest private final Interval dataInterval = new Interval(DateTimes.nowUtc().minus(Days.days(1)), DateTimes.nowUtc()); private QueryableIndex index; private StorageAdapter adapter; + private IndexedTable indexedTable; private int underlyingSegmentClosedCount; @Before @@ -51,6 +53,7 @@ public void setUp() underlyingSegmentClosedCount = 0; index = EasyMock.createNiceMock(QueryableIndex.class); adapter = EasyMock.createNiceMock(StorageAdapter.class); + indexedTable = EasyMock.createNiceMock(IndexedTable.class); segment = ReferenceCountingSegment.wrapRootGenerationSegment( new Segment() @@ -79,6 +82,19 @@ public StorageAdapter asStorageAdapter() return adapter; } + @Override + public T as(Class clazz) + { + if (clazz.equals(QueryableIndex.class)) { + return (T) asQueryableIndex(); + } else if (clazz.equals(StorageAdapter.class)) { + return (T) asStorageAdapter(); + } else if (clazz.equals(IndexedTable.class)) { + return (T) indexedTable; + } + return null; + } + @Override public void close() { @@ -159,4 +175,13 @@ public void testExposesWrappedSegment() Assert.assertEquals(adapter, segment.asStorageAdapter()); } + @Test + public void testSegmentAs() + { + Assert.assertSame(index, segment.as(QueryableIndex.class)); + Assert.assertSame(adapter, segment.as(StorageAdapter.class)); + Assert.assertSame(indexedTable, segment.as(IndexedTable.class)); + Assert.assertNull(segment.as(String.class)); + } + } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java index 945857470316..39ea7858406e 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentCacheManager.java @@ -30,18 +30,58 @@ public interface SegmentCacheManager { /** - * Checks whether a segment is already cached. + * Checks whether a segment is already cached. It can return false even if {@link #reserve(DataSegment)} + * has been successful for a segment but is not downloaded yet. */ boolean isSegmentCached(DataSegment segment); /** - * This method fetches the files for the given segment if the segment is not downloaded already. + * This method fetches the files for the given segment if the segment is not downloaded already. It + * is not required to {@link #reserve(DataSegment)} before calling this method. If caller has not reserved + * the space explicitly via {@link #reserve(DataSegment)}, the implementation should reserve space on caller's + * behalf. + * If the space has been explicitly reserved already + * - implementation should use only the reserved space to store segment files. + * - implementation should not release the location in case of download erros and leave it to the caller. * @throws SegmentLoadingException if there is an error in downloading files */ File getSegmentFiles(DataSegment segment) throws SegmentLoadingException; /** - * Cleanup the cache space used by the segment + * Tries to reserve the space for a segment on any location. When the space has been reserved, + * {@link #getSegmentFiles(DataSegment)} should download the segment on the reserved location or + * fail otherwise. + * + * This function is useful for custom extensions. Extensions can try to reserve the space first and + * if not successful, make some space by cleaning up other segments, etc. There is also improved + * concurrency for extensions with this function. Since reserve is a cheaper operation to invoke + * till the space has been reserved. Hence it can be put inside a lock if required by the extensions. getSegment + * can't be put inside a lock since it is a time-consuming operation, on account of downloading the files. + * + * @param segment - Segment to reserve + * @return True if enough space found to store the segment, false otherwise + */ + /* + * We only return a boolean result instead of a pointer to + * {@link StorageLocation} since we don't want callers to operate on {@code StorageLocation} directly outside {@code SegmentLoader}. + * {@link SegmentLoader} operates on the {@code StorageLocation} objects in a thread-safe manner. + */ + boolean reserve(DataSegment segment); + + /** + * Reverts the effects of {@link #reserve(DataSegment)} (DataSegment)} by releasing the location reserved for this segment. + * Callers, that explicitly reserve the space via {@link #reserve(DataSegment)}, should use this method to release the space. + * + * Implementation can throw error if the space is being released but there is data present. Callers + * are supposed to ensure that any data is removed via {@link #cleanup(DataSegment)} + * @param segment - Segment to release the location for. + * @return - True if any location was reserved and released, false otherwise. + */ + boolean release(DataSegment segment); + + /** + * Cleanup the cache space used by the segment. It will not release the space if the space has been + * explicitly reserved via {@link #reserve(DataSegment)} */ void cleanup(DataSegment segment); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java index 8fe38a310290..03bc0506f7d9 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java @@ -19,7 +19,8 @@ package org.apache.druid.segment.loading; -import org.apache.druid.segment.Segment; +import org.apache.druid.guice.annotations.UnstableApi; +import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.timeline.DataSegment; @@ -27,16 +28,25 @@ * Loading segments from deep storage to local storage. Internally, this class can delegate the download to * {@link SegmentCacheManager}. Implementations must be thread-safe. */ +@UnstableApi public interface SegmentLoader { + /** - * Builds a {@link Segment} by downloading if necessary + * Returns a {@link ReferenceCountingSegment} that will be added by the {@link org.apache.druid.server.SegmentManager} + * to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This method can be called multiple times + * by the {@link org.apache.druid.server.SegmentManager} and implementation can either return same {@link ReferenceCountingSegment} + * or a different {@link ReferenceCountingSegment}. Caller should not assume any particular behavior. + * + * Returning a {@code ReferenceCountingSegment} will let custom implementations keep track of reference count for + * segments that the custom implementations are creating. That way, custom implementations can know when the segment + * is in use or not. * @param segment - Segment to load * @param lazy - Whether column metadata de-serialization is to be deferred to access time. Setting this flag to true can speed up segment loading * @param loadFailed - Callback to invoke if lazy loading fails during column access. * @throws SegmentLoadingException - If there is an error in loading the segment */ - Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException; + ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException; /** * cleanup any state used by this segment diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java index 6970f7b0caf2..7fbf425bb2a2 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheLoader.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.guice.annotations.Json; import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.timeline.DataSegment; @@ -46,7 +47,7 @@ public SegmentLocalCacheLoader(SegmentCacheManager cacheManager, IndexIO indexIO } @Override - public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException + public ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException { final File segmentFiles = cacheManager.getSegmentFiles(segment); File factoryJson = new File(segmentFiles, "factory.json"); @@ -63,7 +64,8 @@ public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFail factory = new MMappedQueryableSegmentizerFactory(indexIO); } - return factory.factorize(segment, segmentFiles, lazy, loadFailed); + Segment segmentObject = factory.factorize(segment, segmentFiles, lazy, loadFailed); + return ReferenceCountingSegment.wrapSegment(segmentObject, segment.getShardSpec()); } @Override diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java index 412cfe9728b1..25c9cae4554f 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLocalCacheManager.java @@ -30,6 +30,7 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; + import java.io.File; import java.io.IOException; import java.util.Iterator; @@ -37,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap; /** + * */ public class SegmentLocalCacheManager implements SegmentCacheManager { @@ -110,6 +112,7 @@ public SegmentLocalCacheManager( * * This ctor is mainly for test cases, including test cases in other modules */ + @VisibleForTesting public SegmentLocalCacheManager( SegmentLoaderConfig config, @Json ObjectMapper mapper @@ -122,25 +125,45 @@ public SegmentLocalCacheManager( log.info("Using storage location strategy: [%s]", this.strategy.getClass().getSimpleName()); } + + static String getSegmentDir(DataSegment segment) + { + return DataSegmentPusher.getDefaultStorageDir(segment, false); + } + @Override public boolean isSegmentCached(final DataSegment segment) { - return findStorageLocationIfLoaded(segment) != null; + return findStoragePathIfCached(segment) != null; } + /** + * This method will try to find if the segment is already downloaded on any location. If so, the segment path + * is returned. Along with that, location state is also updated with the segment location. Refer to + * {@link StorageLocation#maybeReserve(String, DataSegment)} for more details. + * If the segment files are damaged in any location, they are removed from the location. + * @param segment - Segment to check + * @return - Path corresponding to segment directory if found, null otherwise. + */ @Nullable - private StorageLocation findStorageLocationIfLoaded(final DataSegment segment) + private File findStoragePathIfCached(final DataSegment segment) { for (StorageLocation location : locations) { - File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); + String storageDir = getSegmentDir(segment); + File localStorageDir = location.segmentDirectoryAsFile(storageDir); if (localStorageDir.exists()) { if (checkSegmentFilesIntact(localStorageDir)) { - log.warn("[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.", localStorageDir.getAbsolutePath()); + log.warn( + "[%s] may be damaged. Delete all the segment files and pull from DeepStorage again.", + localStorageDir.getAbsolutePath() + ); cleanupCacheFiles(location.getPath(), localStorageDir); location.removeSegmentDir(localStorageDir, segment); break; } else { - return location; + // Before returning, we also reserve the space. Refer to the StorageLocation#maybeReserve documentation for details. + location.maybeReserve(storageDir, segment); + return localStorageDir; } } } @@ -180,16 +203,12 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException final ReferenceCountingLock lock = createOrGetLock(segment); synchronized (lock) { try { - StorageLocation loc = findStorageLocationIfLoaded(segment); - String storageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); - - if (loc == null) { - loc = loadSegmentWithRetry(segment, storageDir); - } else { - // If the segment is already downloaded on disk, we just update the current usage - loc.maybeReserve(storageDir, segment); + File segmentDir = findStoragePathIfCached(segment); + if (segmentDir != null) { + return segmentDir; } - return new File(loc.getPath(), storageDir); + + return loadSegmentWithRetry(segment); } finally { unlock(segment, lock); @@ -198,44 +217,83 @@ public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException } /** - * location may fail because of IO failure, most likely in two cases:

+ * If we have already reserved a location before, probably via {@link #reserve(DataSegment)}, then only that location + * should be tried. Otherwise, we would fetch locations using {@link StorageLocationSelectorStrategy} and try all + * of them one by one till there is success. + * Location may fail because of IO failure, most likely in two cases:

* 1. druid don't have the write access to this location, most likely the administrator doesn't config it correctly

* 2. disk failure, druid can't read/write to this disk anymore - * + *

* Locations are fetched using {@link StorageLocationSelectorStrategy}. */ - private StorageLocation loadSegmentWithRetry(DataSegment segment, String storageDirStr) throws SegmentLoadingException + private File loadSegmentWithRetry(DataSegment segment) throws SegmentLoadingException { - Iterator locationsIterator = strategy.getLocations(); + String segmentDir = getSegmentDir(segment); + + // Try the already reserved location. If location has been reserved outside, then we do not release the location + // here and simply delete any downloaded files. That is, we revert anything we do in this function and nothing else. + for (StorageLocation loc : locations) { + if (loc.isReserved(segmentDir)) { + File storageDir = loc.segmentDirectoryAsFile(segmentDir); + boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, false); + if (!success) { + throw new SegmentLoadingException("Failed to load segment %s in reserved location [%s]", segment.getId(), loc.getPath().getAbsolutePath()); + } + return storageDir; + } + } + // No location was reserved so we try all the locations + Iterator locationsIterator = strategy.getLocations(); while (locationsIterator.hasNext()) { StorageLocation loc = locationsIterator.next(); - File storageDir = loc.reserve(storageDirStr, segment); + // storageDir is the file path corresponding to segment dir + File storageDir = loc.reserve(segmentDir, segment); if (storageDir != null) { - try { - loadInLocationWithStartMarker(segment, storageDir); - return loc; - } - catch (SegmentLoadingException e) { - try { - log.makeAlert( - e, - "Failed to load segment in current location [%s], try next location if any", - loc.getPath().getAbsolutePath() - ).addData("location", loc.getPath().getAbsolutePath()).emit(); - } - finally { - loc.removeSegmentDir(storageDir, segment); - cleanupCacheFiles(loc.getPath(), storageDir); - } + boolean success = loadInLocationWithStartMarkerQuietly(loc, segment, storageDir, true); + if (success) { + return storageDir; } } } throw new SegmentLoadingException("Failed to load segment %s in all locations.", segment.getId()); } + /** + * A helper method over {@link #loadInLocationWithStartMarker(DataSegment, File)} that catches the {@link SegmentLoadingException} + * and emits alerts. + * @param loc - {@link StorageLocation} where segment is to be downloaded in. + * @param segment - {@link DataSegment} to download + * @param storageDir - {@link File} pointing to segment directory + * @param releaseLocation - Whether to release the location in case of failures + * @return - True if segment was downloaded successfully, false otherwise. + */ + private boolean loadInLocationWithStartMarkerQuietly(StorageLocation loc, DataSegment segment, File storageDir, boolean releaseLocation) + { + try { + loadInLocationWithStartMarker(segment, storageDir); + return true; + } + catch (SegmentLoadingException e) { + try { + log.makeAlert( + e, + "Failed to load segment in current location [%s], try next location if any", + loc.getPath().getAbsolutePath() + ).addData("location", loc.getPath().getAbsolutePath()).emit(); + } + finally { + if (releaseLocation) { + loc.removeSegmentDir(storageDir, segment); + } + cleanupCacheFiles(loc.getPath(), storageDir); + } + } + return false; + } + private void loadInLocationWithStartMarker(DataSegment segment, File storageDir) throws SegmentLoadingException { // We use a marker to prevent the case where a segment is downloaded, but before the download completes, @@ -277,6 +335,73 @@ private void loadInLocation(DataSegment segment, File storageDir) throws Segment } } + @Override + public boolean reserve(final DataSegment segment) + { + final ReferenceCountingLock lock = createOrGetLock(segment); + synchronized (lock) { + try { + // May be the segment was already loaded [This check is required to account for restart scenarios] + if (null != findStoragePathIfCached(segment)) { + return true; + } + + String storageDirStr = getSegmentDir(segment); + + // check if we already reserved the segment + for (StorageLocation location : locations) { + if (location.isReserved(storageDirStr)) { + return true; + } + } + + // Not found in any location, reserve now + for (Iterator it = strategy.getLocations(); it.hasNext(); ) { + StorageLocation location = it.next(); + if (null != location.reserve(storageDirStr, segment)) { + return true; + } + } + } + finally { + unlock(segment, lock); + } + } + + return false; + } + + @Override + public boolean release(final DataSegment segment) + { + final ReferenceCountingLock lock = createOrGetLock(segment); + synchronized (lock) { + try { + String storageDir = getSegmentDir(segment); + + // Release the first location encountered + for (StorageLocation location : locations) { + if (location.isReserved(storageDir)) { + File localStorageDir = location.segmentDirectoryAsFile(storageDir); + if (localStorageDir.exists()) { + throw new ISE( + "Asking to release a location '%s' while the segment directory '%s' is present on disk. Any state on disk must be deleted before releasing", + location.getPath().getAbsolutePath(), + localStorageDir.getAbsolutePath() + ); + } + return location.release(storageDir, segment.getSize()); + } + } + } + finally { + unlock(segment, lock); + } + } + + return false; + } + @Override public void cleanup(DataSegment segment) { @@ -287,18 +412,17 @@ public void cleanup(DataSegment segment) final ReferenceCountingLock lock = createOrGetLock(segment); synchronized (lock) { try { - StorageLocation loc = findStorageLocationIfLoaded(segment); + File loc = findStoragePathIfCached(segment); if (loc == null) { log.warn("Asked to cleanup something[%s] that didn't exist. Skipping.", segment.getId()); return; } - // If storageDir.mkdirs() success, but downloadStartMarker.createNewFile() failed, // in this case, findStorageLocationIfLoaded() will think segment is located in the failed storageDir which is actually not. // So we should always clean all possible locations here for (StorageLocation location : locations) { - File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment, false)); + File localStorageDir = new File(location.getPath(), getSegmentDir(segment)); if (localStorageDir.exists()) { // Druid creates folders of the form dataSource/interval/version/partitionNum. // We need to clean up all these directories if they are all empty. diff --git a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java index efaefe96d21e..60f1831856b3 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java +++ b/server/src/main/java/org/apache/druid/segment/loading/StorageLocation.java @@ -26,6 +26,7 @@ import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; + import java.io.File; import java.util.HashSet; import java.util.Set; @@ -117,6 +118,16 @@ public synchronized File reserve(String segmentDir, DataSegment segment) return reserve(segmentDir, segment.getId().toString(), segment.getSize()); } + public synchronized boolean isReserved(String segmentDir) + { + return files.contains(segmentDirectoryAsFile(segmentDir)); + } + + public File segmentDirectoryAsFile(String segmentDir) + { + return new File(path, segmentDir); //lgtm [java/path-injection] + } + /** * Reserves space to store the given segment, only if it has not been done already. This can be used * when segment is already downloaded on the disk. Unlike {@link #reserve(String, DataSegment)}, this function diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 486ad46920b8..dcd5a1d151ef 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -30,7 +30,6 @@ import org.apache.druid.query.TableDataSource; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.ReferenceCountingSegment; -import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.join.table.IndexedTable; import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable; @@ -217,7 +216,7 @@ private TableDataSource getTableDataSource(DataSourceAnalysis analysis) */ public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException { - final Segment adapter = getAdapter(segment, lazy, loadFailed); + final ReferenceCountingSegment adapter = getSegmentReference(segment, lazy, loadFailed); final SettableSupplier resultSupplier = new SettableSupplier<>(); @@ -252,9 +251,7 @@ public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyL loadedIntervals.add( segment.getInterval(), segment.getVersion(), - segment.getShardSpec().createChunk( - ReferenceCountingSegment.wrapSegment(adapter, segment.getShardSpec()) - ) + segment.getShardSpec().createChunk(adapter) ); dataSourceState.addSegment(segment); resultSupplier.set(true); @@ -268,21 +265,21 @@ public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyL return resultSupplier.get(); } - private Segment getAdapter(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException + private ReferenceCountingSegment getSegmentReference(final DataSegment dataSegment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException { - final Segment adapter; + final ReferenceCountingSegment segment; try { - adapter = segmentLoader.getSegment(segment, lazy, loadFailed); + segment = segmentLoader.getSegment(dataSegment, lazy, loadFailed); } catch (SegmentLoadingException e) { - segmentLoader.cleanup(segment); + segmentLoader.cleanup(dataSegment); throw e; } - if (adapter == null) { - throw new SegmentLoadingException("Null adapter from loadSpec[%s]", segment.getLoadSpec()); + if (segment == null) { + throw new SegmentLoadingException("Null adapter from loadSpec[%s]", dataSegment.getLoadSpec()); } - return adapter; + return segment; } public void dropSegment(final DataSegment segment) diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java index a2686816325c..ca314b97b9c7 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentCacheManager.java @@ -47,6 +47,18 @@ public File getSegmentFiles(DataSegment segment) throw new UnsupportedOperationException(); } + @Override + public boolean reserve(DataSegment segment) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release(DataSegment segment) + { + throw new UnsupportedOperationException(); + } + @Override public void cleanup(DataSegment segment) { diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java index cf47755ba942..831a1a434b19 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.loading; import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.StorageAdapter; @@ -33,9 +34,9 @@ public class CacheTestSegmentLoader implements SegmentLoader { @Override - public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) + public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) { - return new Segment() + Segment baseSegment = new Segment() { @Override public SegmentId getId() @@ -66,6 +67,7 @@ public void close() { } }; + return ReferenceCountingSegment.wrapSegment(baseSegment, segment.getShardSpec()); } @Override diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java index 26c9cbdabf4c..5d116d12d96f 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerTest.java @@ -38,6 +38,7 @@ import java.io.File; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; public class SegmentLocalCacheManagerTest @@ -762,4 +763,151 @@ public void testGetSegmentFilesWhenDownloadStartMarkerExists() throws Exception Assert.assertFalse("Expect cache miss for corrupted segment file", manager.isSegmentCached(segmentToDownload)); Assert.assertFalse(cachedSegmentDir.exists()); } + + @Test + public void testReserveSegment() + { + final DataSegment dataSegment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withSize(100L); + final StorageLocation firstLocation = new StorageLocation(localSegmentCacheFolder, 200L, 0.0d); + final StorageLocation secondLocation = new StorageLocation(localSegmentCacheFolder, 150L, 0.0d); + + manager = new SegmentLocalCacheManager( + Arrays.asList(secondLocation, firstLocation), + new SegmentLoaderConfig(), + new RoundRobinStorageLocationSelectorStrategy(Arrays.asList(firstLocation, secondLocation)), + jsonMapper + ); + Assert.assertTrue(manager.reserve(dataSegment)); + Assert.assertTrue(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false))); + Assert.assertEquals(100L, firstLocation.availableSizeBytes()); + Assert.assertEquals(150L, secondLocation.availableSizeBytes()); + + // Reserving again should be no-op + Assert.assertTrue(manager.reserve(dataSegment)); + Assert.assertTrue(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false))); + Assert.assertEquals(100L, firstLocation.availableSizeBytes()); + Assert.assertEquals(150L, secondLocation.availableSizeBytes()); + + // Reserving a second segment should now go to a different location + final DataSegment otherSegment = dataSegmentWithInterval("2014-10-21T00:00:00Z/P1D").withSize(100L); + Assert.assertTrue(manager.reserve(otherSegment)); + Assert.assertTrue(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false))); + Assert.assertFalse(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(otherSegment, false))); + Assert.assertTrue(secondLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(otherSegment, false))); + Assert.assertEquals(100L, firstLocation.availableSizeBytes()); + Assert.assertEquals(50L, secondLocation.availableSizeBytes()); + } + + @Test + public void testReserveNotEnoughSpace() + { + final DataSegment dataSegment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withSize(100L); + final StorageLocation firstLocation = new StorageLocation(localSegmentCacheFolder, 50L, 0.0d); + final StorageLocation secondLocation = new StorageLocation(localSegmentCacheFolder, 150L, 0.0d); + + manager = new SegmentLocalCacheManager( + Arrays.asList(secondLocation, firstLocation), + new SegmentLoaderConfig(), + new RoundRobinStorageLocationSelectorStrategy(Arrays.asList(firstLocation, secondLocation)), + jsonMapper + ); + + // should go to second location if first one doesn't have enough space + Assert.assertTrue(manager.reserve(dataSegment)); + Assert.assertTrue(secondLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false))); + Assert.assertEquals(50L, firstLocation.availableSizeBytes()); + Assert.assertEquals(50L, secondLocation.availableSizeBytes()); + + final DataSegment otherSegment = dataSegmentWithInterval("2014-10-21T00:00:00Z/P1D").withSize(100L); + Assert.assertFalse(manager.reserve(otherSegment)); + Assert.assertEquals(50L, firstLocation.availableSizeBytes()); + Assert.assertEquals(50L, secondLocation.availableSizeBytes()); + } + + @Test + public void testSegmentDownloadWhenLocationReserved() throws Exception + { + final List locationConfigs = new ArrayList<>(); + final StorageLocationConfig locationConfig = createStorageLocationConfig("local_storage_folder", 10000000000L, true); + final StorageLocationConfig locationConfig2 = createStorageLocationConfig("local_storage_folder2", 1000000000L, true); + final StorageLocationConfig locationConfig3 = createStorageLocationConfig("local_storage_folder3", 1000000000L, true); + locationConfigs.add(locationConfig); + locationConfigs.add(locationConfig2); + locationConfigs.add(locationConfig3); + + List locations = new ArrayList<>(); + for (StorageLocationConfig locConfig : locationConfigs) { + locations.add( + new StorageLocation( + locConfig.getPath(), + locConfig.getMaxSize(), + locConfig.getFreeSpacePercent() + ) + ); + } + + manager = new SegmentLocalCacheManager( + new SegmentLoaderConfig().withLocations(locationConfigs), + new RoundRobinStorageLocationSelectorStrategy(locations), + jsonMapper + ); + + StorageLocation location3 = manager.getLocations().get(2); + Assert.assertEquals(locationConfig3.getPath(), location3.getPath()); + final File segmentSrcFolder = tmpFolder.newFolder("segmentSrcFolder"); + + // Segment should be downloaded in local_storage_folder3 even if that is the third location + final DataSegment segmentToDownload = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withLoadSpec( + ImmutableMap.of( + "type", + "local", + "path", + segmentSrcFolder.getCanonicalPath() + + "/test_segment_loader" + + "/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z" + + "/0/index.zip" + ) + ); + String segmentDir = DataSegmentPusher.getDefaultStorageDir(segmentToDownload, false); + location3.reserve(segmentDir, segmentToDownload); + // manually create a local segment under segmentSrcFolder + createLocalSegmentFile(segmentSrcFolder, "test_segment_loader/2014-10-20T00:00:00.000Z_2014-10-21T00:00:00.000Z/2015-05-27T03:38:35.683Z/0"); + + Assert.assertFalse("Expect cache miss before downloading segment", manager.isSegmentCached(segmentToDownload)); + + File segmentFile = manager.getSegmentFiles(segmentToDownload); + Assert.assertTrue(segmentFile.getAbsolutePath().contains("/local_storage_folder3/")); + Assert.assertTrue("Expect cache hit after downloading segment", manager.isSegmentCached(segmentToDownload)); + + manager.cleanup(segmentToDownload); + Assert.assertFalse("Expect cache miss after dropping segment", manager.isSegmentCached(segmentToDownload)); + Assert.assertFalse(location3.isReserved(segmentDir)); + } + + @Test + public void testRelease() + { + final DataSegment dataSegment = dataSegmentWithInterval("2014-10-20T00:00:00Z/P1D").withSize(100L); + final StorageLocation firstLocation = new StorageLocation(localSegmentCacheFolder, 50L, 0.0d); + final StorageLocation secondLocation = new StorageLocation(localSegmentCacheFolder, 150L, 0.0d); + + manager = new SegmentLocalCacheManager( + Arrays.asList(secondLocation, firstLocation), + new SegmentLoaderConfig(), + new RoundRobinStorageLocationSelectorStrategy(Arrays.asList(firstLocation, secondLocation)), + jsonMapper + ); + + manager.reserve(dataSegment); + manager.release(dataSegment); + Assert.assertEquals(50L, firstLocation.availableSizeBytes()); + Assert.assertEquals(150L, secondLocation.availableSizeBytes()); + Assert.assertFalse(firstLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false))); + Assert.assertFalse(secondLocation.isReserved(DataSegmentPusher.getDefaultStorageDir(dataSegment, false))); + + // calling release again should have no effect + manager.release(dataSegment); + Assert.assertEquals(50L, firstLocation.availableSizeBytes()); + Assert.assertEquals(150L, secondLocation.availableSizeBytes()); + } } diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 8698146c9d53..83f0a8ff1e08 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -64,12 +64,12 @@ public class SegmentManagerTest private static final SegmentLoader SEGMENT_LOADER = new SegmentLoader() { @Override - public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) + public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) { - return new SegmentForTesting( + return ReferenceCountingSegment.wrapSegment(new SegmentForTesting( MapUtils.getString(segment.getLoadSpec(), "version"), (Interval) segment.getLoadSpec().get("interval") - ); + ), segment.getShardSpec()); } @Override diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index 16832b3dfcb3..a76cc5b25bec 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -148,12 +148,12 @@ public void setUp() new SegmentLoader() { @Override - public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) + public ReferenceCountingSegment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) { - return new SegmentForTesting( + return ReferenceCountingSegment.wrapSegment(new SegmentForTesting( MapUtils.getString(segment.getLoadSpec(), "version"), (Interval) segment.getLoadSpec().get("interval") - ); + ), segment.getShardSpec()); } @Override