diff --git a/.travis.yml b/.travis.yml index a25ea08f2ab2..aa03eee01466 100644 --- a/.travis.yml +++ b/.travis.yml @@ -154,7 +154,7 @@ jobs: # Set MAVEN_OPTS for Surefire launcher. Skip remoteresources to avoid intermittent connection timeouts when # resolving the SIGAR dependency. - > - MAVEN_OPTS='-Xmx800m' ${MVN} test -pl ${MAVEN_PROJECTS} + MAVEN_OPTS='-Xmx1100m' ${MVN} test -pl ${MAVEN_PROJECTS} ${MAVEN_SKIP} -Dremoteresources.skip=true -Ddruid.generic.useDefaultValueForNull=${DRUID_USE_DEFAULT_VALUE_FOR_NULL} - sh -c "dmesg | egrep -i '(oom|out of memory|kill process|killed).*' -C 1 || exit 0" - free -m diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java index b57cedeebbd4..d06d17c39309 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java @@ -51,6 +51,7 @@ import org.apache.druid.query.scan.ScanResultValue; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; @@ -282,7 +283,7 @@ private Segment loadSegment(DataSegment dataSegment, File tempSegmentDir) final SegmentLoader loader = new SegmentLoaderFactory(getIndexIO(), getObjectMapper()) .manufacturate(tempSegmentDir); try { - return loader.getSegment(dataSegment, false); + return loader.getSegment(dataSegment, false, SegmentLazyLoadFailCallback.NOOP); } catch (SegmentLoadingException e) { throw new RuntimeException(e); diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java index 655807275737..df43f6374a58 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java @@ -182,16 +182,17 @@ public void validateTwoSegments(final IndexableAdapter adapter1, final Indexable public QueryableIndex loadIndex(File inDir) throws IOException { - return loadIndex(inDir, false); + return loadIndex(inDir, false, SegmentLazyLoadFailCallback.NOOP); } - public QueryableIndex loadIndex(File inDir, boolean lazy) throws IOException + + public QueryableIndex loadIndex(File inDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws IOException { final int version = SegmentUtils.getVersionFromDir(inDir); final IndexLoader loader = indexLoaders.get(version); if (loader != null) { - return loader.load(inDir, mapper, lazy); + return loader.load(inDir, mapper, lazy, loadFailed); } else { throw new ISE("Unknown index version[%s]", version); } @@ -412,7 +413,7 @@ public MMappedIndex mapDir(File inDir) throws IOException interface IndexLoader { - QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException; + QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws IOException; } static class LegacyIndexLoader implements IndexLoader @@ -427,7 +428,7 @@ static class LegacyIndexLoader implements IndexLoader } @Override - public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException + public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws IOException { MMappedIndex index = legacyHandler.mapDir(inDir); @@ -522,7 +523,7 @@ static class V9IndexLoader implements IndexLoader } @Override - public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException + public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws IOException { log.debug("Mapping v9 index[%s]", inDir); long startTime = System.currentTimeMillis(); @@ -598,7 +599,9 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws try { return deserializeColumn(mapper, colBuffer, smooshedFiles); } - catch (IOException e) { + catch (IOException | RuntimeException e) { + log.warn(e, "Throw exceptions when deserialize column [%s].", columnName); + loadFailed.execute(); throw Throwables.propagate(e); } } @@ -618,7 +621,9 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws try { return deserializeColumn(mapper, timeBuffer, smooshedFiles); } - catch (IOException e) { + catch (IOException | RuntimeException e) { + log.warn(e, "Throw exceptions when deserialize column [%s]", ColumnHolder.TIME_COLUMN_NAME); + loadFailed.execute(); throw Throwables.propagate(e); } } diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index cb65289d78d6..02618daf929a 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -1011,7 +1011,7 @@ private File multiphaseMerge( // convert Files to QueryableIndexIndexableAdapter and do another merge phase List qIndexAdapters = new ArrayList<>(); for (File outputFile : currentOutputs) { - QueryableIndex qIndex = indexIO.loadIndex(outputFile, true); + QueryableIndex qIndex = indexIO.loadIndex(outputFile, true, SegmentLazyLoadFailCallback.NOOP); qIndexAdapters.add(new QueryableIndexIndexableAdapter(qIndex)); } currentPhases = getMergePhases(qIndexAdapters, maxColumnsToMerge); diff --git a/processing/src/main/java/org/apache/druid/segment/SegmentLazyLoadFailCallback.java b/processing/src/main/java/org/apache/druid/segment/SegmentLazyLoadFailCallback.java new file mode 100644 index 000000000000..babc6ba11237 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/SegmentLazyLoadFailCallback.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +public interface SegmentLazyLoadFailCallback +{ + void execute(); + SegmentLazyLoadFailCallback NOOP = () -> {}; +} diff --git a/processing/src/main/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactory.java index c675ea5ff7bb..75f2882529a7 100644 --- a/processing/src/main/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactory.java @@ -25,6 +25,7 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.join.table.BroadcastSegmentIndexedTable; import org.apache.druid.segment.join.table.IndexedTable; import org.apache.druid.timeline.DataSegment; @@ -57,10 +58,10 @@ public Set getKeyColumns() } @Override - public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy) throws SegmentLoadingException + public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException { try { - return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy), dataSegment.getId()) { + return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy, loadFailed), dataSegment.getId()) { @Nullable @Override public T as(Class clazz) diff --git a/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java index ea5290e204c0..d3124cb122f9 100644 --- a/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/loading/MMappedQueryableSegmentizerFactory.java @@ -24,6 +24,7 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.timeline.DataSegment; import java.io.File; @@ -42,10 +43,10 @@ public MMappedQueryableSegmentizerFactory(@JacksonInject IndexIO indexIO) } @Override - public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy) throws SegmentLoadingException + public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException { try { - return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy), dataSegment.getId()); + return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy, loadFailed), dataSegment.getId()); } catch (IOException e) { throw new SegmentLoadingException(e, "%s", e.getMessage()); diff --git a/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java b/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java index 09bc048448b8..25df3c56f317 100644 --- a/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/loading/SegmentizerFactory.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.timeline.DataSegment; import java.io.File; @@ -31,5 +32,5 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MMappedQueryableSegmentizerFactory.class) public interface SegmentizerFactory { - Segment factorize(DataSegment segment, File parentDir, boolean lazy) throws SegmentLoadingException; + Segment factorize(DataSegment segment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException; } diff --git a/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java index 8b4a8d3f5313..9728f230f024 100644 --- a/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/segment/CustomSegmentizerFactoryTest.java @@ -126,10 +126,10 @@ public void testCustomSegmentizerPersist() throws IOException private static class CustomSegmentizerFactory implements SegmentizerFactory { @Override - public Segment factorize(DataSegment segment, File parentDir, boolean lazy) throws SegmentLoadingException + public Segment factorize(DataSegment segment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException { try { - return new QueryableIndexSegment(INDEX_IO.loadIndex(parentDir, lazy), segment.getId()); + return new QueryableIndexSegment(INDEX_IO.loadIndex(parentDir, lazy, loadFailed), segment.getId()); } catch (IOException e) { throw new SegmentLoadingException(e, "%s", e.getMessage()); diff --git a/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java b/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java index 5066bef4ac54..b6a9155566be 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java @@ -19,6 +19,7 @@ package org.apache.druid.segment; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; @@ -29,6 +30,7 @@ import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.UOE; import org.apache.druid.query.aggregation.Aggregator; @@ -50,6 +52,7 @@ import org.junit.runners.Parameterized; import javax.annotation.Nullable; +import java.io.File; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; @@ -338,4 +341,66 @@ public void testRowValidatorEquals() throws Exception } } } + + @Test + public void testLoadSegmentDamagedFileWithLazy() + { + final ObjectMapper mapper = new DefaultObjectMapper(); + final IndexIO indexIO = new IndexIO(mapper, () -> 0); + String path = this.getClass().getClassLoader().getResource("v9SegmentPersistDir/segmentWithDamagedFile/").getPath(); + + ForkSegmentLoadDropHandler segmentLoadDropHandler = new ForkSegmentLoadDropHandler(); + ForkSegment segment = new ForkSegment(true); + Assert.assertTrue(segment.getSegmentExist()); + File inDir = new File(path); + Exception e = null; + + try { + QueryableIndex queryableIndex = indexIO.loadIndex(inDir, true, () -> segmentLoadDropHandler.removeSegment(segment)); + Assert.assertNotNull(queryableIndex); + queryableIndex.getDimensionHandlers(); + List columnNames = queryableIndex.getColumnNames(); + for (String columnName : columnNames) { + queryableIndex.getColumnHolder(columnName).toString(); + } + } + catch (Exception ex) { + // Do nothing. Can ignore exceptions here. + e = ex; + } + Assert.assertNotNull(e); + Assert.assertFalse(segment.getSegmentExist()); + + } + + private static class ForkSegmentLoadDropHandler + { + public void addSegment() + { + } + public void removeSegment(ForkSegment segment) + { + segment.setSegmentExist(false); + } + } + + private static class ForkSegment + { + private Boolean segmentExist; + + ForkSegment(Boolean segmentExist) + { + this.segmentExist = segmentExist; + } + + void setSegmentExist(Boolean value) + { + this.segmentExist = value; + } + + Boolean getSegmentExist() + { + return this.segmentExist; + } + } } diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java index 524f2ac1f185..4635a7bf25a8 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java @@ -42,6 +42,7 @@ import org.apache.druid.segment.IndexMergerV9; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.SimpleAscendingOffset; import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.column.BaseColumn; @@ -137,7 +138,7 @@ public void setup() throws IOException, SegmentLoadingException null, segment.getTotalSpace() ); - backingSegment = (QueryableIndexSegment) factory.factorize(dataSegment, segment, false); + backingSegment = (QueryableIndexSegment) factory.factorize(dataSegment, segment, false, SegmentLazyLoadFailCallback.NOOP); columnNames = ImmutableList.builder().add(ColumnHolder.TIME_COLUMN_NAME) .addAll(backingSegment.asQueryableIndex().getColumnNames()).build(); diff --git a/processing/src/test/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactoryTest.java index ab4c45339cdc..0e8ac933a6ce 100644 --- a/processing/src/test/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactoryTest.java @@ -35,6 +35,7 @@ import org.apache.druid.segment.IndexMergerV9; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.join.table.BroadcastSegmentIndexedTable; @@ -119,7 +120,7 @@ public void testSegmentizer() throws IOException, SegmentLoadingException null, persistedSegmentRoot.getTotalSpace() ); - final Segment loaded = factory.factorize(dataSegment, persistedSegmentRoot, false); + final Segment loaded = factory.factorize(dataSegment, persistedSegmentRoot, false, SegmentLazyLoadFailCallback.NOOP); final BroadcastSegmentIndexedTable table = (BroadcastSegmentIndexedTable) loaded.as(IndexedTable.class); Assert.assertNotNull(table); diff --git a/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/00000.smoosh b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/00000.smoosh new file mode 100644 index 000000000000..aaf3449a90ef Binary files /dev/null and b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/00000.smoosh differ diff --git a/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/factory.json b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/factory.json new file mode 100644 index 000000000000..f7b2cc330765 --- /dev/null +++ b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/factory.json @@ -0,0 +1 @@ +{"type":"mMapSegmentFactory"} \ No newline at end of file diff --git a/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/meta.smoosh b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/meta.smoosh new file mode 100644 index 000000000000..70a831feef32 --- /dev/null +++ b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/meta.smoosh @@ -0,0 +1,9 @@ +v1,2147483647,1 +__time,0,0,141 +count,0,141,282 +dstIP,0,564,805 +index.drd,0,1046,1205 +metadata.drd,0,1205,1587 +srcIP,0,805,1046 +sum_bytes,0,282,423 +sum_packets,0,423,564 diff --git a/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/version.bin b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/version.bin new file mode 100644 index 000000000000..3dd5ace49c65 Binary files /dev/null and b/processing/src/test/resources/v9SegmentPersistDir/segmentWithDamagedFile/version.bin differ diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java index f63024a58cc6..741cfa1373ad 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoader.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.loading; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.timeline.DataSegment; import java.io.File; @@ -31,7 +32,7 @@ public interface SegmentLoader { boolean isSegmentLoaded(DataSegment segment); - Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException; + Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException; File getSegmentFiles(DataSegment segment) throws SegmentLoadingException; void cleanup(DataSegment segment); } diff --git a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 16bf499a4070..cd4d3fd86e5f 100644 --- a/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nonnull; @@ -177,7 +178,7 @@ private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir) } @Override - public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException + public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException { final ReferenceCountingLock lock = createOrGetLock(segment); final File segmentFiles; @@ -203,7 +204,7 @@ public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadi factory = new MMappedQueryableSegmentizerFactory(indexIO); } - return factory.factorize(segment, segmentFiles, lazy); + return factory.factorize(segment, segmentFiles, lazy, loadFailed); } /** diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index 3b3d891596bb..c3636d3a9780 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -31,6 +31,7 @@ import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.join.table.IndexedTable; import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable; import org.apache.druid.segment.loading.SegmentLoader; @@ -214,14 +215,15 @@ private TableDataSource getTableDataSource(DataSourceAnalysis analysis) * * @param segment segment to load * @param lazy whether to lazy load columns metadata + * @param loadFailed callBack to execute when segment lazy load failed * * @return true if the segment was newly loaded, false if it was already loaded * * @throws SegmentLoadingException if the segment cannot be loaded */ - public boolean loadSegment(final DataSegment segment, boolean lazy) throws SegmentLoadingException + public boolean loadSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException { - final Segment adapter = getAdapter(segment, lazy); + final Segment adapter = getAdapter(segment, lazy, loadFailed); final SettableSupplier resultSupplier = new SettableSupplier<>(); @@ -271,11 +273,11 @@ public boolean loadSegment(final DataSegment segment, boolean lazy) throws Segme return resultSupplier.get(); } - private Segment getAdapter(final DataSegment segment, boolean lazy) throws SegmentLoadingException + private Segment getAdapter(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException { final Segment adapter; try { - adapter = segmentLoader.getSegment(segment, lazy); + adapter = segmentLoader.getSegment(segment, lazy, loadFailed); } catch (SegmentLoadingException e) { segmentLoader.cleanup(segment); diff --git a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java index 0cb0ec0218b8..dcc03097b558 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java +++ b/server/src/main/java/org/apache/druid/server/coordination/SegmentLoadDropHandler.java @@ -269,7 +269,10 @@ private void loadSegment(DataSegment segment, DataSegmentChangeCallback callback { final boolean loaded; try { - loaded = segmentManager.loadSegment(segment, lazy); + loaded = segmentManager.loadSegment(segment, + lazy, + () -> this.removeSegment(segment, DataSegmentChangeCallback.NOOP, false) + ); } catch (Exception e) { removeSegment(segment, callback, false); diff --git a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java index a42c553bad5a..557537c06811 100644 --- a/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java +++ b/server/src/test/java/org/apache/druid/segment/loading/CacheTestSegmentLoader.java @@ -22,6 +22,7 @@ import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; @@ -46,7 +47,7 @@ public boolean isSegmentLoaded(DataSegment segment) } @Override - public Segment getSegment(final DataSegment segment, boolean lazy) + public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) { return new Segment() { diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java index 3bb87686b759..38c410019400 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java @@ -41,6 +41,7 @@ import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.IndexMergerV9; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.join.BroadcastTableJoinableFactory; @@ -159,7 +160,7 @@ public void testLoadIndexedTable() throws IOException, SegmentLoadingException IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv"); final String interval = "2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"; DataSegment segment = createSegment(data, interval, version); - Assert.assertTrue(segmentManager.loadSegment(segment, false)); + Assert.assertTrue(segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP)); Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource)); Optional maybeJoinable = makeJoinable(dataSource); @@ -208,8 +209,8 @@ public void testLoadMultipleIndexedTableOverwrite() throws IOException, SegmentL IncrementalIndex data2 = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.bottom"); DataSegment segment1 = createSegment(data, interval, version); DataSegment segment2 = createSegment(data2, interval2, version2); - Assert.assertTrue(segmentManager.loadSegment(segment1, false)); - Assert.assertTrue(segmentManager.loadSegment(segment2, false)); + Assert.assertTrue(segmentManager.loadSegment(segment1, false, SegmentLazyLoadFailCallback.NOOP)); + Assert.assertTrue(segmentManager.loadSegment(segment2, false, SegmentLazyLoadFailCallback.NOOP)); Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource)); Optional maybeJoinable = makeJoinable(dataSource); @@ -271,7 +272,7 @@ public void testLoadMultipleIndexedTable() throws IOException, SegmentLoadingExc final String interval2 = "2011-01-12T00:00:00.000Z/2011-03-28T00:00:00.000Z"; IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.bottom"); IncrementalIndex data2 = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.top"); - Assert.assertTrue(segmentManager.loadSegment(createSegment(data, interval, version), false)); + Assert.assertTrue(segmentManager.loadSegment(createSegment(data, interval, version), false, SegmentLazyLoadFailCallback.NOOP)); Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource)); Optional maybeJoinable = makeJoinable(dataSource); @@ -293,7 +294,7 @@ public void testLoadMultipleIndexedTable() throws IOException, SegmentLoadingExc ); // add another segment with smaller interval, only partially overshadows so there will be 2 segments in timeline - Assert.assertTrue(segmentManager.loadSegment(createSegment(data2, interval2, version2), false)); + Assert.assertTrue(segmentManager.loadSegment(createSegment(data2, interval2, version2), false, SegmentLazyLoadFailCallback.NOOP)); expectedException.expect(ISE.class); diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java index a91411b8db73..762339f7f1f8 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerTest.java @@ -29,6 +29,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.loading.SegmentLoadingException; @@ -69,7 +70,7 @@ public boolean isSegmentLoaded(DataSegment segment) } @Override - public Segment getSegment(final DataSegment segment, boolean lazy) + public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) { return new SegmentForTesting( MapUtils.getString(segment.getLoadSpec(), "version"), @@ -222,7 +223,7 @@ public void testLoadSegment() throws ExecutionException, InterruptedException, S final List> futures = SEGMENTS.stream() .map( segment -> executor.submit( - () -> segmentManager.loadSegment(segment, false) + () -> segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP) ) ) .collect(Collectors.toList()); @@ -238,7 +239,7 @@ public void testLoadSegment() throws ExecutionException, InterruptedException, S public void testDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException { for (DataSegment eachSegment : SEGMENTS) { - Assert.assertTrue(segmentManager.loadSegment(eachSegment, false)); + Assert.assertTrue(segmentManager.loadSegment(eachSegment, false, SegmentLazyLoadFailCallback.NOOP)); } final List> futures = ImmutableList.of(SEGMENTS.get(0), SEGMENTS.get(2)).stream() @@ -264,14 +265,14 @@ public void testDropSegment() throws SegmentLoadingException, ExecutionException @Test public void testLoadDropSegment() throws SegmentLoadingException, ExecutionException, InterruptedException { - Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false)); - Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(2), false)); + Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false, SegmentLazyLoadFailCallback.NOOP)); + Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(2), false, SegmentLazyLoadFailCallback.NOOP)); final List> loadFutures = ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(3), SEGMENTS.get(4)) .stream() .map( segment -> executor.submit( - () -> segmentManager.loadSegment(segment, false) + () -> segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP) ) ) .collect(Collectors.toList()); @@ -302,10 +303,10 @@ public void testLoadDropSegment() throws SegmentLoadingException, ExecutionExcep public void testLoadDuplicatedSegmentsSequentially() throws SegmentLoadingException { for (DataSegment segment : SEGMENTS) { - Assert.assertTrue(segmentManager.loadSegment(segment, false)); + Assert.assertTrue(segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP)); } // try to load an existing segment - Assert.assertFalse(segmentManager.loadSegment(SEGMENTS.get(0), false)); + Assert.assertFalse(segmentManager.loadSegment(SEGMENTS.get(0), false, SegmentLazyLoadFailCallback.NOOP)); assertResult(SEGMENTS); } @@ -318,7 +319,7 @@ public void testLoadDuplicatedSegmentsInParallel() .stream() .map( segment -> executor.submit( - () -> segmentManager.loadSegment(segment, false) + () -> segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP) ) ) .collect(Collectors.toList()); @@ -339,7 +340,7 @@ public void testLoadDuplicatedSegmentsInParallel() @Test public void testNonExistingSegmentsSequentially() throws SegmentLoadingException { - Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false)); + Assert.assertTrue(segmentManager.loadSegment(SEGMENTS.get(0), false, SegmentLazyLoadFailCallback.NOOP)); // try to drop a non-existing segment of different data source segmentManager.dropSegment(SEGMENTS.get(2)); @@ -352,7 +353,7 @@ public void testNonExistingSegmentsSequentially() throws SegmentLoadingException public void testNonExistingSegmentsInParallel() throws SegmentLoadingException, ExecutionException, InterruptedException { - segmentManager.loadSegment(SEGMENTS.get(0), false); + segmentManager.loadSegment(SEGMENTS.get(0), false, SegmentLazyLoadFailCallback.NOOP); final List> futures = ImmutableList.of(SEGMENTS.get(1), SEGMENTS.get(2)) .stream() .map( @@ -375,7 +376,7 @@ public void testNonExistingSegmentsInParallel() @Test public void testRemoveEmptyTimeline() throws SegmentLoadingException { - segmentManager.loadSegment(SEGMENTS.get(0), false); + segmentManager.loadSegment(SEGMENTS.get(0), false, SegmentLazyLoadFailCallback.NOOP); assertResult(ImmutableList.of(SEGMENTS.get(0))); Assert.assertEquals(1, segmentManager.getDataSources().size()); segmentManager.dropSegment(SEGMENTS.get(0)); @@ -412,7 +413,7 @@ public void testLoadAndDropNonRootGenerationSegment() throws SegmentLoadingExcep 10 ); - segmentManager.loadSegment(segment, false); + segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP); assertResult(ImmutableList.of(segment)); segmentManager.dropSegment(segment); @@ -442,7 +443,7 @@ private void assertResult(List expectedExistingSegments) throws Seg segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk( - ReferenceCountingSegment.wrapSegment(SEGMENT_LOADER.getSegment(segment, false), segment.getShardSpec()) + ReferenceCountingSegment.wrapSegment(SEGMENT_LOADER.getSegment(segment, false, SegmentLazyLoadFailCallback.NOOP), segment.getShardSpec()) ) ); } diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java index e5593f131988..87587ce69a66 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java @@ -34,6 +34,7 @@ import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPuller; @@ -129,7 +130,7 @@ public void testLoadSameSegment() throws IOException, ExecutionException, Interr final DataSegment segment = createSegment("2019-01-01/2019-01-02"); final List futures = IntStream .range(0, 16) - .mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment, false))) + .mapToObj(i -> exec.submit(() -> segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP))) .collect(Collectors.toList()); for (Future future : futures) { future.get(); @@ -154,7 +155,7 @@ public void testLoadMultipleSegments() throws IOException, ExecutionException, I .mapToObj(i -> exec.submit(() -> { for (DataSegment segment : segments) { try { - segmentManager.loadSegment(segment, false); + segmentManager.loadSegment(segment, false, SegmentLazyLoadFailCallback.NOOP); } catch (SegmentLoadingException e) { throw new RuntimeException(e); @@ -222,7 +223,7 @@ public long size() private static class TestSegmentizerFactory implements SegmentizerFactory { @Override - public Segment factorize(DataSegment segment, File parentDir, boolean lazy) + public Segment factorize(DataSegment segment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) { return new Segment() { diff --git a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java index a9797846e64f..8aa9f3678350 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/SegmentLoadDropHandlerTest.java @@ -539,7 +539,7 @@ public void testProcessBatch() throws Exception public void testProcessBatchDuplicateLoadRequestsWhenFirstRequestFailsSecondRequestShouldSucceed() throws Exception { final SegmentManager segmentManager = Mockito.mock(SegmentManager.class); - Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())) + Mockito.when(segmentManager.loadSegment(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any())) .thenThrow(new RuntimeException("segment loading failure test")) .thenReturn(true); final SegmentLoadDropHandler segmentLoadDropHandler = new SegmentLoadDropHandler( diff --git a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java index 4f00cded4d16..6f8d5a63e9db 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ServerManagerTest.java @@ -75,6 +75,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.SegmentLazyLoadFailCallback; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnCapabilities; @@ -151,7 +152,7 @@ public boolean isSegmentLoaded(DataSegment segment) } @Override - public Segment getSegment(final DataSegment segment, boolean lazy) + public Segment getSegment(final DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback SegmentLazyLoadFailCallback) { return new SegmentForTesting( MapUtils.getString(segment.getLoadSpec(), "version"), @@ -674,7 +675,8 @@ public void loadQueryable(String dataSource, String version, Interval interval) IndexIO.CURRENT_VERSION_ID, 123L ), - false + false, + SegmentLazyLoadFailCallback.NOOP ); } catch (SegmentLoadingException e) {