Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
0a6eb30
ready to test
Jan 5, 2021
4c3e697
tested on dev cluster
Jan 5, 2021
7775658
tested
Jan 5, 2021
6b9595e
code review
Jan 7, 2021
ff6deb5
add UTs
Jan 7, 2021
9fefdff
add UTs
Jan 7, 2021
13710d9
ut passed
Jan 7, 2021
f67b334
ut passed
Jan 7, 2021
7dd33e2
opti imports
Jan 7, 2021
dd4fd45
Merge branch 'master' into historical-lazyOnStart-with-fileCheck
Jan 7, 2021
eac8919
done
Jan 7, 2021
e1eca76
done
Jan 7, 2021
3d567da
fix checkstyle
Jan 8, 2021
34f98f0
Merge branch 'master' into historical-lazyOnStart-with-fileCheck
Jan 8, 2021
6b82e6c
modify uts
Jan 8, 2021
c573772
Merge branch 'master' into historical-lazyOnStart-with-fileCheck
Jan 9, 2021
f113ed3
modify logs
Jan 9, 2021
5bfca47
changing the package of SegmentLazyLoadFailCallback.java to org.apach…
Jan 12, 2021
6284c5f
Merge branch 'master' into historical-lazyOnStart-with-fileCheck
Jan 12, 2021
c7d9115
merge from master
Jan 12, 2021
ba5dd23
modify import orders
Jan 12, 2021
5c89232
merge from master
Jan 12, 2021
7714f81
merge from master
Jan 12, 2021
65f17df
modify logs
Jan 12, 2021
dd46d75
modify docs
Jan 12, 2021
15a4c3e
modify logs to rerun ci
Jan 12, 2021
80accce
modify logs to rerun ci
Jan 12, 2021
78f5ff3
modify logs to rerun ci
Jan 13, 2021
9dec7bd
modify logs to rerun ci
Jan 13, 2021
6bb59ce
modify logs to rerun ci
Jan 13, 2021
b5aa610
modify logs to rerun ci
Jan 13, 2021
4fdbc25
Merge branch 'master' into historical-lazyOnStart-with-fileCheck
Jan 14, 2021
a510bee
modify logs to rerun ci
Jan 14, 2021
fef72a7
modify logs to rerun ci
Jan 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ jobs:
# Set MAVEN_OPTS for Surefire launcher. Skip remoteresources to avoid intermittent connection timeouts when
# resolving the SIGAR dependency.
- >
MAVEN_OPTS='-Xmx800m' ${MVN} test -pl ${MAVEN_PROJECTS}
MAVEN_OPTS='-Xmx1100m' ${MVN} test -pl ${MAVEN_PROJECTS}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this change for?

Copy link
Copy Markdown
Contributor Author

@zhangyue19921010 zhangyue19921010 Jan 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Current UTs don't cover loading segments in a lazy way. This PR adds a UT to test lazy loading, which needs extra memory.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Was just surprised how much more memory it needed; if Travis passes then I assume the change is fine — was mostly just curious.

