From 6f964e148dbee6b496e68b39cc036bdbed8e1295 Mon Sep 17 00:00:00 2001 From: dengfangyuan Date: Sat, 2 Feb 2019 11:10:41 +0800 Subject: [PATCH 1/7] historical fast restart by lazy load columns metadata --- .../common/task/CompactionTaskTest.java | 17 ++- .../task/SameIntervalMergeTaskTest.java | 2 +- .../org/apache/druid/segment/IndexIO.java | 143 ++++++++++++------ .../druid/segment/SimpleQueryableIndex.java | 55 ++++--- .../MMappedQueryableSegmentizerFactory.java | 4 +- .../segment/loading/SegmentizerFactory.java | 2 +- .../druid/segment/loading/SegmentLoader.java | 2 +- .../segment/loading/SegmentLoaderConfig.java | 8 + .../SegmentLoaderLocalCacheManager.java | 4 +- .../apache/druid/server/SegmentManager.java | 10 +- .../coordination/SegmentLoadDropHandler.java | 8 +- .../loading/CacheTestSegmentLoader.java | 2 +- .../druid/server/SegmentManagerTest.java | 26 ++-- .../coordination/ServerManagerTest.java | 5 +- 14 files changed, 186 insertions(+), 102 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index f759cc7c1c62..1c3162970a65 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -1312,20 +1313,23 @@ private static class TestIndexIO extends IndexIO columnNames.add(ColumnHolder.TIME_COLUMN_NAME); columnNames.addAll(segment.getDimensions()); columnNames.addAll(segment.getMetrics()); - final Map 
columnMap = new HashMap<>(columnNames.size()); + final Map> columnMap = new HashMap<>(columnNames.size()); final List aggregatorFactories = new ArrayList<>(segment.getMetrics().size()); for (String columnName : columnNames) { if (MIXED_TYPE_COLUMN.equals(columnName)) { - columnMap.put(columnName, createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval()))); + ColumnHolder columnHolder = createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval())); + columnMap.put(columnName, () -> columnHolder); } else if (DIMENSIONS.containsKey(columnName)) { - columnMap.put(columnName, createColumn(DIMENSIONS.get(columnName))); + ColumnHolder columnHolder = createColumn(DIMENSIONS.get(columnName)); + columnMap.put(columnName, () -> columnHolder); } else { final Optional maybeMetric = AGGREGATORS.stream() .filter(agg -> agg.getName().equals(columnName)) .findAny(); if (maybeMetric.isPresent()) { - columnMap.put(columnName, createColumn(maybeMetric.get())); + ColumnHolder columnHolder = createColumn(maybeMetric.get()); + columnMap.put(columnName, () -> columnHolder); aggregatorFactories.add(maybeMetric.get()); } } @@ -1347,7 +1351,8 @@ private static class TestIndexIO extends IndexIO null, columnMap, null, - metadata + metadata, + false ) ); } @@ -1373,7 +1378,7 @@ void removeMetadata(File file) index.getColumns(), index.getFileMapper(), null, - index.getDimensionHandlers() + () -> index.getDimensionHandlers() ) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/SameIntervalMergeTaskTest.java index 6284503d9010..f740c8cc4ec7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/SameIntervalMergeTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/SameIntervalMergeTaskTest.java @@ -230,7 +230,7 @@ public boolean isSegmentLoaded(DataSegment segment) } @Override - public Segment 
getSegment(DataSegment segment) + public Segment getSegment(DataSegment segment, boolean lazy) { return null; } diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java index 2293991c5515..53d5b3580040 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java @@ -24,7 +24,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.base.Supplier; import com.google.common.base.Suppliers; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -179,13 +181,17 @@ public void validateTwoSegments(final IndexableAdapter adapter1, final Indexable } public QueryableIndex loadIndex(File inDir) throws IOException + { + return loadIndex(inDir, false); + } + public QueryableIndex loadIndex(File inDir, boolean lazy) throws IOException { final int version = SegmentUtils.getVersionFromDir(inDir); final IndexLoader loader = indexLoaders.get(version); if (loader != null) { - return loader.load(inDir, mapper); + return loader.load(inDir, mapper, lazy); } else { throw new ISE("Unknown index version[%s]", version); } @@ -406,7 +412,7 @@ public MMappedIndex mapDir(File inDir) throws IOException interface IndexLoader { - QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException; + QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException; } static class LegacyIndexLoader implements IndexLoader @@ -421,11 +427,11 @@ static class LegacyIndexLoader implements IndexLoader } @Override - public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException + public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException { 
MMappedIndex index = legacyHandler.mapDir(inDir); - Map columns = new HashMap<>(); + Map> columns = new HashMap<>(); for (String dimension : index.getAvailableDimensions()) { ColumnBuilder builder = new ColumnBuilder() @@ -449,59 +455,71 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException if (index.getSpatialIndexes().get(dimension) != null) { builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(index.getSpatialIndexes().get(dimension))); } - columns.put( - dimension, - builder.build() - ); + if (lazy) { + columns.put(dimension, Suppliers.memoize(() -> builder.build())); + } else { + ColumnHolder columnHolder = builder.build(); + columns.put(dimension, () -> columnHolder); + } + } for (String metric : index.getAvailableMetrics()) { final MetricHolder metricHolder = index.getMetricHolder(metric); if (metricHolder.getType() == MetricHolder.MetricType.FLOAT) { - columns.put( - metric, - new ColumnBuilder() - .setType(ValueType.FLOAT) - .setNumericColumnSupplier( - new FloatNumericColumnSupplier( - metricHolder.floatType, - LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() - ) + ColumnBuilder builder = new ColumnBuilder() + .setType(ValueType.FLOAT) + .setNumericColumnSupplier( + new FloatNumericColumnSupplier( + metricHolder.floatType, + LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() ) - .build() - ); + ); + if (lazy) { + columns.put(metric, Suppliers.memoize(() -> builder.build())); + } else { + ColumnHolder columnHolder = builder.build(); + columns.put(metric, () -> columnHolder); + } + } else if (metricHolder.getType() == MetricHolder.MetricType.COMPLEX) { - columns.put( - metric, - new ColumnBuilder() - .setType(ValueType.COMPLEX) - .setComplexColumnSupplier( - new ComplexColumnPartSupplier(metricHolder.getTypeName(), metricHolder.complexType) - ) - .build() - ); + ColumnBuilder builder = new ColumnBuilder() + .setType(ValueType.COMPLEX) + .setComplexColumnSupplier( + new 
ComplexColumnPartSupplier(metricHolder.getTypeName(), metricHolder.complexType) + ); + if (lazy) { + columns.put(metric, Suppliers.memoize(() -> builder.build())); + } else { + ColumnHolder columnHolder = builder.build(); + columns.put(metric, () -> columnHolder); + } } } - columns.put( - ColumnHolder.TIME_COLUMN_NAME, - new ColumnBuilder() - .setType(ValueType.LONG) - .setNumericColumnSupplier( - new LongNumericColumnSupplier( - index.timestamps, - LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() - ) + ColumnBuilder builder = new ColumnBuilder() + .setType(ValueType.LONG) + .setNumericColumnSupplier( + new LongNumericColumnSupplier( + index.timestamps, + LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() ) - .build() - ); + ); + if (lazy) { + columns.put(ColumnHolder.TIME_COLUMN_NAME, Suppliers.memoize(() -> builder.build())); + } else { + ColumnHolder columnHolder = builder.build(); + columns.put(ColumnHolder.TIME_COLUMN_NAME, () -> columnHolder); + } + return new SimpleQueryableIndex( index.getDataInterval(), index.getAvailableDimensions(), new ConciseBitmapFactory(), columns, index.getFileMapper(), - null + null, + lazy ); } } @@ -516,7 +534,7 @@ static class V9IndexLoader implements IndexLoader } @Override - public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException + public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException { log.debug("Mapping v9 index[%s]", inDir); long startTime = System.currentTimeMillis(); @@ -576,17 +594,51 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException } } - Map columns = new HashMap<>(); + Map> columns = new HashMap<>(); for (String columnName : cols) { if (Strings.isNullOrEmpty(columnName)) { log.warn("Null or Empty Dimension found in the file : " + inDir); continue; } - columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName), smooshedFiles)); + + ByteBuffer colBuffer = 
smooshedFiles.mapFile(columnName); + + if (lazy) { + columns.put(columnName, Suppliers.memoize( + () -> { + try { + return deserializeColumn(mapper, colBuffer, smooshedFiles); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + )); + } else { + ColumnHolder columnHolder = deserializeColumn(mapper, colBuffer, smooshedFiles); + columns.put(columnName, () -> columnHolder); + } + } - columns.put(ColumnHolder.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time"), smooshedFiles)); + ByteBuffer timeBuffer = smooshedFiles.mapFile("__time"); + + if (lazy) { + columns.put(ColumnHolder.TIME_COLUMN_NAME, Suppliers.memoize( + () -> { + try { + return deserializeColumn(mapper, timeBuffer, smooshedFiles); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + )); + } else { + ColumnHolder columnHolder = deserializeColumn(mapper, timeBuffer, smooshedFiles); + columns.put(ColumnHolder.TIME_COLUMN_NAME, () -> columnHolder); + } final QueryableIndex index = new SimpleQueryableIndex( dataInterval, @@ -594,7 +646,8 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, - metadata + metadata, + lazy ); log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime); diff --git a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java index bd0d5fe5105b..6cd70636d7ec 100644 --- a/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/SimpleQueryableIndex.java @@ -21,6 +21,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import 
com.google.common.collect.Maps; import org.apache.druid.collections.bitmap.BitmapFactory; @@ -42,19 +44,20 @@ public class SimpleQueryableIndex extends AbstractIndex implements QueryableInde private final List columnNames; private final Indexed availableDimensions; private final BitmapFactory bitmapFactory; - private final Map columns; + private final Map> columns; private final SmooshedFileMapper fileMapper; @Nullable private final Metadata metadata; - private final Map dimensionHandlers; + private final Supplier> dimensionHandlers; public SimpleQueryableIndex( Interval dataInterval, Indexed dimNames, BitmapFactory bitmapFactory, - Map columns, + Map> columns, SmooshedFileMapper fileMapper, - @Nullable Metadata metadata + @Nullable Metadata metadata, + boolean lazy ) { Preconditions.checkNotNull(columns.get(ColumnHolder.TIME_COLUMN_NAME)); @@ -71,8 +74,27 @@ public SimpleQueryableIndex( this.columns = columns; this.fileMapper = fileMapper; this.metadata = metadata; - this.dimensionHandlers = Maps.newLinkedHashMap(); - initDimensionHandlers(); + + if (lazy) { + this.dimensionHandlers = Suppliers.memoize(() -> { + Map dimensionHandlerMap = Maps.newLinkedHashMap(); + for (String dim : availableDimensions) { + ColumnCapabilities capabilities = getColumnHolder(dim).getCapabilities(); + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); + dimensionHandlerMap.put(dim, handler); + } + return dimensionHandlerMap; + } + ); + } else { + Map dimensionHandlerMap = Maps.newLinkedHashMap(); + for (String dim : availableDimensions) { + ColumnCapabilities capabilities = getColumnHolder(dim).getCapabilities(); + DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); + dimensionHandlerMap.put(dim, handler); + } + this.dimensionHandlers = () -> dimensionHandlerMap; + } } @VisibleForTesting @@ -81,10 +103,10 @@ public SimpleQueryableIndex( List columnNames, Indexed availableDimensions, 
BitmapFactory bitmapFactory, - Map columns, + Map> columns, SmooshedFileMapper fileMapper, @Nullable Metadata metadata, - Map dimensionHandlers + Supplier> dimensionHandlers ) { this.dataInterval = interval; @@ -106,7 +128,7 @@ public Interval getDataInterval() @Override public int getNumRows() { - return columns.get(ColumnHolder.TIME_COLUMN_NAME).getLength(); + return columns.get(ColumnHolder.TIME_COLUMN_NAME).get().getLength(); } @Override @@ -137,11 +159,12 @@ public BitmapFactory getBitmapFactoryForDimensions() @Override public ColumnHolder getColumnHolder(String columnName) { - return columns.get(columnName); + Supplier columnHolderSupplier = columns.get(columnName); + return columnHolderSupplier == null ? null : columnHolderSupplier.get(); } @VisibleForTesting - public Map getColumns() + public Map> getColumns() { return columns; } @@ -167,15 +190,7 @@ public Metadata getMetadata() @Override public Map getDimensionHandlers() { - return dimensionHandlers; + return dimensionHandlers.get(); } - private void initDimensionHandlers() - { - for (String dim : availableDimensions) { - ColumnCapabilities capabilities = getColumnHolder(dim).getCapabilities(); - DimensionHandler handler = DimensionHandlerUtils.getHandlerFromCapabilities(dim, capabilities, null); - dimensionHandlers.put(dim, handler); - } - } } diff --git a/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java index d3cd9db84335..ea5290e204c0 100644 --- a/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java @@ -42,10 +42,10 @@ public MMappedQueryableSegmentizerFactory(@JacksonInject IndexIO indexIO) } @Override - public Segment factorize(DataSegment dataSegment, File parentDir) throws SegmentLoadingException + public 
Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy) throws SegmentLoadingException { try { - return new QueryableIndexSegment(indexIO.loadIndex(parentDir), dataSegment.getId()); + return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy), dataSegment.getId()); } catch (IOException e) { throw new SegmentLoadingException(e, "%s", e.getMessage()); diff --git a/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java index 5bf17a6ea268..09bc048448b8 100644 --- a/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java @@ -31,5 +31,5 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MMappedQueryableSegmentizerFactory.class) public interface SegmentizerFactory { - Segment factorize(DataSegment segment, File parentDir) throws SegmentLoadingException; + Segment factorize(DataSegment segment, File parentDir, boolean lazy) throws SegmentLoadingException; } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java index 3db4672f3fb1..b4571649bb53 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java @@ -29,7 +29,7 @@ public interface SegmentLoader { boolean isSegmentLoaded(DataSegment segment); - Segment getSegment(DataSegment segment) throws SegmentLoadingException; + Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException; File getSegmentFiles(DataSegment segment) throws SegmentLoadingException; void cleanup(DataSegment segment); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java 
b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java index b09bd0c5bea8..15ae83f21c8e 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderConfig.java @@ -36,6 +36,9 @@ public class SegmentLoaderConfig @NotEmpty private List locations = null; + @JsonProperty("lazyLoadOnStart") + private boolean lazyLoadOnStart = false; + @JsonProperty("deleteOnRemove") private boolean deleteOnRemove = true; @@ -62,6 +65,11 @@ public List getLocations() return locations; } + public boolean isLazyLoadOnStart() + { + return lazyLoadOnStart; + } + public boolean isDeleteOnRemove() { return deleteOnRemove; diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 68e8a20b62b5..7aecaec4124d 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -102,7 +102,7 @@ private StorageLocation findStorageLocationIfLoaded(final DataSegment segment) } @Override - public Segment getSegment(DataSegment segment) throws SegmentLoadingException + public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException { File segmentFiles = getSegmentFiles(segment); File factoryJson = new File(segmentFiles, "factory.json"); @@ -119,7 +119,7 @@ public Segment getSegment(DataSegment segment) throws SegmentLoadingException factory = new MMappedQueryableSegmentizerFactory(indexIO); } - return factory.factorize(segment, segmentFiles); + return factory.factorize(segment, segmentFiles, lazy); } @Override diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 
a1975ffb7a8b..d1561fedb441 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -148,13 +148,15 @@ public VersionedIntervalTimeline getTimeline(S * * @param segment segment to load * + * @param lazy whether to lazy load columns metadata + * * @return true if the segment was newly loaded, false if it was already loaded * * @throws SegmentLoadingException if the segment cannot be loaded */ - public boolean loadSegment(final DataSegment segment) throws SegmentLoadingException + public boolean loadSegment(final DataSegment segment, boolean lazy) throws SegmentLoadingException { - final Segment adapter = getAdapter(segment); + final Segment adapter = getAdapter(segment, lazy); final SettableSupplier resultSupplier = new SettableSupplier<>(); @@ -189,11 +191,11 @@ public boolean loadSegment(final DataSegment segment) throws SegmentLoadingExcep return resultSupplier.get(); } - private Segment getAdapter(final DataSegment segment) throws SegmentLoadingException + private Segment getAdapter(final DataSegment segment, boolean lazy) throws SegmentLoadingException { final Segment adapter; try { - adapter = segmentLoader.getSegment(segment); + adapter = segmentLoader.getSegment(segment, lazy); } catch (SegmentLoadingException e) { segmentLoader.cleanup(segment); diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 53b12c9aee5f..532ec423e335 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -252,11 +252,11 @@ public void execute() * * @throws SegmentLoadingException */ - private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback) throws SegmentLoadingException + private void 
loadSegment(DataSegment segment, DataSegmentChangeCallback callback, boolean lazy) throws SegmentLoadingException { final boolean loaded; try { - loaded = segmentManager.loadSegment(segment); + loaded = segmentManager.loadSegment(segment, lazy); } catch (Exception e) { removeSegment(segment, callback, false); @@ -304,7 +304,7 @@ each time when addSegment() is called, it has to wait for the lock in order to m segmentsToDelete.remove(segment); } } - loadSegment(segment, DataSegmentChangeCallback.NOOP); + loadSegment(segment, DataSegmentChangeCallback.NOOP, false); try { announcer.announceSegment(segment); } @@ -354,7 +354,7 @@ public void run() numSegments, segment.getId() ); - loadSegment(segment, callback); + loadSegment(segment, callback, config.isLazyLoadOnStart()); try { backgroundSegmentAnnouncer.announceSegment(segment); } diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java index 5723359b77ad..01d25ffd2f7b 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java @@ -48,7 +48,7 @@ public boolean isSegmentLoaded(DataSegment segment) } @Override - public Segment getSegment(final DataSegment segment) + public Segment getSegment(final DataSegment segment, boolean lazy) { return new AbstractSegment() { diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index 826f249a7b0a..d6fa4eda8dcd 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -64,7 +64,7 @@ public boolean isSegmentLoaded(DataSegment segment) } @Override - public Segment getSegment(final DataSegment segment) + public Segment getSegment(final DataSegment 
segment, boolean lazy) { return new SegmentForTesting( MapUtils.getString(segment.getLoadSpec(), "version"), @@ -217,7 +217,7 @@ public void testLoadSegment() throws ExecutionException, InterruptedException, S final List> futures = segments.stream() .map( segment -> executor.submit( - () -> segmentManager.loadSegment(segment) + () -> segmentManager.loadSegment(segment, false) ) ) .collect(Collectors.toList()); @@ -233,7 +233,7 @@ public void testLoadSegment() throws ExecutionException, InterruptedException, S public void testDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException { for (DataSegment eachSegment : segments) { - Assert.assertTrue(segmentManager.loadSegment(eachSegment)); + Assert.assertTrue(segmentManager.loadSegment(eachSegment, false)); } final List> futures = ImmutableList.of(segments.get(0), segments.get(2)).stream() @@ -259,14 +259,14 @@ public void testDropSegment() throws SegmentLoadingException, ExecutionException @Test public void testLoadDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException { - Assert.assertTrue(segmentManager.loadSegment(segments.get(0))); - Assert.assertTrue(segmentManager.loadSegment(segments.get(2))); + Assert.assertTrue(segmentManager.loadSegment(segments.get(0), false)); + Assert.assertTrue(segmentManager.loadSegment(segments.get(2), false)); final List> loadFutures = ImmutableList.of(segments.get(1), segments.get(3), segments.get(4)) .stream() .map( segment -> executor.submit( - () -> segmentManager.loadSegment(segment) + () -> segmentManager.loadSegment(segment, false) ) ) .collect(Collectors.toList()); @@ -297,10 +297,10 @@ public void testLoadDropSegment() throws SegmentLoadingException, ExecutionExcep public void testLoadDuplicatedSegmentsSequentially() throws SegmentLoadingException { for (DataSegment segment : segments) { - Assert.assertTrue(segmentManager.loadSegment(segment)); + Assert.assertTrue(segmentManager.loadSegment(segment, false)); } // 
try to load an existing segment - Assert.assertFalse(segmentManager.loadSegment(segments.get(0))); + Assert.assertFalse(segmentManager.loadSegment(segments.get(0), false)); assertResult(segments); } @@ -312,7 +312,7 @@ public void testLoadDuplicatedSegmentsInParallel() final List> futures = ImmutableList.of(segments.get(0), segments.get(0), segments.get(0)).stream() .map( segment -> executor.submit( - () -> segmentManager.loadSegment(segment) + () -> segmentManager.loadSegment(segment, false) ) ) .collect(Collectors.toList()); @@ -333,7 +333,7 @@ public void testLoadDuplicatedSegmentsInParallel() @Test public void testNonExistingSegmentsSequentially() throws SegmentLoadingException { - Assert.assertTrue(segmentManager.loadSegment(segments.get(0))); + Assert.assertTrue(segmentManager.loadSegment(segments.get(0), false)); // try to drop a non-existing segment of different data source segmentManager.dropSegment(segments.get(2)); @@ -346,7 +346,7 @@ public void testNonExistingSegmentsSequentially() throws SegmentLoadingException public void testNonExistingSegmentsInParallel() throws SegmentLoadingException, ExecutionException, InterruptedException { - segmentManager.loadSegment(segments.get(0)); + segmentManager.loadSegment(segments.get(0), false); final List> futures = ImmutableList.of(segments.get(1), segments.get(2)).stream() .map( segment -> executor.submit( @@ -368,7 +368,7 @@ public void testNonExistingSegmentsInParallel() @Test public void testRemoveEmptyTimeline() throws SegmentLoadingException { - segmentManager.loadSegment(segments.get(0)); + segmentManager.loadSegment(segments.get(0), false); assertResult(ImmutableList.of(segments.get(0))); Assert.assertEquals(1, segmentManager.getDataSources().size()); segmentManager.dropSegment(segments.get(0)); @@ -401,7 +401,7 @@ private void assertResult(List expectedExistingSegments) throws Seg expectedTimeline.add( segment.getInterval(), segment.getVersion(), - segment.getShardSpec().createChunk(new 
ReferenceCountingSegment(segmentLoader.getSegment(segment))) + segment.getShardSpec().createChunk(new ReferenceCountingSegment(segmentLoader.getSegment(segment, false))) ); } diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index e1399219ce04..893275291594 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -123,7 +123,7 @@ public boolean isSegmentLoaded(DataSegment segment) } @Override - public Segment getSegment(final DataSegment segment) + public Segment getSegment(final DataSegment segment, boolean lazy) { return new SegmentForTesting( MapUtils.getString(segment.getLoadSpec(), "version"), @@ -481,7 +481,8 @@ public void loadQueryable(String dataSource, String version, Interval interval) NoneShardSpec.instance(), IndexIO.CURRENT_VERSION_ID, 123L - ) + ), + false ); } catch (SegmentLoadingException e) { From 7d11ca24a55a48cac9d989ed214119cdbed3ab25 Mon Sep 17 00:00:00 2001 From: dengfangyuan Date: Fri, 26 Jul 2019 19:42:49 +0800 Subject: [PATCH 2/7] delete repeated code --- .../org/apache/druid/segment/IndexIO.java | 40 +++++++------------ 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java index 53d5b3580040..00bb0746331e 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java @@ -455,13 +455,7 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws if (index.getSpatialIndexes().get(dimension) != null) { builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(index.getSpatialIndexes().get(dimension))); } - if (lazy) { - columns.put(dimension, 
Suppliers.memoize(() -> builder.build())); - } else { - ColumnHolder columnHolder = builder.build(); - columns.put(dimension, () -> columnHolder); - } - + columns.put(dimension, getColumnHolderSupplier(builder, lazy)); } for (String metric : index.getAvailableMetrics()) { @@ -475,25 +469,14 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() ) ); - if (lazy) { - columns.put(metric, Suppliers.memoize(() -> builder.build())); - } else { - ColumnHolder columnHolder = builder.build(); - columns.put(metric, () -> columnHolder); - } - + columns.put(metric, getColumnHolderSupplier(builder, lazy)); } else if (metricHolder.getType() == MetricHolder.MetricType.COMPLEX) { ColumnBuilder builder = new ColumnBuilder() .setType(ValueType.COMPLEX) .setComplexColumnSupplier( new ComplexColumnPartSupplier(metricHolder.getTypeName(), metricHolder.complexType) ); - if (lazy) { - columns.put(metric, Suppliers.memoize(() -> builder.build())); - } else { - ColumnHolder columnHolder = builder.build(); - columns.put(metric, () -> columnHolder); - } + columns.put(metric, getColumnHolderSupplier(builder, lazy)); } } @@ -505,12 +488,7 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap() ) ); - if (lazy) { - columns.put(ColumnHolder.TIME_COLUMN_NAME, Suppliers.memoize(() -> builder.build())); - } else { - ColumnHolder columnHolder = builder.build(); - columns.put(ColumnHolder.TIME_COLUMN_NAME, () -> columnHolder); - } + columns.put(ColumnHolder.TIME_COLUMN_NAME, getColumnHolderSupplier(builder, lazy)); return new SimpleQueryableIndex( index.getDataInterval(), @@ -522,6 +500,16 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws lazy ); } + + private Supplier getColumnHolderSupplier(ColumnBuilder builder, boolean lazy) + { + if (lazy) { + return Suppliers.memoize(() -> 
builder.build()); + } else { + ColumnHolder columnHolder = builder.build(); + return () -> columnHolder; + } + } } static class V9IndexLoader implements IndexLoader From e45c4fb753f2744526ec2f5c82a5e06166860535 Mon Sep 17 00:00:00 2001 From: dengfangyuan Date: Fri, 22 Nov 2019 17:23:02 +0800 Subject: [PATCH 3/7] add documentation for druid.segmentCache.lazyLoadOnStart --- docs/configuration/index.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 8ee2ab5d9b66..37b01999e19b 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1359,6 +1359,7 @@ These Historical configurations can be defined in the `historical/runtime.proper |`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)| |`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. Note that the work of loading segments involves downloading segments from deep storage, decompressing them and loading them to a memory mapped location. So the work is not all I/O Bound. Depending on CPU and network load, one could possibly increase this config to a higher value.|Number of cores| |`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently during historical startup.|`druid.segmentCache.numLoadingThreads`| +|`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during historical startup. Set this value to true will have a definite improvement during historical startup with HDD(may be 20X faster). This features is unnecessary for historical with SSD, because it is fast enough.|false| |`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. 
One might want to increase this number when noticing clusters are lagging behind w.r.t. balancing segments across historical nodes.|2| In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise. From db7318c25c9468b4e0b7656080923d4fbaa24f73 Mon Sep 17 00:00:00 2001 From: dengfangyuan Date: Fri, 22 Nov 2019 17:50:40 +0800 Subject: [PATCH 4/7] fix unit test fail --- .../batch/parallel/AbstractMultiPhaseParallelIndexingTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index c45c5ce09212..880d8065e174 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -275,7 +275,7 @@ private Segment loadSegment(DataSegment dataSegment, File tempSegmentDir) final SegmentLoader loader = new SegmentLoaderFactory(getIndexIO(), getObjectMapper()) .manufacturate(tempSegmentDir); try { - return loader.getSegment(dataSegment); + return loader.getSegment(dataSegment, false); } catch (SegmentLoadingException e) { throw new RuntimeException(e); From 22b1d47f3fb63a9d0400d5d4e825b8dd924e6c55 Mon Sep 17 00:00:00 2001 From: dengfangyuan Date: Fri, 22 Nov 2019 19:49:15 +0800 Subject: [PATCH 5/7] fix spellcheck --- 
docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 37b01999e19b..31c1d9ad42ac 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1359,7 +1359,7 @@ These Historical configurations can be defined in the `historical/runtime.proper |`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)| |`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. Note that the work of loading segments involves downloading segments from deep storage, decompressing them and loading them to a memory mapped location. So the work is not all I/O Bound. Depending on CPU and network load, one could possibly increase this config to a higher value.|Number of cores| |`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently during historical startup.|`druid.segmentCache.numLoadingThreads`| -|`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during historical startup. Set this value to true will have a definite improvement during historical startup with HDD(may be 20X faster). This features is unnecessary for historical with SSD, because it is fast enough.|false| +|`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during historical startup. Set this value to true will have a definite improvement during historical startup with HDD(may be 20 times faster). This features is unnecessary for historical with SSD, because it is fast enough.|false| |`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. 
One might want to increase this number when noticing clusters are lagging behind w.r.t. balancing segments across historical nodes.|2| In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise. From e37ac12452895cc0f462ddc6dd1ba53c3e2f048e Mon Sep 17 00:00:00 2001 From: dengfangyuan Date: Wed, 27 Nov 2019 10:33:55 +0800 Subject: [PATCH 6/7] update docs --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 31c1d9ad42ac..4f3be845d2df 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1359,7 +1359,7 @@ These Historical configurations can be defined in the `historical/runtime.proper |`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)| |`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. Note that the work of loading segments involves downloading segments from deep storage, decompressing them and loading them to a memory mapped location. So the work is not all I/O Bound. 
Depending on CPU and network load, one could possibly increase this config to a higher value.|Number of cores| |`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently during historical startup.|`druid.segmentCache.numLoadingThreads`| -|`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during historical startup. Set this value to true will have a definite improvement during historical startup with HDD(may be 20 times faster). This features is unnecessary for historical with SSD, because it is fast enough.|false| +|`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during historical startup. When set to true, Historical startup time will be dramatically improved by deferring segment loading until the first time that segment takes part in a query, which will incur this cost instead.|false| |`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. One might want to increase this number when noticing clusters are lagging behind w.r.t. balancing segments across historical nodes.|2| In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise. 
From 3ca2ca595fd94d0a98a9ac6f4fa3927e4c533b3e Mon Sep 17 00:00:00 2001 From: dengfangyuan Date: Tue, 3 Dec 2019 20:42:26 +0800 Subject: [PATCH 7/7] update docs mentioning a catch --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 4f3be845d2df..4754daac276a 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1359,7 +1359,7 @@ These Historical configurations can be defined in the `historical/runtime.proper |`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)| |`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. Note that the work of loading segments involves downloading segments from deep storage, decompressing them and loading them to a memory mapped location. So the work is not all I/O Bound. Depending on CPU and network load, one could possibly increase this config to a higher value.|Number of cores| |`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently during historical startup.|`druid.segmentCache.numLoadingThreads`| -|`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during historical startup. When set to true, Historical startup time will be dramatically improved by deferring segment loading until the first time that segment takes part in a query, which will incur this cost instead.|false| +|`druid.segmentCache.lazyLoadOnStart`|Whether or not to load segment columns metadata lazily during historical startup. When set to true, Historical startup time will be dramatically improved by deferring segment loading until the first time that segment takes part in a query, which will incur this cost instead. 
One catch is that if the Historical crashes while in the process of downloading and creating segment files, it is possible to end up with a corrupted segment on disk; this requires manual intervention to delete the corrupted files. When the flag is set to true, Historical startup would still complete successfully, but queries using the corrupted segment would fail at runtime.|false| |`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. One might want to increase this number when noticing clusters are lagging behind w.r.t. balancing segments across historical nodes.|2| In `druid.segmentCache.locations`, *freeSpacePercent* was added because *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. In case of any druid bug leading to unaccounted segment files left alone on disk or some other process writing stuff to disk, This check can start failing segment loading early before filling up the disk completely and leaving the host usable otherwise.