diff --git a/docs/configuration/index.md b/docs/configuration/index.md index e422a3fd4fdd..25c3ca108266 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1329,6 +1329,7 @@ Additional peon configs include: |`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote| |`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`| |`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`| +|`druid.indexer.task.batchMemoryMappedIndex`|If `false` (the default), native batch ingestion does not memory map intermediary persisted indexes, which saves heap space. This applies only to batch ingestion, not to streaming ingestion. Set this flag to `true` only if you suspect or find a bug in the new batch ingestion code that avoids memory mapping indexes; `true` falls back to the previous, working but more memory intensive, code path.|`false`| |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5| |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000| |`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M| diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 128810463d91..34ef75cc87a3 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2782,6 +2782,7 @@ private void makeToolboxFactory() throws IOException null, null, null, + false, false ); final TestDerbyConnector derbyConnector = derby.getConnector(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 6b497598534a..19d2445f7ee2 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2868,6 +2868,7 @@ private void makeToolboxFactory() throws IOException null, null, null, + false, false ); final TestDerbyConnector derbyConnector = derby.getConnector(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java index f0999d55a4cd..0285b33cd85a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/config/TaskConfig.java @@ -76,6 +76,9 @@ public class TaskConfig @JsonProperty private final boolean ignoreTimestampSpecForDruidInputSource; + @JsonProperty + private final boolean batchMemoryMappedIndex; + @JsonCreator public TaskConfig(
@JsonProperty("baseDir") String baseDir, @@ -87,7 +90,8 @@ public TaskConfig( @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout, @JsonProperty("directoryLockTimeout") Period directoryLockTimeout, @JsonProperty("shuffleDataLocations") List shuffleDataLocations, - @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource + @JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource, + @JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMapIndex // only set to true to fall back to older behavior ) { this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir; @@ -113,6 +117,7 @@ public TaskConfig( this.shuffleDataLocations = shuffleDataLocations; } this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource; + this.batchMemoryMappedIndex = batchMemoryMapIndex; } @JsonProperty @@ -195,6 +200,13 @@ public boolean isIgnoreTimestampSpecForDruidInputSource() return ignoreTimestampSpecForDruidInputSource; } + @JsonProperty + public boolean getBatchMemoryMappedIndex() + { + return batchMemoryMappedIndex; + } + + private String defaultDir(@Nullable String configParameter, final String defaultVal) { if (configParameter == null) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java index 711f9d478b95..bff138fb7172 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/BatchAppenderators.java @@ -80,7 +80,8 @@ public static Appenderator newAppenderator( toolbox.getIndexIO(), toolbox.getIndexMergerV9(), rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + toolbox.getConfig().getBatchMemoryMappedIndex() ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java new file mode 100644 index 000000000000..e4417639f01f --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.appenderator; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.realtime.appenderator.Appenderator; +import org.apache.druid.segment.realtime.appenderator.AppenderatorTester; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.stream.Collectors; + +public class BatchAppenderatorTest extends InitializedNullHandlingTest +{ + private static final List IDENTIFIERS = ImmutableList.of( + createSegmentId("2000/2001", "A", 0), + createSegmentId("2000/2001", "A", 1), + createSegmentId("2001/2002", "A", 0) + ); + + @Test + public void testSimpleIngestionWithIndexesNotMapped() throws Exception + { + try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, + false, + false)) { + final Appenderator appenderator = tester.getAppenderator(); + boolean thrown; + + // startJob + Assert.assertEquals(null, appenderator.startJob()); + + // getDataSource + Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource()); + + // add + Assert.assertEquals( + 1, + appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null) + .getNumRowsInSegment() + ); + + Assert.assertEquals( + 2, + appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null) + .getNumRowsInSegment() + ); + + Assert.assertEquals( + 1, + appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null) + .getNumRowsInSegment() + ); + + // getSegments + Assert.assertEquals(IDENTIFIERS.subList(0, 2), + appenderator.getSegments().stream().sorted().collect(Collectors.toList())); + + // getRowCount + Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0))); + Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1))); + thrown = false; + try { + appenderator.getRowCount(IDENTIFIERS.get(2)); + } + catch (IllegalStateException e) { + thrown = true; + } + Assert.assertTrue(thrown); + + // push all + final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push( + appenderator.getSegments(), + null, + false + ).get(); + Assert.assertEquals( + IDENTIFIERS.subList(0, 2), + Lists.transform( + segmentsAndCommitMetadata.getSegments(), + new Function() + { + @Override + public SegmentIdWithShardSpec apply(DataSegment input) + { + return SegmentIdWithShardSpec.fromDataSegment(input); + } + } + ).stream().sorted().collect(Collectors.toList()) + ); + Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()), + segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())); + + appenderator.clear(); + Assert.assertTrue(appenderator.getSegments().isEmpty()); + } + } + + @Test + public void testSimpleIngestionWithIndexesMapped() throws Exception + { + try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(2, 
+ false, + true)) { + final Appenderator appenderator = tester.getAppenderator(); + boolean thrown; + + // startJob + Assert.assertEquals(null, appenderator.startJob()); + + // getDataSource + Assert.assertEquals(AppenderatorTester.DATASOURCE, appenderator.getDataSource()); + + // add + Assert.assertEquals( + 1, + appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "foo", 1), null) + .getNumRowsInSegment() + ); + + Assert.assertEquals( + 2, + appenderator.add(IDENTIFIERS.get(0), createInputRow("2000", "bar", 2), null) + .getNumRowsInSegment() + ); + + Assert.assertEquals( + 1, + appenderator.add(IDENTIFIERS.get(1), createInputRow("2000", "qux", 4), null) + .getNumRowsInSegment() + ); + + // getSegments + Assert.assertEquals(IDENTIFIERS.subList(0, 2), + appenderator.getSegments().stream().sorted().collect(Collectors.toList())); + + // getRowCount + Assert.assertEquals(2, appenderator.getRowCount(IDENTIFIERS.get(0))); + Assert.assertEquals(1, appenderator.getRowCount(IDENTIFIERS.get(1))); + thrown = false; + try { + appenderator.getRowCount(IDENTIFIERS.get(2)); + } + catch (IllegalStateException e) { + thrown = true; + } + Assert.assertTrue(thrown); + + // push all + final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push( + appenderator.getSegments(), + null, + false + ).get(); + Assert.assertEquals( + IDENTIFIERS.subList(0, 2), + Lists.transform( + segmentsAndCommitMetadata.getSegments(), + new Function() + { + @Override + public SegmentIdWithShardSpec apply(DataSegment input) + { + return SegmentIdWithShardSpec.fromDataSegment(input); + } + } + ).stream().sorted().collect(Collectors.toList()) + ); + Assert.assertEquals(tester.getPushedSegments().stream().sorted().collect(Collectors.toList()), + segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())); + + appenderator.clear(); + Assert.assertTrue(appenderator.getSegments().isEmpty()); + } + } + private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum) + { + return new SegmentIdWithShardSpec( + AppenderatorTester.DATASOURCE, + Intervals.of(interval), + version, + new LinearShardSpec(partitionNum) + + ); + } + + static InputRow createInputRow(String ts, String dim, Object met) + { + return new MapBasedInputRow( + DateTimes.of(ts).getMillis(), + ImmutableList.of("dim"), + ImmutableMap.of( + "dim", + dim, + "met", + met + ) + ); + } + + +} + diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java new file mode 100644 index 000000000000..4058aff938e5 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/appenderator/BatchAppenderatorTester.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.appenderator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JSONParseSpec; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.task.IndexTask; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexMerger; +import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.incremental.ParseExceptionHandler; +import org.apache.druid.segment.incremental.RowIngestionMeters; +import org.apache.druid.segment.incremental.SimpleRowIngestionMeters; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.realtime.FireDepartmentMetrics; +import org.apache.druid.segment.realtime.appenderator.Appenderator; +import org.apache.druid.segment.realtime.appenderator.Appenderators; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.LinearShardSpec; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; + +public class BatchAppenderatorTester implements AutoCloseable +{ + public static final String DATASOURCE = "foo"; + + private final DataSchema schema; + private final IndexTask.IndexTuningConfig tuningConfig; + private final FireDepartmentMetrics metrics; + private final DataSegmentPusher dataSegmentPusher; + private final ObjectMapper objectMapper; + private final Appenderator appenderator; + private final IndexIO indexIO; + private final IndexMerger indexMerger; + private final ServiceEmitter emitter; + + private final List pushedSegments = new CopyOnWriteArrayList<>(); + + public BatchAppenderatorTester( + final int maxRowsInMemory, + final boolean enablePushFailure, + boolean batchMemoryMappedIndex + ) + { + this(maxRowsInMemory, -1, null, enablePushFailure, batchMemoryMappedIndex); + } + + public BatchAppenderatorTester( + final int maxRowsInMemory, + final long maxSizeInBytes, + final File basePersistDirectory, + final boolean enablePushFailure, + boolean batchMemoryMappedIndex + ) + { + this( + maxRowsInMemory, + maxSizeInBytes, + basePersistDirectory, + enablePushFailure, + new SimpleRowIngestionMeters(), + false, + batchMemoryMappedIndex + ); + } + + public BatchAppenderatorTester( + final int maxRowsInMemory, + final long maxSizeInBytes, + final File basePersistDirectory, + final boolean 
enablePushFailure, + final RowIngestionMeters rowIngestionMeters, + final boolean skipBytesInMemoryOverheadCheck, + boolean batchMemoryMappedIndex + ) + { + objectMapper = new DefaultObjectMapper(); + objectMapper.registerSubtypes(LinearShardSpec.class); + + final Map parserMap = objectMapper.convertValue( + new MapInputRowParser( + new JSONParseSpec( + new TimestampSpec("ts", "auto", null), + new DimensionsSpec(null, null, null), + null, + null, + null + ) + ), + Map.class + ); + schema = new DataSchema( + DATASOURCE, + parserMap, + new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new LongSumAggregatorFactory("met", "met") + }, + new UniformGranularitySpec(Granularities.MINUTE, Granularities.NONE, null), + null, + objectMapper + ); + tuningConfig = new IndexTask.IndexTuningConfig( + null, + 2, + null, + maxRowsInMemory, + maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes, + false, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + OffHeapMemorySegmentWriteOutMediumFactory.instance(), + true, + null, + null, + null, + null + ).withBasePersistDirectory(createNewBasePersistDirectory()); + + metrics = new FireDepartmentMetrics(); + + indexIO = new IndexIO( + objectMapper, + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); + indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()); + + emitter = new ServiceEmitter( + "test", + "test", + new NoopEmitter() + ); + emitter.start(); + EmittingLogger.registerEmitter(emitter); + dataSegmentPusher = new DataSegmentPusher() + { + private boolean mustFail = true; + + @Deprecated + @Override + public String getPathForHadoop(String dataSource) + { + return getPathForHadoop(); + } + + @Override + public String getPathForHadoop() + { + throw new UnsupportedOperationException(); + } + + @Override + public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException + { + if (enablePushFailure && mustFail) { + mustFail = false; + throw new IOException("Push failure test"); + } else if (enablePushFailure) { + mustFail = true; + } + pushedSegments.add(segment); + return segment; + } + + @Override + public Map makeLoadSpec(URI uri) + { + throw new UnsupportedOperationException(); + } + }; + appenderator = Appenderators.createOffline( + schema.getDataSource(), + schema, + tuningConfig, + metrics, + dataSegmentPusher, + objectMapper, + indexIO, + indexMerger, + rowIngestionMeters, + new ParseExceptionHandler(rowIngestionMeters, false, Integer.MAX_VALUE, 0), + batchMemoryMappedIndex + ); + } + + private long getDefaultMaxBytesInMemory() + { + return (Runtime.getRuntime().totalMemory()) / 3; + } + + public DataSchema getSchema() + { + return schema; + } + + public IndexTask.IndexTuningConfig getTuningConfig() + { + return tuningConfig; + } + + public FireDepartmentMetrics getMetrics() + { + return metrics; + } + + public DataSegmentPusher getDataSegmentPusher() + { + return dataSegmentPusher; + } + + public ObjectMapper getObjectMapper() + { + return objectMapper; + } + + public Appenderator getAppenderator() + { + return appenderator; + } + + public List getPushedSegments() + { + return pushedSegments; + } + + @Override + public void close() throws Exception + { + appenderator.close(); + emitter.close(); + FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory()); + } + + private static File createNewBasePersistDirectory() + { + return 
FileUtils.createTempDir("druid-batch-persist"); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java index b5a8d736301a..671556975a96 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TaskToolboxTest.java @@ -112,6 +112,7 @@ public void setUp() throws IOException null, null, null, + false, false ), new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 46fa4fac6a1c..113d41a864af 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1516,6 +1516,7 @@ public SegmentPublishResult announceHistoricalSegments( null, null, null, + false, false ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 03acabd620b9..a958ee66203f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -1298,7 +1298,7 @@ public List getLocations() ); return new TaskToolbox( - new TaskConfig(null, null, null, null, null, false, null, null, null, false), + new TaskConfig(null, null, null, null, null, false, null, null, null, false, false), null, createActionClient(task), null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index c4faa5b2754e..330ccbf64945 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -1747,7 +1747,7 @@ private static class TestTaskToolbox extends TaskToolbox ) { super( - new TaskConfig(null, null, null, null, null, false, null, null, null, false), + new TaskConfig(null, null, null, null, null, false, null, null, null, false, false), null, taskActionClient, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java index caaeea253c73..97d6df15b328 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopTaskTest.java @@ -117,6 +117,7 @@ public TaskStatus runTask(TaskToolbox toolbox) null, null, null, + false, false )).once(); EasyMock.replay(toolbox); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 4123688dc479..c42bf20dad2a 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -314,7 +314,7 @@ public ListenableFuture run(Task task) ); final TaskToolbox box = new TaskToolbox( - new TaskConfig(null, null, null, null, null, false, null, null, null, false), + new TaskConfig(null, null, null, null, null, false, null, null, null, false, false), new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), taskActionClient, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 571566223a0e..0a2e34d7d6d7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -898,6 +898,7 @@ private TaskToolbox makeToolbox( null, null, null, + false, false ); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, mdc); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java index 2a342b7ccdd3..6e963e6f9136 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java @@ -105,7 +105,8 @@ public Appenderator createOfflineAppenderatorForTask( IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { return Appenderators.createOffline( @@ -118,7 +119,8 @@ public Appenderator createOfflineAppenderatorForTask( indexIO, indexMerger, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + batchMemoryMappedIndex ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 23b5b9d53ef9..a0d544968702 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -241,6 +241,7 @@ public void setUpAbstractParallelIndexSupervisorTaskTest() throws IOException null, null, ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)), + false, false ), null @@ -597,7 +598,7 @@ public Set getPublishedSegments(Task task) public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO) { - final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false); + final TaskConfig taskConfig = new TaskConfig(null, null, null, null, null, false, null, null, null, false, false); objectMapper.setInjectableValues( new InjectableValues.Std() @@ -632,7 +633,7 @@ public void prepareObjectMapper(ObjectMapper objectMapper, IndexIO indexIO) protected TaskToolbox createTaskToolbox(Task task, TaskActionClient 
actionClient) throws IOException { return new TaskToolbox( - new TaskConfig(null, null, null, null, null, false, null, null, null, false), + new TaskConfig(null, null, null, null, null, false, null, null, null, false, false), new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false), actionClient, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java index 120c7090d93d..c05cdb8a90b6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunnerTest.java @@ -90,6 +90,7 @@ public void setup() throws IOException null, null, null, + false, false ); final ServiceEmitter emitter = new NoopServiceEmitter(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 6e824721a9ec..7c41ba984a77 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -600,7 +600,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory( new TaskAuditLogConfig(true) ); File tmpDir = temporaryFolder.newFolder(); - taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false); + taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null, null, false, false); return new TaskToolboxFactory( taskConfig, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java index 1bb33f0b457c..2dd94e8c2875 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskManagerTest.java @@ -89,6 +89,7 @@ private WorkerTaskManager createWorkerTaskManager() null, null, null, + false, false ); TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java index 2a6e79316328..c1845097996d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -163,6 +163,7 @@ private WorkerTaskMonitor createTaskMonitor() null, null, null, + false, false ); TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java index a10776bdf617..9736fc72dba5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerAutoCleanupTest.java @@ -92,6 +92,7 @@ public Period getIntermediaryPartitionTimeout() null, null, ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)), + false, false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java index 162993218fe2..6e52b30b9eb7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManagerManualAddAndDeleteTest.java @@ -75,6 +75,7 @@ public void setup() throws IOException null, null, ImmutableList.of(new StorageLocationConfig(intermediarySegmentsLocation, 600L, null)), + false, false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java index ea6fb5fbfe0d..f110cc20b1e9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleDataSegmentPusherTest.java @@ -71,6 +71,7 @@ public void setup() throws IOException null, null, ImmutableList.of(new StorageLocationConfig(temporaryFolder.newFolder(), null, null)), + false, false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java index e4327d3baed3..54a6b020d86b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/shuffle/ShuffleResourceTest.java @@ -97,6 +97,7 @@ public Period getIntermediaryPartitionTimeout() null, null, ImmutableList.of(new StorageLocationConfig(tempDir.newFolder(), null, null)), + false, false ); final IndexingServiceClient indexingServiceClient = new NoopIndexingServiceClient() diff --git a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java index 9c59387d102d..29a8986a04cf 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/FireHydrant.java @@ -213,7 +213,7 @@ public String toString() // Do not include IncrementalIndex in toString as AbstractIndex.toString() actually prints // all the rows in the index return "FireHydrant{" + - "queryable=" + adapter.get().getId() + + "queryable=" + (adapter.get() == null ? 
"null" : adapter.get().getId()) + ", count=" + count + '}'; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java index 14796df44a2a..1d7da70837a7 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderator.java @@ -214,6 +214,15 @@ ListenableFuture push( */ void closeNow(); + /** + * Flag to tell internals whether appenderator is working on behalf of a real time task. + * This is to manage certain aspects as needed. For example, for batch, non-real time tasks, + * physical segments (i.e. hydrants) do not need to memory map their persisted + * files. In this case, the code will avoid memory mapping them thus ameliorating the occurance + * of OOMs. + */ + boolean isRealTime(); + /** * Result of {@link Appenderator#add} containing following information * - {@link SegmentIdWithShardSpec} - identifier of segment to which rows are being added @@ -242,7 +251,8 @@ SegmentIdWithShardSpec getSegmentIdentifier() return segmentIdentifier; } - int getNumRowsInSegment() + @VisibleForTesting + public int getNumRowsInSegment() { return numRowsInSegment; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 9121e0aae459..e6cd9c52aa92 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.commons.lang.mutable.MutableLong; import org.apache.druid.client.cache.Cache; import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; @@ -59,6 +60,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.ReferenceCountingSegment; +import org.apache.druid.segment.Segment; import org.apache.druid.segment.incremental.IncrementalIndexAddResult; import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -70,6 +72,7 @@ import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.joda.time.Interval; @@ -83,7 +86,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -161,6 +166,19 @@ public class AppenderatorImpl implements Appenderator private volatile Throwable persistError; + private final boolean isRealTime; + /** + * Use next Map to store metadata (File, SegmentId) for a hydrant for batch appenderator + * in order to facilitate the mapping of the QueryableIndex associated with a given hydrant + * at merge time. 
This is necessary since batch appenderator will not map the QueryableIndex + * at persist time in order to minimize its memory footprint. This has to be synchronized since the + * map may be accessed from multiple threads. + * Use {@link IdentityHashMap} to better reflect the fact that the key needs to be interpreted + * with reference semantics. + */ + private final Map> persistedHydrantMetadata = + Collections.synchronizedMap(new IdentityHashMap<>()); + /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. * @@ -183,7 +201,8 @@ public class AppenderatorImpl implements Appenderator IndexMerger indexMerger, Cache cache, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean isRealTime ) { this.myId = id; @@ -199,6 +218,7 @@ public class AppenderatorImpl implements Appenderator this.texasRanger = sinkQuerySegmentWalker; this.rowIngestionMeters = Preconditions.checkNotNull(rowIngestionMeters, "rowIngestionMeters"); this.parseExceptionHandler = Preconditions.checkNotNull(parseExceptionHandler, "parseExceptionHandler"); + this.isRealTime = isRealTime; if (sinkQuerySegmentWalker == null) { this.sinkTimeline = new VersionedIntervalTimeline<>( @@ -339,7 +359,8 @@ public AppenderatorAddResult add( if (sinkEntry != null) { bytesToBePersisted += sinkEntry.getBytesInMemory(); if (sinkEntry.swappable()) { - // After swapping the sink, we use memory mapped segment instead. However, the memory mapped segment still consumes memory. + // After swapping the sink, we use memory mapped segment instead (but only for real time appenderators!). + // However, the memory mapped segment still consumes memory. // These memory mapped segments are held in memory throughout the ingestion phase and permanently add to the bytesCurrentlyInMemory int memoryStillInUse = calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant()); bytesCurrentlyInMemory.addAndGet(memoryStillInUse); @@ -352,10 +373,14 @@ public AppenderatorAddResult add( // This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion) final String alertMessage = StringUtils.format( "Task has exceeded safe estimated heap usage limits, failing " - + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])", + + "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])" + + "(bytesCurrentlyInMemory: [%d] - bytesToBePersisted: [%d] > maxBytesTuningConfig: [%d])", sinks.size(), sinks.values().stream().mapToInt(Iterables::size).sum(), - getTotalRowCount() + getTotalRowCount(), + bytesCurrentlyInMemory.get(), + bytesToBePersisted, + maxBytesTuningConfig ); final String errorMessage = StringUtils.format( "%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to " @@ -530,6 +555,9 @@ public void clear() throws InterruptedException futures.add(abandonSegment(entry.getKey(), entry.getValue(), true)); } + // Re-initialize hydrant map: + persistedHydrantMetadata.clear(); + // Await dropping. 
Futures.allAsList(futures).get(); } @@ -558,6 +586,9 @@ public ListenableFuture persistAll(@Nullable final Committer committer) final List> indexesToPersist = new ArrayList<>(); int numPersistedRows = 0; long bytesPersisted = 0L; + MutableLong totalHydrantsCount = new MutableLong(); + MutableLong totalHydrantsPersisted = new MutableLong(); + final long totalSinks = sinks.size(); for (Map.Entry entry : sinks.entrySet()) { final SegmentIdWithShardSpec identifier = entry.getKey(); final Sink sink = entry.getValue(); @@ -565,21 +596,26 @@ public ListenableFuture persistAll(@Nullable final Committer committer) throw new ISE("No sink for identifier: %s", identifier); } final List hydrants = Lists.newArrayList(sink); + totalHydrantsCount.add(hydrants.size()); currentHydrants.put(identifier.toString(), hydrants.size()); numPersistedRows += sink.getNumRowsInMemory(); bytesPersisted += sink.getBytesInMemory(); final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size(); + // gather hydrants that have not been persisted: for (FireHydrant hydrant : hydrants.subList(0, limit)) { if (!hydrant.hasSwapped()) { log.debug("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier); indexesToPersist.add(Pair.of(hydrant, identifier)); + totalHydrantsPersisted.add(1); } } if (sink.swappable()) { + // It is swappable. Get the old one to persist it and create a new one: indexesToPersist.add(Pair.of(sink.swap(), identifier)); + totalHydrantsPersisted.add(1); } } log.debug("Submitting persist runnable for dataSource[%s]", schema.getDataSource()); @@ -587,6 +623,7 @@ public ListenableFuture persistAll(@Nullable final Committer committer) final Object commitMetadata = committer == null ? null : committer.getMetadata(); final Stopwatch runExecStopwatch = Stopwatch.createStarted(); final Stopwatch persistStopwatch = Stopwatch.createStarted(); + AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows); final ListenableFuture future = persistExecutor.submit( new Callable() { @@ -640,6 +677,14 @@ public Object call() throws IOException .distinct() .collect(Collectors.joining(", ")) ); + log.info( + "Persisted stats: processed rows: [%d], persisted rows[%d], sinks: [%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across sinks): [%d]", + rowIngestionMeters.getProcessed(), + totalPersistedRows.get(), + totalSinks, + totalHydrantsCount.longValue(), + totalHydrantsPersisted.longValue() + ); // return null if committer is null return commitMetadata; @@ -682,6 +727,7 @@ public ListenableFuture push( ) { final Map theSinks = new HashMap<>(); + AtomicLong pushedHydrantsCount = new AtomicLong(); for (final SegmentIdWithShardSpec identifier : identifiers) { final Sink sink = sinks.get(identifier); if (sink == null) { @@ -691,6 +737,8 @@ public ListenableFuture push( if (sink.finishWriting()) { totalRows.addAndGet(-sink.getNumRows()); } + // count hydrants for stats: + pushedHydrantsCount.addAndGet(Iterables.size(sink)); } return Futures.transform( @@ -700,6 +748,10 @@ public ListenableFuture push( (Function) commitMetadata -> { final List dataSegments = new ArrayList<>(); + log.info("Preparing to push (stats): processed rows: [%d], sinks: [%d], fireHydrants (across sinks): [%d]", + rowIngestionMeters.getProcessed(), theSinks.size(), pushedHydrantsCount.get() + ); + log.debug( "Building and pushing segments: %s", theSinks.keySet().stream().map(SegmentIdWithShardSpec::toString).collect(Collectors.joining(", ")) @@ -723,6 +775,8 @@ public ListenableFuture push( } } + 
log.info("Push complete..."); + return new SegmentsAndCommitMetadata(dataSegments, commitMetadata); }, pushExecutor @@ -813,6 +867,34 @@ private DataSegment mergeAndPush( Closer closer = Closer.create(); try { for (FireHydrant fireHydrant : sink) { + + // if batch, swap/persist did not memory map the incremental index, we need it mapped now: + if (!isRealTime()) { + + // sanity + Pair persistedMetadata = persistedHydrantMetadata.get(fireHydrant); + if (persistedMetadata == null) { + throw new ISE("Persisted metadata for batch hydrant [%s] is null!", fireHydrant); + } + + File persistedFile = persistedMetadata.lhs; + SegmentId persistedSegmentId = persistedMetadata.rhs; + + // sanity: + if (persistedFile == null) { + throw new ISE("Persisted file for batch hydrant [%s] is null!", fireHydrant); + } else if (persistedSegmentId == null) { + throw new ISE( + "Persisted segmentId for batch hydrant in file [%s] is null!", + persistedFile.getPath() + ); + } + fireHydrant.swapSegment(new QueryableIndexSegment( + indexIO.loadIndex(persistedFile), + persistedSegmentId + )); + } + Pair segmentAndCloseable = fireHydrant.getAndIncrementSegment(); final QueryableIndex queryableIndex = segmentAndCloseable.lhs.asQueryableIndex(); log.debug("Segment[%s] adding hydrant[%s]", identifier, fireHydrant); @@ -860,6 +942,15 @@ private DataSegment mergeAndPush( 5 ); + if (!isRealTime()) { + // Drop the queryable indexes behind the hydrants... they are not needed anymore and their + // mapped file references + // can generate OOMs during merge if enough of them are held back... + for (FireHydrant fireHydrant : sink) { + fireHydrant.swapSegment(null); + } + } + final long pushFinishTime = System.nanoTime(); objectMapper.writeValue(descriptorFile, segment); @@ -986,6 +1077,13 @@ public void closeNow() } } + @Override + public boolean isRealTime() + { + return isRealTime; + } + + private void lockBasePersistDirectory() { if (basePersistDirLock == null) { @@ -1303,6 +1401,8 @@ public Void apply(@Nullable Object input) cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); } hydrant.swapSegment(null); + // remove hydrant from persisted metadata: + persistedHydrantMetadata.remove(hydrant); } if (removeOnDiskData) { @@ -1417,9 +1517,15 @@ private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec id numRows ); - indexToPersist.swapSegment( - new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId()) - ); + // Map only when this appenderator is being driven by a real time task: + Segment segmentToSwap = null; + if (isRealTime()) { + segmentToSwap = new QueryableIndexSegment(indexIO.loadIndex(persistedFile), indexToPersist.getSegmentId()); + } else { + // remember file path & segment id to rebuild the queryable index for merge: + persistedHydrantMetadata.put(indexToPersist, new Pair<>(persistedFile, indexToPersist.getSegmentId())); + } + indexToPersist.swapSegment(segmentToSwap); return numRows; } @@ -1457,10 +1563,15 @@ private int calculateMMappedHydrantMemoryInUsed(FireHydrant hydrant) // These calculations are approximated from actual heap dumps. // Memory footprint includes count integer in FireHydrant, shorts in ReferenceCountingSegment, // Objects in SimpleQueryableIndex (such as SmooshedFileMapper, each ColumnHolder in column map, etc.) 
- return Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT + - (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) + - (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) + - ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER; + int total; + total = Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT; + if (isRealTime()) { + // for real time, also account for the overhead of the memory mapped column holder references: + total += (hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) + + (hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) + + ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER; + } + return total; } private int calculateSinkMemoryInUsed(Sink sink) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java index c59e4e053e50..ff8799daeba4 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java @@ -89,7 +89,8 @@ public static Appenderator createRealtime( indexMerger, cache, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + true ); } @@ -103,7 +104,8 @@ public static Appenderator createOffline( IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { return new AppenderatorImpl( @@ -119,7 +121,8 @@ public static Appenderator createOffline( indexMerger, null, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + batchMemoryMappedIndex // task config (default false) to fall back to the "old" code path in case of a bug in the new memory optimization code ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java index 76c64d26a410..c5b22cfb1b57 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorsManager.java @@ -98,7 +98,8 @@ Appenderator createOfflineAppenderatorForTask( IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ); /** diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java index 7a0f1dc6fcd9..8abd0c297672 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactory.java @@ -73,7 +73,8 @@ public Appenderator build(DataSchema schema, RealtimeTuningConfig config, FireDe false, config.isReportParseExceptions() ?
0 : Integer.MAX_VALUE, 0 - ) + ), + false ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java index 87de2449a479..f1e2f3c91347 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DummyForInjectionAppenderatorsManager.java @@ -90,7 +90,8 @@ public Appenderator createOfflineAppenderatorForTask( IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { throw new UOE(ERROR_MSG); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java index 7fa4f4c7c9f0..88a4f5720c18 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java @@ -122,7 +122,8 @@ public Appenderator createOfflineAppenderatorForTask( IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { // CompactionTask does run multiple sub-IndexTasks, so we allow multiple batch appenderators @@ -139,7 +140,8 @@ public Appenderator createOfflineAppenderatorForTask( indexIO, indexMerger, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + batchMemoryMappedIndex ); return batchAppenderator; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 369709882a76..3780c3735753 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -189,7 +189,8 @@ public Appenderator createRealtimeAppenderatorForTask( wrapIndexMerger(indexMerger), cache, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + true ); datasourceBundle.addAppenderator(taskId, appenderator); @@ -208,7 +209,8 @@ public Appenderator createOfflineAppenderatorForTask( IndexIO indexIO, IndexMerger indexMerger, RowIngestionMeters rowIngestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + boolean batchMemoryMappedIndex ) { synchronized (this) { @@ -227,7 +229,8 @@ public Appenderator createOfflineAppenderatorForTask( indexIO, wrapIndexMerger(indexMerger), rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + batchMemoryMappedIndex ); datasourceBundle.addAppenderator(taskId, appenderator); return appenderator; diff --git a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java index 4f288426e50f..464141a42a11 100644 --- 
a/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/FireHydrantTest.java @@ -210,4 +210,11 @@ public void testGetSegmentForQueryButNotAbleToAcquireReferencesSegmentClosed() Function.identity() ); } + + @Test + public void testToStringWhenSwappedWithNull() + { + hydrant.swapSegment(null); + hydrant.toString(); + } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index 4f9cd3c34158..408aa97e400b 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -545,5 +545,11 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable
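Taken together, the new flag surfaces as a peon runtime property that is wired through `TaskConfig` into `Appenderators.createOffline`. A minimal sketch of opting back into the old memory mapping behavior, assuming an otherwise standard peon `runtime.properties`:

```properties
# Fall back to the pre-change code path: native batch ingestion memory maps
# intermediary persisted indexes again. The default is false (do not map).
druid.indexer.task.batchMemoryMappedIndex=true
```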
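The heart of the change in `AppenderatorImpl` is deferred mapping: a batch persist records a `(File, SegmentId)` pair per hydrant instead of memory mapping the persisted index, and `mergeAndPush` maps each index just in time, then releases it. A simplified, dependency-free sketch of that bookkeeping; the class and method names below are illustrative stand-ins, not Druid's API:

```java
import java.io.File;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of the batch-mode hydrant bookkeeping in AppenderatorImpl.
final class BatchPersistSketch
{
  static final class PersistedMeta
  {
    final File file;
    final String segmentId;

    PersistedMeta(File file, String segmentId)
    {
      this.file = file;
      this.segmentId = segmentId;
    }
  }

  private final boolean isRealTime;
  // IdentityHashMap: hydrants are keyed by reference, not by equals();
  // synchronized because persist and merge may touch the map from different threads.
  private final Map<Object, PersistedMeta> persistedHydrantMetadata =
      Collections.synchronizedMap(new IdentityHashMap<>());

  BatchPersistSketch(boolean isRealTime)
  {
    this.isRealTime = isRealTime;
  }

  // Mirrors persistHydrant(): real time maps the index right away so it stays
  // queryable; batch only remembers where the persisted index lives on disk.
  void persist(Object hydrant, File persistedFile, String segmentId)
  {
    if (isRealTime) {
      mapIndex(persistedFile, segmentId);
    } else {
      persistedHydrantMetadata.put(hydrant, new PersistedMeta(persistedFile, segmentId));
    }
  }

  // Mirrors mergeAndPush(): batch maps each index just in time for the merge,
  // then drops the reference so mapped files do not pile up on the heap.
  void mergeAndPush(List<Object> hydrants)
  {
    for (Object hydrant : hydrants) {
      if (!isRealTime) {
        PersistedMeta meta = persistedHydrantMetadata.get(hydrant);
        if (meta == null) {
          throw new IllegalStateException("no persisted metadata for batch hydrant");
        }
        mapIndex(meta.file, meta.segmentId);
      }
      // ... feed the mapped queryable index into the merge ...
    }
    if (!isRealTime) {
      for (Object hydrant : hydrants) {
        persistedHydrantMetadata.remove(hydrant); // release mapped references
      }
    }
  }

  private void mapIndex(File file, String segmentId)
  {
    // stand-in for indexIO.loadIndex(file) + fireHydrant.swapSegment(...)
  }
}
```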
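The adjusted heap estimate follows the same split: only a real time appenderator keeps memory mapped column holders resident, so only it pays the per-column overhead. A parameterized restatement of the arithmetic in `calculateMMappedHydrantMemoryInUsed`, with the `ROUGH_OVERHEAD_*` constants passed in as arguments because their values are AppenderatorImpl internals not shown in this diff:

```java
// Sketch of the per-hydrant heap estimate; real constants live in AppenderatorImpl.
static long roughHydrantHeapBytes(
    boolean isRealTime,
    int numDimensionColumns,
    int numMetricColumns,
    long perHydrant,
    long perDimensionColumnHolder,
    long perMetricColumnHolder,
    long perTimeColumnHolder
)
{
  // count integer in FireHydrant, shorts in ReferenceCountingSegment, plus fixed overhead
  long total = Integer.BYTES + (4 * Short.BYTES) + perHydrant;
  if (isRealTime) {
    // batch appenderators skip this term: no memory mapped column holders are retained
    total += numDimensionColumns * perDimensionColumnHolder
             + numMetricColumns * perMetricColumnHolder
             + perTimeColumnHolder;
  }
  return total;
}
```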