${MAVEN_SKIP} -Dremoteresources.skip=true -Ddruid.generic.useDefaultValueForNull=${DRUID_USE_DEFAULT_VALUE_FOR_NULL}
- sh -c "dmesg | egrep -i '(oom|out of memory|kill process|killed).*' -C 1 || exit 0"
- free -m
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
Expand Down Expand Up @@ -282,7 +283,7 @@ private Segment loadSegment(DataSegment dataSegment, File tempSegmentDir)
final SegmentLoader loader = new SegmentLoaderFactory(getIndexIO(), getObjectMapper())
.manufacturate(tempSegmentDir);
try {
return loader.getSegment(dataSegment, false);
return loader.getSegment(dataSegment, false, SegmentLazyLoadFailCallback.NOOP);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
Expand Down
21 changes: 13 additions & 8 deletions processing/src/main/java/org/apache/druid/segment/IndexIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,17 @@ public void validateTwoSegments(final IndexableAdapter adapter1, final Indexable

public QueryableIndex loadIndex(File inDir) throws IOException
{
return loadIndex(inDir, false);
return loadIndex(inDir, false, SegmentLazyLoadFailCallback.NOOP);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add blank line here

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

public QueryableIndex loadIndex(File inDir, boolean lazy) throws IOException

public QueryableIndex loadIndex(File inDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws IOException
{
final int version = SegmentUtils.getVersionFromDir(inDir);

final IndexLoader loader = indexLoaders.get(version);

if (loader != null) {
return loader.load(inDir, mapper, lazy);
return loader.load(inDir, mapper, lazy, loadFailed);
} else {
throw new ISE("Unknown index version[%s]", version);
}
Expand Down Expand Up @@ -412,7 +413,7 @@ public MMappedIndex mapDir(File inDir) throws IOException

interface IndexLoader
{
QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException;
QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws IOException;
}

static class LegacyIndexLoader implements IndexLoader
Expand All @@ -427,7 +428,7 @@ static class LegacyIndexLoader implements IndexLoader
}

@Override
public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException
public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws IOException
{
MMappedIndex index = legacyHandler.mapDir(inDir);

Expand Down Expand Up @@ -522,7 +523,7 @@ static class V9IndexLoader implements IndexLoader
}

@Override
public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException
public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws IOException
{
log.debug("Mapping v9 index[%s]", inDir);
long startTime = System.currentTimeMillis();
Expand Down Expand Up @@ -598,7 +599,9 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws
try {
return deserializeColumn(mapper, colBuffer, smooshedFiles);
}
catch (IOException e) {
catch (IOException | RuntimeException e) {
log.warn(e, "Throw exceptions when deserialize column [%s].", columnName);
loadFailed.execute();
throw Throwables.propagate(e);
}
}
Expand All @@ -618,7 +621,9 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws
try {
return deserializeColumn(mapper, timeBuffer, smooshedFiles);
}
catch (IOException e) {
catch (IOException | RuntimeException e) {
log.warn(e, "Throw exceptions when deserialize column [%s]", ColumnHolder.TIME_COLUMN_NAME);
loadFailed.execute();
throw Throwables.propagate(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,7 @@ private File multiphaseMerge(
// convert Files to QueryableIndexIndexableAdapter and do another merge phase
List<IndexableAdapter> qIndexAdapters = new ArrayList<>();
for (File outputFile : currentOutputs) {
QueryableIndex qIndex = indexIO.loadIndex(outputFile, true);
QueryableIndex qIndex = indexIO.loadIndex(outputFile, true, SegmentLazyLoadFailCallback.NOOP);
qIndexAdapters.add(new QueryableIndexIndexableAdapter(qIndex));
}
currentPhases = getMergePhases(qIndexAdapters, maxColumnsToMerge);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment;

/**
 * Callback fired when a lazily-loaded segment fails to deserialize, letting the caller
 * react to the failure (e.g. drop the broken segment) before the exception propagates.
 */
@FunctionalInterface
public interface SegmentLazyLoadFailCallback
{
  /**
   * Invoked when a lazy-load failure is detected. Implementations should be idempotent,
   * since a segment with several damaged columns may trigger this more than once.
   */
  void execute();

  /**
   * Shared no-op instance for call sites that need no failure handling.
   */
  SegmentLazyLoadFailCallback NOOP = () -> {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.join.table.BroadcastSegmentIndexedTable;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -57,10 +58,10 @@ public Set<String> getKeyColumns()
}

@Override
public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy) throws SegmentLoadingException
public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
try {
return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy), dataSegment.getId()) {
return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy, loadFailed), dataSegment.getId()) {
@Nullable
@Override
public <T> T as(Class<T> clazz)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;

import java.io.File;
Expand All @@ -42,10 +43,10 @@ public MMappedQueryableSegmentizerFactory(@JacksonInject IndexIO indexIO)
}

@Override
public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy) throws SegmentLoadingException
public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
try {
return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy), dataSegment.getId());
return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy, loadFailed), dataSegment.getId());
}
catch (IOException e) {
throw new SegmentLoadingException(e, "%s", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;

import java.io.File;
Expand All @@ -31,5 +32,5 @@
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = MMappedQueryableSegmentizerFactory.class)
public interface SegmentizerFactory
{
Segment factorize(DataSegment segment, File parentDir, boolean lazy) throws SegmentLoadingException;
Segment factorize(DataSegment segment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ public void testCustomSegmentizerPersist() throws IOException
private static class CustomSegmentizerFactory implements SegmentizerFactory
{
@Override
public Segment factorize(DataSegment segment, File parentDir, boolean lazy) throws SegmentLoadingException
public Segment factorize(DataSegment segment, File parentDir, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
try {
return new QueryableIndexSegment(INDEX_IO.loadIndex(parentDir, lazy), segment.getId());
return new QueryableIndexSegment(INDEX_IO.loadIndex(parentDir, lazy, loadFailed), segment.getId());
}
catch (IOException e) {
throw new SegmentLoadingException(e, "%s", e.getMessage());
Expand Down
65 changes: 65 additions & 0 deletions processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.segment;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
Expand All @@ -29,6 +30,7 @@
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.aggregation.Aggregator;
Expand All @@ -50,6 +52,7 @@
import org.junit.runners.Parameterized;

import javax.annotation.Nullable;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
Expand Down Expand Up @@ -338,4 +341,66 @@ public void testRowValidatorEquals() throws Exception
}
}
}

@Test
public void testLoadSegmentDamagedFileWithLazy()
{
  // Build a standalone IndexIO over a test resource directory that contains a
  // deliberately damaged segment file.
  final ObjectMapper jsonMapper = new DefaultObjectMapper();
  final IndexIO indexIO = new IndexIO(jsonMapper, () -> 0);
  final String resourcePath =
      this.getClass().getClassLoader().getResource("v9SegmentPersistDir/segmentWithDamagedFile/").getPath();
  final File segmentDir = new File(resourcePath);

  final ForkSegmentLoadDropHandler handler = new ForkSegmentLoadDropHandler();
  final ForkSegment trackedSegment = new ForkSegment(true);
  Assert.assertTrue(trackedSegment.getSegmentExist());

  Exception caught = null;
  try {
    // Lazy load succeeds up front; touching every column afterwards should hit the
    // damaged file and fire the failure callback, which drops the tracked segment.
    final QueryableIndex queryableIndex =
        indexIO.loadIndex(segmentDir, true, () -> handler.removeSegment(trackedSegment));
    Assert.assertNotNull(queryableIndex);
    queryableIndex.getDimensionHandlers();
    for (String columnName : queryableIndex.getColumnNames()) {
      queryableIndex.getColumnHolder(columnName).toString();
    }
  }
  catch (Exception ex) {
    // Expected: deserializing the damaged column throws. Keep it for the assertion below.
    caught = ex;
  }

  Assert.assertNotNull(caught);
  Assert.assertFalse(trackedSegment.getSegmentExist());
}

// Minimal stand-in for a segment load/drop handler; the test only exercises removeSegment
// through the lazy-load failure callback.
private static class ForkSegmentLoadDropHandler
{
// Intentionally empty: present only to mirror an add/remove handler pair — unused by the test.
public void addSegment()
{
}
// Marks the given fork segment as no longer existing, simulating a segment drop on load failure.
public void removeSegment(ForkSegment segment)
{
segment.setSegmentExist(false);
}
}

/**
 * Minimal segment stand-in holding a single "exists" flag; the lazy-load failure callback
 * flips it to false so the test can observe that the callback actually fired.
 */
private static class ForkSegment
{
  // Primitive boolean instead of boxed Boolean: no null state is ever needed here, and the
  // primitive cannot throw NullPointerException on unboxing in assertTrue/assertFalse.
  // All existing call sites (literal true/false arguments) remain source-compatible.
  private boolean segmentExist;

  ForkSegment(boolean segmentExist)
  {
    this.segmentExist = segmentExist;
  }

  void setSegmentExist(boolean value)
  {
    this.segmentExist = value;
  }

  boolean getSegmentExist()
  {
    return this.segmentExist;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.BaseColumn;
Expand Down Expand Up @@ -137,7 +138,7 @@ public void setup() throws IOException, SegmentLoadingException
null,
segment.getTotalSpace()
);
backingSegment = (QueryableIndexSegment) factory.factorize(dataSegment, segment, false);
backingSegment = (QueryableIndexSegment) factory.factorize(dataSegment, segment, false, SegmentLazyLoadFailCallback.NOOP);

columnNames = ImmutableList.<String>builder().add(ColumnHolder.TIME_COLUMN_NAME)
.addAll(backingSegment.asQueryableIndex().getColumnNames()).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.join.table.BroadcastSegmentIndexedTable;
Expand Down Expand Up @@ -119,7 +120,7 @@ public void testSegmentizer() throws IOException, SegmentLoadingException
null,
persistedSegmentRoot.getTotalSpace()
);
final Segment loaded = factory.factorize(dataSegment, persistedSegmentRoot, false);
final Segment loaded = factory.factorize(dataSegment, persistedSegmentRoot, false, SegmentLazyLoadFailCallback.NOOP);

final BroadcastSegmentIndexedTable table = (BroadcastSegmentIndexedTable) loaded.as(IndexedTable.class);
Assert.assertNotNull(table);
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"mMapSegmentFactory"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
v1,2147483647,1
__time,0,0,141
count,0,141,282
dstIP,0,564,805
index.drd,0,1046,1205
metadata.drd,0,1205,1587
srcIP,0,805,1046
sum_bytes,0,282,423
sum_packets,0,423,564
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.segment.loading;

import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;

import java.io.File;
Expand All @@ -31,7 +32,7 @@
public interface SegmentLoader
{
boolean isSegmentLoaded(DataSegment segment);
Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException;
Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;
void cleanup(DataSegment segment);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -177,7 +178,7 @@ private boolean checkSegmentFilesIntactWithStartMarker(File localStorageDir)
}

@Override
public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadingException
public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
final ReferenceCountingLock lock = createOrGetLock(segment);
final File segmentFiles;
Expand All @@ -203,7 +204,7 @@ public Segment getSegment(DataSegment segment, boolean lazy) throws SegmentLoadi
factory = new MMappedQueryableSegmentizerFactory(indexIO);
}

return factory.factorize(segment, segmentFiles, lazy);
return factory.factorize(segment, segmentFiles, lazy, loadFailed);
}

/**
Expand Down
Loading