Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,7 @@ Additional peon configs include:
|`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
|`druid.indexer.task.batchMemoryMappedIndex`|If `false`, native batch ingestion does not memory-map indexes, which saves heap space. This setting applies only to batch ingestion, not to streaming ingestion. Change it only when a bug is suspected or found in the new batch ingestion code path that avoids memory-mapping indexes; in that case, set this flag to `true` to fall back to the previous, working but more memory-intensive, code path.|`false`|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2782,6 +2782,7 @@ private void makeToolboxFactory() throws IOException
null,
null,
null,
false,
false
);
final TestDerbyConnector derbyConnector = derby.getConnector();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2868,6 +2868,7 @@ private void makeToolboxFactory() throws IOException
null,
null,
null,
false,
false
);
final TestDerbyConnector derbyConnector = derby.getConnector();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ public class TaskConfig
@JsonProperty
private final boolean ignoreTimestampSpecForDruidInputSource;

@JsonProperty
private final boolean batchMemoryMappedIndex;

@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
Expand All @@ -87,7 +90,8 @@ public TaskConfig(
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
@JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations,
@JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource
@JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource,
@JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMapIndex // only set to true to fall back to older behavior
)
{
this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
Expand All @@ -113,6 +117,7 @@ public TaskConfig(
this.shuffleDataLocations = shuffleDataLocations;
}
this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
this.batchMemoryMappedIndex = batchMemoryMapIndex;
}

@JsonProperty
Expand Down Expand Up @@ -195,6 +200,13 @@ public boolean isIgnoreTimestampSpecForDruidInputSource()
return ignoreTimestampSpecForDruidInputSource;
}

/**
 * Exposes the {@code batchMemoryMappedIndex} flag that was supplied to the constructor.
 *
 * @return the stored {@code batchMemoryMappedIndex} value
 */
@JsonProperty
public boolean getBatchMemoryMappedIndex()
{
  return this.batchMemoryMappedIndex;
}


private String defaultDir(@Nullable String configParameter, final String defaultVal)
{
if (configParameter == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ public static Appenderator newAppenderator(
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler
parseExceptionHandler,
toolbox.getConfig().getBatchMemoryMappedIndex()
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.appenderator;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorTester;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Runs a simple add / getSegments / getRowCount / push / clear cycle against the
 * {@link Appenderator} obtained from {@link BatchAppenderatorTester}.
 *
 * The same scenario is executed twice; the two tests differ only in the last boolean passed to
 * the {@link BatchAppenderatorTester} constructor.
 */
public class BatchAppenderatorTest extends InitializedNullHandlingTest
{
  private static final List<SegmentIdWithShardSpec> IDENTIFIERS = ImmutableList.of(
      createSegmentId("2000/2001", "A", 0),
      createSegmentId("2000/2001", "A", 1),
      createSegmentId("2001/2002", "A", 0)
  );

  @Test
  public void testSimpleIngestionWithIndexesNotMapped() throws Exception
  {
    verifySimpleIngestion(false);
  }

  @Test
  public void testSimpleIngestionWithIndexesMapped() throws Exception
  {
    verifySimpleIngestion(true);
  }

  /**
   * Shared body of the simple-ingestion tests: adds three rows across two segments, checks the
   * reported segments and row counts, pushes everything and finally clears the appenderator.
   *
   * @param batchMemoryMappedIndex value forwarded as the third argument of the
   *                               {@link BatchAppenderatorTester} constructor
   *                               (presumably selects whether indexes are memory mapped, as the
   *                               test names suggest; confirm against the tester)
   * @throws Exception if any appenderator operation fails
   */
  private static void verifySimpleIngestion(boolean batchMemoryMappedIndex) throws Exception
  {
    try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, false, batchMemoryMappedIndex)) {
      final Appenderator appenderator = tester.getAppenderator();

      // startJob
      Assert.assertNull(appenderator.startJob());

      // getDataSource
      Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource());

      // add
      Assert.assertEquals(
          1,
          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null)
                      .getNumRowsInSegment()
      );

      Assert.assertEquals(
          2,
          appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null)
                      .getNumRowsInSegment()
      );

      Assert.assertEquals(
          1,
          appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null)
                      .getNumRowsInSegment()
      );

      // getSegments
      Assert.assertEquals(
          IDENTIFIERS.subList(0, 2),
          appenderator.getSegments().stream().sorted().collect(Collectors.toList())
      );

      // getRowCount
      Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0)));
      Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1)));
      try {
        // No row was ever added for the third identifier, so asking for its count must fail.
        appenderator.getRowCount(IDENTIFIERS.get(2));
        Assert.fail("Expected IllegalStateException for a segment that received no rows");
      }
      catch (IllegalStateException expected) {
        // expected: the segment is unknown to the appenderator
      }

      // push all
      final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
          appenderator.getSegments(),
          null,
          false
      ).get();

      final Function<DataSegment, SegmentIdWithShardSpec> toSegmentId = SegmentIdWithShardSpec::fromDataSegment;
      Assert.assertEquals(
          IDENTIFIERS.subList(0, 2),
          Lists.transform(segmentsAndCommitMetadata.getSegments(), toSegmentId)
               .stream()
               .sorted()
               .collect(Collectors.toList())
      );
      Assert.assertEquals(
          tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
          segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
      );

      // clear
      appenderator.clear();
      Assert.assertTrue(appenderator.getSegments().isEmpty());
    }
  }

  /**
   * Builds a segment identifier in the test datasource with a {@link LinearShardSpec}.
   *
   * @param interval     interval string parsed with {@link Intervals#of}
   * @param version      segment version
   * @param partitionNum partition number given to the {@link LinearShardSpec}
   * @return the new segment identifier
   */
  private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
  {
    return new SegmentIdWithShardSpec(
        AppenderatorTester.DATASOURCE,
        Intervals.of(interval),
        version,
        new LinearShardSpec(partitionNum)
    );
  }

  /**
   * Builds an input row with a single dimension named "dim" and an event map holding "dim" and "met".
   *
   * @param ts  timestamp string parsed with {@link DateTimes#of}
   * @param dim value stored under the "dim" key
   * @param met value stored under the "met" key
   * @return the new input row
   */
  static InputRow createInputRow(String ts, String dim, Object met)
  {
    return new MapBasedInputRow(
        DateTimes.of(ts).getMillis(),
        ImmutableList.of("dim"),
        ImmutableMap.of(
            "dim",
            dim,
            "met",
            met
        )
    );
  }
}

Loading