diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 8ee2ab5d9b66..4754daac276a 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1359,6 +1359,7 @@ These Historical configurations can be defined in the `historical/runtime.proper |`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)| |`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. Note that the work of loading segments involves downloading segments from deep storage, decompressing them and loading them to a memory mapped location. So the work is not all I/O Bound. Depending on CPU and network load, one could possibly increase this config to a higher value.|Number of cores| |`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently during historical startup.|`druid.segmentCache.numLoadingThreads`| +|`druid.segmentCache.lazyLoadOnStart`|Whether to load segment column metadata lazily during Historical startup. When set to true, Historical startup time will be dramatically improved by deferring segment loading until the first time that segment takes part in a query, which will incur this cost instead. One caveat is that if the Historical crashes while downloading and creating segment files, it is possible to end up with a corrupted segment on disk; this requires manual intervention to delete the corrupted files. When the flag is set to true, Historical startup will complete successfully, and queries using the corrupted segment will fail at runtime.|false| |`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. One might want to increase this number when noticing clusters are lagging behind w.r.t. 
balancing segments across historical nodes.|2| In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise. diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index c76bc9c0fbf4..45faff2cc409 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -1210,20 +1211,23 @@ private static class TestIndexIO extends IndexIO columnNames.add(ColumnHolder.TIME_COLUMN_NAME); columnNames.addAll(segment.getDimensions()); columnNames.addAll(segment.getMetrics()); - final Map columnMap = new HashMap<>(columnNames.size()); + final Map> columnMap = new HashMap<>(columnNames.size()); final List aggregatorFactories = new ArrayList<>(segment.getMetrics().size()); for (String columnName : columnNames) { if (MIXED_TYPE_COLUMN.equals(columnName)) { - columnMap.put(columnName, createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval()))); + ColumnHolder columnHolder = createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval())); + 
columnMap.put(columnName, () -> columnHolder); } else if (DIMENSIONS.containsKey(columnName)) { - columnMap.put(columnName, createColumn(DIMENSIONS.get(columnName))); + ColumnHolder columnHolder = createColumn(DIMENSIONS.get(columnName)); + columnMap.put(columnName, () -> columnHolder); } else { final Optional maybeMetric = AGGREGATORS.stream() .filter(agg -> agg.getName().equals(columnName)) .findAny(); if (maybeMetric.isPresent()) { - columnMap.put(columnName, createColumn(maybeMetric.get())); + ColumnHolder columnHolder = createColumn(maybeMetric.get()); + columnMap.put(columnName, () -> columnHolder); aggregatorFactories.add(maybeMetric.get()); } } @@ -1245,7 +1249,8 @@ private static class TestIndexIO extends IndexIO null, columnMap, null, - metadata + metadata, + false ) ); } @@ -1271,7 +1276,7 @@ void removeMetadata(File file) index.getColumns(), index.getFileMapper(), null, - index.getDimensionHandlers() + () -> index.getDimensionHandlers() ) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index c45c5ce09212..880d8065e174 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -275,7 +275,7 @@ private Segment loadSegment(DataSegment dataSegment, File tempSegmentDir) final SegmentLoader loader = new SegmentLoaderFactory(getIndexIO(), getObjectMapper()) .manufacturate(tempSegmentDir); try { - return loader.getSegment(dataSegment); + return loader.getSegment(dataSegment, false); } catch (SegmentLoadingException e) { throw new RuntimeException(e); diff --git 
a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java index f577fe6fd61b..655807275737 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java @@ -24,7 +24,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.base.Supplier; import com.google.common.base.Suppliers; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -179,13 +181,17 @@ public void validateTwoSegments(final IndexableAdapter adapter1, final Indexable } public QueryableIndex loadIndex(File inDir) throws IOException + { + return loadIndex(inDir, false); + } + public QueryableIndex loadIndex(File inDir, boolean lazy) throws IOException { final int version = SegmentUtils.getVersionFromDir(inDir); final IndexLoader loader = indexLoaders.get(version); if (loader != null) { - return loader.load(inDir, mapper); + return loader.load(inDir, mapper, lazy); } else { throw new ISE("Unknown index version[%s]", version); } @@ -406,7 +412,7 @@ public MMappedIndex mapDir(File inDir) throws IOException interface IndexLoader { - QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException; + QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException; } static class LegacyIndexLoader implements IndexLoader @@ -421,11 +427,11 @@ static class LegacyIndexLoader implements IndexLoader } @Override - public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException + public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException { MMappedIndex index = legacyHandler.mapDir(inDir); - Map columns = new HashMap<>(); + Map> columns = new HashMap<>(); for (String 
dimension : index.getAvailableDimensions()) { ColumnBuilder builder = new ColumnBuilder() @@ -449,61 +455,61 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException if (index.getSpatialIndexes().get(dimension) != null) { builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(index.getSpatialIndexes().get(dimension))); } - columns.put( - dimension, - builder.build() - ); + columns.put(dimension, getColumnHolderSupplier(builder, lazy)); } for (String metric : index.getAvailableMetrics()) { final MetricHolder metricHolder = index.getMetricHolder(metric); if (metricHolder.getType() == MetricHolder.MetricType.FLOAT) { - columns.put( - metric, - new ColumnBuilder() - .setType(ValueType.FLOAT) - .setNumericColumnSupplier( - new FloatNumericColumnSupplier( - metricHolder.floatType, - LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() - ) + ColumnBuilder builder = new ColumnBuilder() + .setType(ValueType.FLOAT) + .setNumericColumnSupplier( + new FloatNumericColumnSupplier( + metricHolder.floatType, + LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() ) - .build() - ); + ); + columns.put(metric, getColumnHolderSupplier(builder, lazy)); } else if (metricHolder.getType() == MetricHolder.MetricType.COMPLEX) { - columns.put( - metric, - new ColumnBuilder() - .setType(ValueType.COMPLEX) - .setComplexColumnSupplier( - new ComplexColumnPartSupplier(metricHolder.getTypeName(), metricHolder.complexType) - ) - .build() - ); + ColumnBuilder builder = new ColumnBuilder() + .setType(ValueType.COMPLEX) + .setComplexColumnSupplier( + new ComplexColumnPartSupplier(metricHolder.getTypeName(), metricHolder.complexType) + ); + columns.put(metric, getColumnHolderSupplier(builder, lazy)); } } - columns.put( - ColumnHolder.TIME_COLUMN_NAME, - new ColumnBuilder() - .setType(ValueType.LONG) - .setNumericColumnSupplier( - new LongNumericColumnSupplier( - index.timestamps, - LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() - ) + 
ColumnBuilder builder = new ColumnBuilder() + .setType(ValueType.LONG) + .setNumericColumnSupplier( + new LongNumericColumnSupplier( + index.timestamps, + LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() ) - .build() - ); + ); + columns.put(ColumnHolder.TIME_COLUMN_NAME, getColumnHolderSupplier(builder, lazy)); + return new SimpleQueryableIndex( index.getDataInterval(), index.getAvailableDimensions(), new ConciseBitmapFactory(), columns, index.getFileMapper(), - null + null, + lazy ); } + + private Supplier getColumnHolderSupplier(ColumnBuilder builder, boolean lazy) + { + if (lazy) { + return Suppliers.memoize(() -> builder.build()); + } else { + ColumnHolder columnHolder = builder.build(); + return () -> columnHolder; + } + } } static class V9IndexLoader implements IndexLoader @@ -516,7 +522,7 @@ static class V9IndexLoader implements IndexLoader } @Override - public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException + public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException { log.debug("Mapping v9 index[%s]", inDir); long startTime = System.currentTimeMillis(); @@ -576,17 +582,51 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException } } - Map columns = new HashMap<>(); + Map> columns = new HashMap<>(); for (String columnName : cols) { if (Strings.isNullOrEmpty(columnName)) { log.warn("Null or Empty Dimension found in the file : " + inDir); continue; } - columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName), smooshedFiles)); + + ByteBuffer colBuffer = smooshedFiles.mapFile(columnName); + + if (lazy) { + columns.put(columnName, Suppliers.memoize( + () -> { + try { + return deserializeColumn(mapper, colBuffer, smooshedFiles); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + )); + } else { + ColumnHolder columnHolder = deserializeColumn(mapper, colBuffer, smooshedFiles); + columns.put(columnName, () -> columnHolder); + 
} + } - columns.put(ColumnHolder.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time"), smooshedFiles)); + ByteBuffer timeBuffer = smooshedFiles.mapFile("__time"); + + if (lazy) { + columns.put(ColumnHolder.TIME_COLUMN_NAME, Suppliers.memoize( + () -> { + try { + return deserializeColumn(mapper, timeBuffer, smooshedFiles); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + )); + } else { + ColumnHolder columnHolder = deserializeColumn(mapper, timeBuffer, smooshedFiles); + columns.put(ColumnHolder.TIME_COLUMN_NAME, () -> columnHolder); + } final QueryableIndex index = new SimpleQueryableIndex( dataInterval, @@ -594,7 +634,8 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, - metadata + metadata, + lazy ); log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime); diff --git a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java index bd0d5fe5105b..6cd70636d7ec 100644 --- a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java @@ -21,6 +21,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import org.apache.druid.collections.bitmap.BitmapFactory; @@ -42,19 +44,20 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde private final List columnNames; private final Indexed availableDimensions; private final BitmapFactory bitmapFactory; - private final Map columns; + private final Map> columns; private final SmooshedFileMapper fileMapper; 
@Nullable private final Metadata metadata; - private final Map dimensionHandlers; + private final Supplier> dimensionHandlers; public SimpleQueryableIndex( Interval dataInterval, Indexed dimNames, BitmapFactory bitmapFactory, - Map columns, + Map> columns, SmooshedFileMapper fileMapper, - @Nullable Metadata metadata + @Nullable Metadata metadata, + boolean lazy ) { Preconditions.checkNotNull(columns.get(ColumnHolder.TIME_COLUMN_NAME)); @@ -71,8 +74,27 @@ public SimpleQueryableIndex( this.columns = columns; this.fileMapper = fileMapper; this.metadata = metadata; - this.dimensionHandlers = Maps.newLinkedHashMap(); - initDimensionHandlers(); + + if (lazy) { + this.dimensionHandlers = Suppliers.memoize(() -> { + Map dimensionHandlerMap = Maps.newLinkedHashMap(); + for (String dim : availableDimensions) { + ColumnCapabilities capabilities = getColumnHolder(dim).getCapabilities(); + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); + dimensionHandlerMap.put(dim, handler); + } + return dimensionHandlerMap; + } + ); + } else { + Map dimensionHandlerMap = Maps.newLinkedHashMap(); + for (String dim : availableDimensions) { + ColumnCapabilities capabilities = getColumnHolder(dim).getCapabilities(); + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); + dimensionHandlerMap.put(dim, handler); + } + this.dimensionHandlers = () -> dimensionHandlerMap; + } } @VisibleForTesting @@ -81,10 +103,10 @@ public SimpleQueryableIndex( List columnNames, Indexed availableDimensions, BitmapFactory bitmapFactory, - Map columns, + Map> columns, SmooshedFileMapper fileMapper, @Nullable Metadata metadata, - Map dimensionHandlers + Supplier> dimensionHandlers ) { this.dataInterval = interval; @@ -106,7 +128,7 @@ public Interval getDataInterval() @Override public int getNumRows() { - return columns.get(ColumnHolder.TIME_COLUMN_NAME).getLength(); + return 
columns.get(ColumnHolder.TIME_COLUMN_NAME).get().getLength(); } @Override @@ -137,11 +159,12 @@ public BitmapFactory getBitmapFactoryForDimensions() @Override public ColumnHolder getColumnHolder(String columnName) { - return columns.get(columnName); + Supplier columnHolderSupplier = columns.get(columnName); + return columnHolderSupplier == null ? null : columnHolderSupplier.get(); } @VisibleForTesting - public Map getColumns() + public Map> getColumns() { return columns; } @@ -167,15 +190,7 @@ public Metadata getMetadata() @Override public Map getDimensionHandlers() { - return dimensionHandlers; + return dimensionHandlers.get(); } - private void initDimensionHandlers() - { - for (String dim : availableDimensions) { - ColumnCapabilities capabilities = getColumnHolder(dim).getCapabilities(); - DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); - dimensionHandlers.put(dim, handler); - } - } } diff --git a/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java index d3cd9db84335..ea5290e204c0 100644 --- a/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java @@ -42,10 +42,10 @@ public MMappedQueryableSegmentizerFactory(@JacksonInject IndexIO indexIO) } @Override - public Segment factorize(DataSegment dataSegment, File parentDir) throws SegmentLoadingException + public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy) throws SegmentLoadingException { try { - return new QueryableIndexSegment(indexIO.loadIndex(parentDir), dataSegment.getId()); + return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy), dataSegment.getId()); } catch (IOException e) { throw new SegmentLoadingException(e, "%s", 
e.getMessage()); diff --git a/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java index 5bf17a6ea268..09bc048448b8 100644 --- a/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java @@ -31,5 +31,5 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MMappedQueryableSegmentizerFactory.class) public interface SegmentizerFactory { - Segment factorize(DataSegment segment, File parentDir) throws SegmentLoadingException; + Segment factorize(DataSegment segment, File parentDir, boolean lazy) throws SegmentLoadingException; } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java index 301e73702294..f63024a58cc6 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java @@ -31,7 +31,7 @@ public interface SegmentLoader { boolean isSegmentLoaded(DataSegment segment); - Segment getSegment(DataSegment segment) throws SegmentLoadingException; + Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException; File getSegmentFiles(DataSegment segment) throws SegmentLoadingException; void cleanup(DataSegment segment); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index 58eb77317e5b..c6c57233738c 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -37,6 +37,9 @@ public class SegmentLoaderConfig @NotEmpty private List locations = null; + 
@JsonProperty("lazyLoadOnStart") + private boolean lazyLoadOnStart = false; + @JsonProperty("deleteOnRemove") private boolean deleteOnRemove = true; @@ -66,6 +69,11 @@ public List getLocations() return locations; } + public boolean isLazyLoadOnStart() + { + return lazyLoadOnStart; + } + public boolean isDeleteOnRemove() { return deleteOnRemove; diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 4a8ce98923f0..50a4b6a20e4c 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -121,7 +121,7 @@ private StorageLocation findStorageLocationIfLoaded(final DataSegment segment) } @Override - public Segment getSegment(DataSegment segment) throws SegmentLoadingException + public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException { final ReferenceCountingLock lock = createOrGetLock(segment); final File segmentFiles; @@ -147,7 +147,7 @@ public Segment getSegment(DataSegment segment) throws SegmentLoadingException factory = new MMappedQueryableSegmentizerFactory(indexIO); } - return factory.factorize(segment, segmentFiles); + return factory.factorize(segment, segmentFiles, lazy); } @Override diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 04c84aa51ab1..dbb48474defc 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -146,13 +146,15 @@ public VersionedIntervalTimeline getTimeline(S * * @param segment segment to load * + * @param lazy whether to lazy load columns metadata + * * @return true if the segment was newly loaded, false if it was already loaded * * 
@throws SegmentLoadingException if the segment cannot be loaded */ - public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException + public boolean loadSegment(final DataSegment segment, boolean lazy) throws SegmentLoadingException { - final Segment adapter = getAdapter(segment); + final Segment adapter = getAdapter(segment, lazy); final SettableSupplier resultSupplier = new SettableSupplier<>(); @@ -189,11 +191,11 @@ public boolean loadSegment(final DataSegment segment) throws SegmentLoadingExcep return resultSupplier.get(); } - private Segment getAdapter(final DataSegment segment) throws SegmentLoadingException + private Segment getAdapter(final DataSegment segment, boolean lazy) throws SegmentLoadingException { final Segment adapter; try { - adapter = segmentLoader.getSegment(segment); + adapter = segmentLoader.getSegment(segment, lazy); } catch (SegmentLoadingException e) { segmentLoader.cleanup(segment); diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index cb08182e37f6..308c0d477d47 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -252,11 +252,11 @@ private void loadLocalCache() * * @throws SegmentLoadingException if it fails to load the given segment */ - private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException + private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy) throws SegmentLoadingException { final boolean loaded; try { - loaded = segmentManager.loadSegment(segment); + loaded = segmentManager.loadSegment(segment, lazy); } catch (Exception e) { removeSegment(segment, callback, false); @@ -304,7 +304,7 @@ each time when addSegment() is called, it has to 
wait for the lock in order to m segmentsToDelete.remove(segment); } } - loadSegment(segment, DataSegmentChangeCallback.NOOP); + loadSegment(segment, DataSegmentChangeCallback.NOOP, false); // announce segment even if the segment file already exists. try { announcer.announceSegment(segment); @@ -351,7 +351,7 @@ private void addSegments(Collection segments, final DataSegmentChan numSegments, segment.getId() ); - loadSegment(segment, callback); + loadSegment(segment, callback, config.isLazyLoadOnStart()); try { backgroundSegmentAnnouncer.announceSegment(segment); } diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java index 0b291d402106..e604b29f7631 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java @@ -47,7 +47,7 @@ public boolean isSegmentLoaded(DataSegment segment) } @Override - public Segment getSegment(final DataSegment segment) + public Segment getSegment(final DataSegment segment, boolean lazy) { return new AbstractSegment() { diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 317e9c7f3aca..b6c9c1671fc3 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -66,7 +66,7 @@ public boolean isSegmentLoaded(DataSegment segment) } @Override - public Segment getSegment(final DataSegment segment) + public Segment getSegment(final DataSegment segment, boolean lazy) { return new SegmentForTesting( MapUtils.getString(segment.getLoadSpec(), "version"), @@ -219,7 +219,7 @@ public void testLoadSegment() throws ExecutionException, InterruptedException, S final List> futures = SEGMENTS.stream() .map( segment -> 
executor.submit( - () -> segmentManager.loadSegment(segment) + () -> segmentManager.loadSegment(segment, false) ) ) .collect(Collectors.toList()); @@ -235,7 +235,7 @@ public void testLoadSegment() throws ExecutionException, InterruptedException, S public void testDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException { for (DataSegment eachSegment : SEGMENTS) { - Assert.assertTrue(segmentManager.loadSegment(eachSegment)); + Assert.assertTrue(segmentManager.loadSegment(eachSegment, false)); } final List> futures = ImmutableList.of(SEGMENTS.get(0), SEGMENTS.get(2)).stream() @@ -261,14 +261,14 @@ public void testDropSegment() throws SegmentLoadingException, ExecutionException @Test public void testLoadDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException { - Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0))); - Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(2))); + Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false)); + Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(2), false)); final List> loadFutures = ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(3), SEGMENTS.get(4)) .stream() .map( segment -> executor.submit( - () -> segmentManager.loadSegment(segment) + () -> segmentManager.loadSegment(segment, false) ) ) .collect(Collectors.toList()); @@ -299,10 +299,10 @@ public void testLoadDropSegment() throws SegmentLoadingException, ExecutionExcep public void testLoadDuplicatedSegmentsSequentially() throws SegmentLoadingException { for (DataSegment segment : SEGMENTS) { - Assert.assertTrue(segmentManager.loadSegment(segment)); + Assert.assertTrue(segmentManager.loadSegment(segment, false)); } // try to load an existing segment - Assert.assertFalse(segmentManager.loadSegment(SEGMENTS.get(0))); + Assert.assertFalse(segmentManager.loadSegment(SEGMENTS.get(0), false)); assertResult(SEGMENTS); } @@ -315,7 +315,7 @@ public void 
testLoadDuplicatedSegmentsInParallel() .stream() .map( segment -> executor.submit( - () -> segmentManager.loadSegment(segment) + () -> segmentManager.loadSegment(segment, false) ) ) .collect(Collectors.toList()); @@ -336,7 +336,7 @@ public void testLoadDuplicatedSegmentsInParallel() @Test public void testNonExistingSegmentsSequentially() throws SegmentLoadingException { - Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0))); + Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false)); // try to drop a non-existing segment of different data source segmentManager.dropSegment(SEGMENTS.get(2)); @@ -349,7 +349,7 @@ public void testNonExistingSegmentsSequentially() throws SegmentLoadingException public void testNonExistingSegmentsInParallel() throws SegmentLoadingException, ExecutionException, InterruptedException { - segmentManager.loadSegment(SEGMENTS.get(0)); + segmentManager.loadSegment(SEGMENTS.get(0), false); final List> futures = ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(2)) .stream() .map( @@ -372,7 +372,7 @@ public void testNonExistingSegmentsInParallel() @Test public void testRemoveEmptyTimeline() throws SegmentLoadingException { - segmentManager.loadSegment(SEGMENTS.get(0)); + segmentManager.loadSegment(SEGMENTS.get(0), false); assertResult(ImmutableList.of(SEGMENTS.get(0))); Assert.assertEquals(1, segmentManager.getDataSources().size()); segmentManager.dropSegment(SEGMENTS.get(0)); @@ -406,7 +406,7 @@ public void testLoadAndDropNonRootGenerationSegment() throws SegmentLoadingExcep 10 ); - segmentManager.loadSegment(segment); + segmentManager.loadSegment(segment, false); assertResult(ImmutableList.of(segment)); segmentManager.dropSegment(segment); @@ -434,7 +434,7 @@ private void assertResult(List expectedExistingSegments) throws Seg segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk( - ReferenceCountingSegment.wrapSegment(SEGMENT_LOADER.getSegment(segment), segment.getShardSpec()) + 
ReferenceCountingSegment.wrapSegment(SEGMENT_LOADER.getSegment(segment, false), segment.getShardSpec()) ) ); } diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java index fdd89d1962ca..638852fa3c2c 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java @@ -129,7 +129,7 @@ public void testLoadSameSegment() throws IOException, ExecutionException, Interr final DataSegment segment = createSegment("2019-01-01/2019-01-02"); final List futures = IntStream .range(0, 16) - .mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment))) + .mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment, false))) .collect(Collectors.toList()); for (Future future : futures) { future.get(); @@ -154,7 +154,7 @@ public void testLoadMultipleSegments() throws IOException, ExecutionException, I .mapToObj(i -> exec.submit(() -> { for (DataSegment segment : segments) { try { - segmentManager.loadSegment(segment); + segmentManager.loadSegment(segment, false); } catch (SegmentLoadingException e) { throw new RuntimeException(e); @@ -222,7 +222,7 @@ public long size() private static class TestSegmentizerFactory implements SegmentizerFactory { @Override - public Segment factorize(DataSegment segment, File parentDir) + public Segment factorize(DataSegment segment, File parentDir, boolean lazy) { return new Segment() { diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index 6be9055be70a..24828e2bbaac 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -121,7 +121,7 @@ public boolean 
isSegmentLoaded(DataSegment segment) } @Override - public Segment getSegment(final DataSegment segment) + public Segment getSegment(final DataSegment segment, boolean lazy) { return new SegmentForTesting( MapUtils.getString(segment.getLoadSpec(), "version"), @@ -478,7 +478,8 @@ public void loadQueryable(String dataSource, String version, Interval interval) NoneShardSpec.instance(), IndexIO.CURRENT_VERSION_ID, 123L - ) + ), + false ); } catch (SegmentLoadingException e) {