diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index 2c09b64e15f0..ed2e594d425b 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -115,7 +115,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |Field|Type|Description|Required| |-----|----|-----------|--------| |`type`|String|The indexing task type, this should always be `kafka`.|yes| -|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 75000)| +|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists). Normally this does not need to be set, but if rows are small in terms of bytes, you may not want to hold a million of them in memory, and should set this value accordingly.|no (default == 1000000)| +|`maxBytesInMemory`|Long|The number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage, not actual usage. Normally this is computed internally and users do not need to set it. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|no (default == one-sixth of max JVM memory)| |`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)| |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
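The two limits above interact with the persist pipeline: the default for `maxBytesInMemory` is one-sixth of the maximum JVM heap, and the documented worst case for indexing heap usage is that value times (2 + maxPendingPersists). A minimal sketch of the arithmetic, assuming the default is derived from `Runtime.getRuntime().maxMemory()`; the resolution helper `TuningConfigs.getMaxBytesInMemoryOrDefault` is only referenced, never defined, in this patch, so its body here is an assumption:

```java
// Sketch: how an unset maxBytesInMemory (serialized as 0) plausibly resolves to the
// documented default, and what the documented worst-case heap bound works out to.
public class MaxBytesInMemoryExample
{
  // Assumed body; the patch only references this helper by name.
  static long getMaxBytesInMemoryOrDefault(long maxBytesInMemory)
  {
    return maxBytesInMemory == 0 ? Runtime.getRuntime().maxMemory() / 6 : maxBytesInMemory;
  }

  public static void main(String[] args)
  {
    long maxBytesInMemory = getMaxBytesInMemoryOrDefault(0); // user left it unset
    int maxPendingPersists = 0;                              // the documented default
    // The in-flight index plus one persisting copy, plus any queued persists.
    long worstCaseHeap = maxBytesInMemory * (2 + maxPendingPersists);
    System.out.println("indexing may use up to " + worstCaseHeap + " bytes of heap");
  }
}
```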
diff --git a/docs/content/ingestion/batch-ingestion.md b/docs/content/ingestion/batch-ingestion.md index 743ead5b071e..f0f3040cb989 100644 --- a/docs/content/ingestion/batch-ingestion.md +++ b/docs/content/ingestion/batch-ingestion.md @@ -154,7 +154,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |workingPath|String|The working path to use for intermediate results (results between Hadoop jobs).|no (default == '/tmp/druid-indexing')| |version|String|The version of created segments. Ignored for HadoopIndexTask unless useExplicitVersion is set to true|no (default == datetime that indexing starts at)| |partitionsSpec|Object|A specification of how to partition each time bucket into segments. Absence of this property means no partitioning will occur. See 'Partitioning specification' below.|no (default == 'hashed')| -|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. Note that this is the number of post-aggregation rows which may not be equal to the number of input events due to roll-up. This is used to manage the required JVM heap size.|no (default == 75000)| +|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. Note that this is the number of post-aggregation rows which may not be equal to the number of input events due to roll-up. This is used to manage the required JVM heap size. Normally this does not need to be set, but if rows are small in terms of bytes, you may not want to hold a million of them in memory, and should set this value accordingly.|no (default == 1000000)| +|maxBytesInMemory|Long|The number of bytes to aggregate in heap memory before persisting. Normally this is computed internally and users do not need to set it. This is based on a rough estimate of memory usage, not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|no (default == one-sixth of max JVM memory)| |leaveIntermediate|Boolean|Leave behind intermediate files (for debugging) in the workingPath when a job completes, whether it passes or fails.|no (default == false)| |cleanupOnFailure|Boolean|Clean up intermediate files when a job fails (unless leaveIntermediate is on).|no (default == true)| |overwriteFiles|Boolean|Override existing files found during indexing.|no (default == false)| diff --git a/docs/content/ingestion/stream-pull.md b/docs/content/ingestion/stream-pull.md index e57975276871..1bd33caad821 100644 --- a/docs/content/ingestion/stream-pull.md +++ b/docs/content/ingestion/stream-pull.md @@ -92,7 +92,7 @@ The property `druid.realtime.specFile` has the path of a file (absolute or relat }, "tuningConfig": { "type" : "realtime", - "maxRowsInMemory": 75000, + "maxRowsInMemory": 1000000, "intermediatePersistPeriod": "PT10m", "windowPeriod": "PT10m", "basePersistDirectory": "\/tmp\/realtime\/basePersist", @@ -141,7 +141,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |Field|Type|Description|Required| |-----|----|-----------|--------| |type|String|This should always be 'realtime'.|no| -|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 75000)| +|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 1000000)| +|maxBytesInMemory|Long|The maximum number of bytes to keep in memory to aggregate before persisting. This is used to manage the required JVM heap size.|no (default == one-sixth of max JVM memory)| |windowPeriod|ISO 8601 Period String|The amount of lag time to allow events.
This is configured with a 10 minute window, meaning that any event more than 10 minutes ago will be thrown away and not included in the segment generated by the realtime server.|no (default == PT10m)| |intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|no (default == PT10m)| |basePersistDirectory|String|The directory to put things that need persistence. The plumber is responsible for the actual intermediate persists and this tells it where to store those persists.|no (default == java tmp dir)| @@ -287,7 +288,8 @@ The following table summarizes constraints between settings in the spec file for |segmentGranularity| Time granularity (minute, hour, day, week, month) for loading data at query time | equal to indexGranularity| more than queryGranularity| |queryGranularity| Time granularity (minute, hour, day, week, month) for rollup | less than segmentGranularity| minute, hour, day, week, month | |intermediatePersistPeriod| The max time (ISO8601 Period) between flushes of ingested rows from memory to disk | avoid excessive flushing | number of un-persisted rows in memory also constrained by maxRowsInMemory | -|maxRowsInMemory| The max number of ingested rows to hold in memory before a flush to disk | number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod | +|maxRowsInMemory| The max number of ingested rows to hold in memory before a flush to disk. Normally this does not need to be set, but if rows are small in terms of bytes, you may not want to hold a million of them in memory, and should set this value accordingly (see the sketch below)| number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows arrive in an intermediatePersistPeriod | +|maxBytesInMemory| The number of bytes to keep in memory before a flush to disk. Normally this is computed internally and users do not need to set it. This is based on a rough estimate of memory usage, not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)| number of un-persisted post-aggregation bytes in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows arrive in an intermediatePersistPeriod | The normal, expected use cases have the following overall constraints: `intermediatePersistPeriod ≤ windowPeriod < segmentGranularity` and `queryGranularity ≤ segmentGranularity`
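Putting the three flush-related rows together: a persist is triggered by whichever limit fires first — the row count, the byte estimate, or the persist period. A minimal sketch of that decision, with hypothetical names (this is not Druid's actual plumber code, only an illustration of the semantics the table describes):

```java
import org.joda.time.DateTime;
import org.joda.time.Period;

// Hypothetical illustration of the flush triggers summarized above: persist when
// the row count, the estimated byte usage, or the persist period is reached,
// whichever happens first.
class PersistTriggerSketch
{
  int maxRowsInMemory = 1_000_000;
  long maxBytesInMemory = Runtime.getRuntime().maxMemory() / 6; // the documented default
  Period intermediatePersistPeriod = new Period("PT10M");

  boolean shouldPersist(int rowsInMemory, long bytesInMemory, DateTime lastPersist)
  {
    return rowsInMemory >= maxRowsInMemory
           || bytesInMemory >= maxBytesInMemory
           || !lastPersist.plus(intermediatePersistPeriod).isAfterNow();
  }
}
```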
diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md index 4d424efcadb4..22e4cfddae22 100644 --- a/docs/content/ingestion/tasks.md +++ b/docs/content/ingestion/tasks.md @@ -77,7 +77,7 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed "tuningConfig" : { "type" : "index", "targetPartitionSize" : 5000000, - "maxRowsInMemory" : 75000 + "maxRowsInMemory" : 1000000 } } } @@ -137,7 +137,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |--------|-----------|-------|---------| |type|The task type, this should always be "index".|none|yes| |targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no| -|maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|75000|no| +|maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally this does not need to be set, but if rows are small in terms of bytes, you may not want to hold a million of them in memory, and should set this value accordingly.|1000000|no| +|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and users do not need to set it. This value represents the number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage, not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no| |maxTotalRows|Total number of rows in segments waiting for being published. Used in determining when intermediate publish should occur.|150000|no| |numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| diff --git a/examples/conf-quickstart/tranquility/kafka.json b/examples/conf-quickstart/tranquility/kafka.json index ffe537870d56..38858d257845 100644 --- a/examples/conf-quickstart/tranquility/kafka.json +++ b/examples/conf-quickstart/tranquility/kafka.json @@ -53,7 +53,6 @@ }, "tuningConfig" : { "type" : "realtime", - "maxRowsInMemory" : "100000", "intermediatePersistPeriod" : "PT10M", "windowPeriod" : "PT10M" } diff --git a/examples/conf-quickstart/tranquility/server.json b/examples/conf-quickstart/tranquility/server.json index cbc1d14e7724..a17f71665502 100644 --- a/examples/conf-quickstart/tranquility/server.json +++ b/examples/conf-quickstart/tranquility/server.json @@ -53,7 +53,6 @@ }, "tuningConfig" : { "type" : "realtime", - "maxRowsInMemory" : "100000", "intermediatePersistPeriod" : "PT10M", "windowPeriod" : "PT10M" } diff --git a/examples/conf/tranquility/kafka.json b/examples/conf/tranquility/kafka.json index b0adb3248931..fb7c9aeb4ca3 100644 --- a/examples/conf/tranquility/kafka.json +++ b/examples/conf/tranquility/kafka.json @@ -53,7 +53,6 @@ }, "tuningConfig" : { "type" : "realtime", - "maxRowsInMemory" : "100000", "intermediatePersistPeriod" : "PT10M", "windowPeriod" : "PT10M" } diff --git a/examples/conf/tranquility/server.json b/examples/conf/tranquility/server.json index dabeed18228b..ff810f72fbb4 100644 --- a/examples/conf/tranquility/server.json +++ b/examples/conf/tranquility/server.json @@ -53,7 +53,6 @@ }, "tuningConfig" : { "type" : "realtime", - "maxRowsInMemory" : "100000", "intermediatePersistPeriod" : "PT10M", "windowPeriod" : "PT10M" } diff --git a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java index 46b5e4d01e73..816cfd4f2ded 100644 --- a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java +++ b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java @@ -222,6 +222,7 @@ public void setUp() throws
Exception null, null, null, + null, false, false, false, diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index 4c0277646368..a31fb1a92b31 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -38,6 +38,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false; private final int maxRowsInMemory; + private final long maxBytesInMemory; private final int maxRowsPerSegment; private final Period intermediatePersistPeriod; private final File basePersistDirectory; @@ -58,6 +59,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig @JsonCreator public KafkaTuningConfig( @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod, @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, @@ -80,6 +82,9 @@ public KafkaTuningConfig( this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory; this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment; + // initializing this to 0, it will be lazily initialized to a value + // @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? 
defaults.getIntermediatePersistPeriod() : intermediatePersistPeriod; @@ -116,6 +121,7 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config) { return new KafkaTuningConfig( config.maxRowsInMemory, + config.maxBytesInMemory, config.maxRowsPerSegment, config.intermediatePersistPeriod, config.basePersistDirectory, @@ -140,6 +146,13 @@ public int getMaxRowsInMemory() return maxRowsInMemory; } + @Override + @JsonProperty + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + @JsonProperty public int getMaxRowsPerSegment() { @@ -240,6 +253,7 @@ public KafkaTuningConfig withBasePersistDirectory(File dir) { return new KafkaTuningConfig( maxRowsInMemory, + maxBytesInMemory, maxRowsPerSegment, intermediatePersistPeriod, dir, @@ -269,6 +283,7 @@ public boolean equals(Object o) KafkaTuningConfig that = (KafkaTuningConfig) o; return maxRowsInMemory == that.maxRowsInMemory && maxRowsPerSegment == that.maxRowsPerSegment && + maxBytesInMemory == that.maxBytesInMemory && maxPendingPersists == that.maxPendingPersists && reportParseExceptions == that.reportParseExceptions && handoffConditionTimeout == that.handoffConditionTimeout && @@ -289,6 +304,7 @@ public int hashCode() return Objects.hash( maxRowsInMemory, maxRowsPerSegment, + maxBytesInMemory, intermediatePersistPeriod, basePersistDirectory, maxPendingPersists, @@ -310,6 +326,7 @@ public String toString() return "KafkaTuningConfig{" + "maxRowsInMemory=" + maxRowsInMemory + ", maxRowsPerSegment=" + maxRowsPerSegment + + ", maxBytesInMemory=" + maxBytesInMemory + ", intermediatePersistPeriod=" + intermediatePersistPeriod + ", basePersistDirectory=" + basePersistDirectory + ", maxPendingPersists=" + maxPendingPersists + diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index c796b3eb9b6c..53678810cdc3 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -93,6 +93,7 @@ public KafkaSupervisorSpec( null, null, null, + null, null ); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 4467a65d4e13..4fb2b2409b02 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.indexing.kafka.KafkaTuningConfig; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.writeout.SegmentWriteOutMediumFactory; import io.druid.segment.IndexSpec; import org.joda.time.Duration; @@ -40,6 +41,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig public KafkaSupervisorTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, 
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @@ -65,6 +67,7 @@ public KafkaSupervisorTuningConfig( { super( maxRowsInMemory, + maxBytesInMemory, maxRowsPerSegment, intermediatePersistPeriod, basePersistDirectory, @@ -131,6 +134,7 @@ public String toString() return "KafkaSupervisorTuningConfig{" + "maxRowsInMemory=" + getMaxRowsInMemory() + ", maxRowsPerSegment=" + getMaxRowsPerSegment() + + ", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) + ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + ", basePersistDirectory=" + getBasePersistDirectory() + ", maxPendingPersists=" + getMaxPendingPersists() + diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 511fb4da20fe..e3b15b0c1950 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1965,6 +1965,7 @@ private KafkaIndexTask createTask( { final KafkaTuningConfig tuningConfig = new KafkaTuningConfig( 1000, + null, maxRowsPerSegment, new Period("P1Y"), null, @@ -2007,6 +2008,7 @@ private KafkaIndexTask createTask( { final KafkaTuningConfig tuningConfig = new KafkaTuningConfig( 1000, + null, maxRowsPerSegment, new Period("P1Y"), null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java index 78916e7d27b7..04a07b81ed70 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java @@ -56,7 +56,7 @@ public void testSerdeWithDefaults() throws Exception ); Assert.assertNotNull(config.getBasePersistDirectory()); - Assert.assertEquals(75000, config.getMaxRowsInMemory()); + Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); @@ -103,6 +103,7 @@ public void testCopyOf() { KafkaTuningConfig original = new KafkaTuningConfig( 1, + null, 2, new Period("PT3S"), new File("/tmp/xxx"), diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index c44425c17e9b..cbea2344b22e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -185,6 +185,7 @@ public void setupTest() tuningConfig = new KafkaSupervisorTuningConfig( 1000, + null, 50000, new Period("P1Y"), new File("/test"), diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java 
b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java index 34477c143727..36fe290d2a2d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java @@ -58,7 +58,7 @@ public void testSerdeWithDefaults() throws Exception ); Assert.assertNotNull(config.getBasePersistDirectory()); - Assert.assertEquals(75000, config.getMaxRowsInMemory()); + Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index a997e40d2994..babc1bc515ce 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -43,7 +43,7 @@ public class HadoopTuningConfig implements TuningConfig private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec(); private static final Map> DEFAULT_SHARD_SPECS = ImmutableMap.of(); private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); - private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 75000; + private static final int DEFAULT_ROW_FLUSH_BOUNDARY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; private static final boolean DEFAULT_USE_COMBINER = false; private static final int DEFAULT_NUM_BACKGROUND_PERSIST_THREADS = 0; @@ -56,6 +56,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() DEFAULT_SHARD_SPECS, DEFAULT_INDEX_SPEC, DEFAULT_ROW_FLUSH_BOUNDARY, + 0L, false, true, false, @@ -80,6 +81,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() private final Map> shardSpecs; private final IndexSpec indexSpec; private final int rowFlushBoundary; + private final long maxBytesInMemory; private final boolean leaveIntermediate; private final Boolean cleanupOnFailure; private final boolean overwriteFiles; @@ -102,6 +104,7 @@ public HadoopTuningConfig( final @JsonProperty("shardSpecs") Map> shardSpecs, final @JsonProperty("indexSpec") IndexSpec indexSpec, final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + final @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure, final @JsonProperty("overwriteFiles") boolean overwriteFiles, @@ -129,6 +132,9 @@ public HadoopTuningConfig( this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null ? DEFAULT_ROW_FLUSH_BOUNDARY : maxRowsInMemoryCOMPAT : maxRowsInMemory; + // initializing this to 0, it will be lazily initialized to a value + // @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.leaveIntermediate = leaveIntermediate; this.cleanupOnFailure = cleanupOnFailure == null ? 
true : cleanupOnFailure; this.overwriteFiles = overwriteFiles; @@ -190,6 +196,12 @@ public int getRowFlushBoundary() return rowFlushBoundary; } + @JsonProperty + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + @JsonProperty public boolean isLeaveIntermediate() { @@ -288,6 +300,7 @@ public HadoopTuningConfig withWorkingPath(String path) shardSpecs, indexSpec, rowFlushBoundary, + maxBytesInMemory, leaveIntermediate, cleanupOnFailure, overwriteFiles, @@ -315,6 +328,7 @@ public HadoopTuningConfig withVersion(String ver) shardSpecs, indexSpec, rowFlushBoundary, + maxBytesInMemory, leaveIntermediate, cleanupOnFailure, overwriteFiles, @@ -342,6 +356,7 @@ public HadoopTuningConfig withShardSpecs(Map> specs specs, indexSpec, rowFlushBoundary, + maxBytesInMemory, leaveIntermediate, cleanupOnFailure, overwriteFiles, diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index a848714c8eb9..077a642d44d0 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -51,6 +51,7 @@ import io.druid.segment.column.ColumnCapabilitiesImpl; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.indexing.TuningConfigs; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NumberedShardSpec; import io.druid.timeline.partition.ShardSpec; @@ -288,6 +289,7 @@ private static IncrementalIndex makeIncrementalIndex( .setIndexSchema(indexSchema) .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows()) .setMaxRowCount(tuningConfig.getRowFlushBoundary()) + .setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory())) .buildOnheap(); if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) { diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 3970e23fc1b7..88a797cc4805 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -480,6 +480,7 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig( null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java index 8d656f20d4b8..7f1e76f547e9 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -198,6 +198,7 @@ public DetermineHashedPartitionsJobTest( null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index 908425198783..f96c5957db08 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -259,6 +259,7 @@ public DeterminePartitionsJobTest( null, null, null, + null, false, false, false, diff --git 
a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java index 2ed052eafec9..6678b13d075c 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -84,6 +84,7 @@ public void testHashedBucketSelection() ImmutableMap.of(DateTimes.of("2010-01-01T01:00:00").getMillis(), specs), null, null, + null, false, false, false, @@ -160,6 +161,7 @@ public void testNoneShardSpecBucketSelection() ), null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java index 9e4a26a22b43..b389d8ecc91c 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java @@ -46,6 +46,7 @@ public void testSerde() throws Exception null, null, 100, + null, true, true, true, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 1b422b6c223e..af1d7d944b74 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -91,7 +91,7 @@ public class IndexGeneratorJobTest @Parameterized.Parameters(name = "useCombiner={0}, partitionType={1}, interval={2}, shardInfoForEachSegment={3}, " + "data={4}, inputFormatName={5}, inputRowParser={6}, maxRowsInMemory={7}, " + - "aggs={8}, datasourceName={9}, forceExtendableShardSpecs={10}") + "maxBytesInMemory={8}, aggs={9}, datasourceName={10}, forceExtendableShardSpecs={11}") public static Collection constructFeed() { final List baseConstructors = Arrays.asList( @@ -151,6 +151,7 @@ public static Collection constructFeed() null ), null, + null, aggs1, "website" }, @@ -198,6 +199,7 @@ public static Collection constructFeed() ) ), null, + null, aggs1, "website" }, @@ -246,6 +248,7 @@ public static Collection constructFeed() null ), null, + null, aggs1, "website" }, @@ -303,6 +306,7 @@ public static Collection constructFeed() ) ), null, + null, aggs1, "website" }, @@ -335,6 +339,7 @@ public static Collection constructFeed() null ), 1, // force 1 row max per index for easier testing + null, aggs2, "inherit_dims" }, @@ -367,6 +372,7 @@ public static Collection constructFeed() null ), 1, // force 1 row max per index for easier testing + null, aggs2, "inherit_dims2" } @@ -398,6 +404,7 @@ public static Collection constructFeed() private final String inputFormatName; private final InputRowParser inputRowParser; private final Integer maxRowsInMemory; + private final Long maxBytesInMemory; private final AggregatorFactory[] aggs; private final String datasourceName; private final boolean forceExtendableShardSpecs; @@ -416,6 +423,7 @@ public IndexGeneratorJobTest( String inputFormatName, InputRowParser inputRowParser, Integer maxRowsInMemory, + Long maxBytesInMemory, AggregatorFactory[] aggs, String datasourceName, boolean forceExtendableShardSpecs @@ -429,6 +437,7 @@ public IndexGeneratorJobTest( this.inputFormatName = inputFormatName; this.inputRowParser = inputRowParser; this.maxRowsInMemory = maxRowsInMemory; + this.maxBytesInMemory = maxBytesInMemory; this.aggs = aggs; this.datasourceName = datasourceName; 
this.forceExtendableShardSpecs = forceExtendableShardSpecs; @@ -511,6 +520,7 @@ public void setUp() throws Exception null, null, maxRowsInMemory, + maxBytesInMemory, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index c768e2c8e10d..a7613249d050 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -108,6 +108,7 @@ public void setup() throws Exception null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index b4caeed21f43..a24d2615dbcb 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -61,6 +61,7 @@ public class GranularityPathSpecTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index c8d763544340..e673c3e661e7 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -200,6 +200,7 @@ public InputStream openStream() throws IOException null, null, null, + null, false, false, false, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index 06c6069ae009..800c0300d43e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -37,7 +37,7 @@ @JsonTypeName("realtime_appenderator") public class RealtimeAppenderatorTuningConfig implements TuningConfig, AppenderatorConfig { - private static final int defaultMaxRowsInMemory = 75000; + private static final int defaultMaxRowsInMemory = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; private static final int defaultMaxRowsPerSegment = 5_000_000; private static final Period defaultIntermediatePersistPeriod = new Period("PT10M"); private static final int defaultMaxPendingPersists = 0; @@ -52,8 +52,11 @@ private static File createNewBasePersistDirectory() return Files.createTempDir(); } + + private final int maxRowsInMemory; private final int maxRowsPerSegment; + private final long maxBytesInMemory; private final Period intermediatePersistPeriod; private final File basePersistDirectory; private final int maxPendingPersists; @@ -73,6 +76,7 @@ private static File createNewBasePersistDirectory() public RealtimeAppenderatorTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @@ -89,6 +93,9 @@ public RealtimeAppenderatorTuningConfig( { this.maxRowsInMemory = 
maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; this.maxRowsPerSegment = maxRowsPerSegment == null ? defaultMaxRowsPerSegment : maxRowsPerSegment; + // initializing this to 0, it will be lazily initialized to a value + // @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? defaultIntermediatePersistPeriod : intermediatePersistPeriod; @@ -127,6 +134,12 @@ public int getMaxRowsInMemory() return maxRowsInMemory; } + @Override + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + @JsonProperty public int getMaxRowsPerSegment() { @@ -217,6 +230,7 @@ public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir) return new RealtimeAppenderatorTuningConfig( maxRowsInMemory, maxRowsPerSegment, + maxBytesInMemory, intermediatePersistPeriod, dir, maxPendingPersists, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 6b3cfa978663..91d8e6b133bf 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -43,6 +43,7 @@ import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireHydrant; @@ -105,6 +106,7 @@ public Plumber findPlumber( config.getShardSpec(), version, config.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()), config.isReportParseExceptions() ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 47bae5290e0a..554969ca993c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -76,9 +76,9 @@ import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.AppenderatorConfig; -import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.appenderator.SegmentIdentifier; @@ -1231,7 +1231,6 @@ public boolean isAppendToExisting() @JsonTypeName("index") public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig { - private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75_000; private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000; private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); private static final int DEFAULT_MAX_PENDING_PERSISTS = 0; @@ -1244,6 +1243,7 @@ public static class
IndexTuningConfig implements TuningConfig, AppenderatorConfi private final Integer targetPartitionSize; private final int maxRowsInMemory; + private final long maxBytesInMemory; private final Long maxTotalRows; private final Integer numShards; private final IndexSpec indexSpec; @@ -1276,6 +1276,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi public IndexTuningConfig( @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, @JsonProperty("rowFlushBoundary") @Nullable Integer rowFlushBoundary_forBackCompatibility, // DEPRECATED @JsonProperty("numShards") @Nullable Integer numShards, @@ -1297,6 +1298,7 @@ public IndexTuningConfig( this( targetPartitionSize, maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility, + maxBytesInMemory != null ? maxBytesInMemory : 0, maxTotalRows, numShards, indexSpec, @@ -1315,12 +1317,13 @@ public IndexTuningConfig( private IndexTuningConfig() { - this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private IndexTuningConfig( @Nullable Integer targetPartitionSize, @Nullable Integer maxRowsInMemory, + @Nullable Long maxBytesInMemory, @Nullable Long maxTotalRows, @Nullable Integer numShards, @Nullable IndexSpec indexSpec, @@ -1342,7 +1345,10 @@ private IndexTuningConfig( ); this.targetPartitionSize = initializeTargetPartitionSize(numShards, targetPartitionSize); - this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; + this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; + // initializing this to 0, it will be lazily initialized to a value + // @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.maxTotalRows = initializeMaxTotalRows(numShards, maxTotalRows); this.numShards = numShards == null || numShards.equals(-1) ? null : numShards; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; @@ -1396,6 +1402,7 @@ public IndexTuningConfig withBasePersistDirectory(File dir) return new IndexTuningConfig( targetPartitionSize, maxRowsInMemory, + maxBytesInMemory, maxTotalRows, numShards, indexSpec, @@ -1425,6 +1432,13 @@ public int getMaxRowsInMemory() return maxRowsInMemory; } + @JsonProperty + @Override + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + @JsonProperty public Long getMaxTotalRows() { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 5e09e978fb32..6da9807536e0 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -711,6 +711,7 @@ public void testNoReportParseExceptions() throws Exception ) ); + // Wait for the task to finish. 
final TaskStatus taskStatus = statusFuture.get(); Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); @@ -1245,6 +1246,7 @@ private AppenderatorDriverRealtimeIndexTask makeRealtimeTask( null, null, null, + null, reportParseExceptions, handoffTimeout, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java index cefe50155295..ebac68e8b88a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java @@ -247,6 +247,7 @@ private static IndexTuningConfig createTuningConfig() 1000000L, null, null, + null, new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 6090e9012d54..b6d72494c59a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -631,7 +631,7 @@ public void testWithSmallMaxTotalRows() throws Exception Granularities.MINUTE, null ), - createTuningConfig(2, 2, 2L, null, false, false, true), + createTuningConfig(2, 2, null, 2L, null, false, false, true), false ), null, @@ -675,7 +675,7 @@ public void testPerfectRollup() throws Exception true, null ), - createTuningConfig(3, 2, 2L, null, false, true, true), + createTuningConfig(3, 2, null, 2L, null, false, true, true), false ), null, @@ -718,7 +718,7 @@ public void testBestEffortRollup() throws Exception true, null ), - createTuningConfig(3, 2, 2L, null, false, false, true), + createTuningConfig(3, 2, null, 2L, null, false, false, true), false ), null, @@ -790,7 +790,7 @@ public void testIgnoreParseException() throws Exception 0 ), null, - createTuningConfig(2, null, null, null, false, false, false), // ignore parse exception, + createTuningConfig(2, null, null, null, null, false, false, false), // ignore parse exception, false ); @@ -842,7 +842,7 @@ public void testReportParseException() throws Exception 0 ), null, - createTuningConfig(2, null, null, null, false, false, true), // report parse exception + createTuningConfig(2, null, null, null, null, false, false, true), // report parse exception false ); @@ -894,6 +894,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception null, null, null, + null, indexSpec, null, true, @@ -1012,6 +1013,7 @@ public void testMultipleParseExceptionsFailure() throws Exception null, null, null, + null, indexSpec, null, true, @@ -1117,6 +1119,7 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc null, null, null, + null, indexSpec, null, true, @@ -1245,7 +1248,7 @@ public void testCsvWithHeaderOfEmptyColumns() throws Exception 0 ), null, - createTuningConfig(2, 1, null, null, false, true, true), // report parse exception + createTuningConfig(2, 1, null, null, null, false, true, true), // report parse exception false ); @@ -1314,7 +1317,7 @@ public void testCsvWithHeaderOfEmptyTimestamp() throws Exception 0 ), null, - createTuningConfig(2, null, null, null, false, false, true), // report parse exception + createTuningConfig(2, null, null, null, null, false, false, true), // report parse exception false ); @@ -1553,6 +1556,7 @@ private static IndexTuningConfig 
createTuningConfig( targetPartitionSize, 1, null, + null, numShards, forceExtendableShardSpecs, forceGuaranteedRollup, @@ -1563,6 +1567,7 @@ private static IndexTuningConfig createTuningConfig( private static IndexTuningConfig createTuningConfig( Integer targetPartitionSize, Integer maxRowsInMemory, + Long maxBytesInMemory, Long maxTotalRows, Integer numShards, boolean forceExtendableShardSpecs, @@ -1573,6 +1578,7 @@ private static IndexTuningConfig createTuningConfig( return new IndexTask.IndexTuningConfig( targetPartitionSize, maxRowsInMemory, + maxBytesInMemory, maxTotalRows, null, numShards, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index e7e44e4d8419..9d02ed1c63ee 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -904,6 +904,7 @@ private RealtimeIndexTask makeRealtimeTask( ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( 1000, + null, new Period("P1Y"), new Period("PT10M"), null, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 589dcf3af913..545bb9ec08e7 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -108,7 +108,7 @@ public void testIndexTaskTuningConfigDefaults() throws Exception Assert.assertEquals(new IndexSpec(), tuningConfig.getIndexSpec()); Assert.assertEquals(new Period(Integer.MAX_VALUE), tuningConfig.getIntermediatePersistPeriod()); Assert.assertEquals(0, tuningConfig.getMaxPendingPersists()); - Assert.assertEquals(75000, tuningConfig.getMaxRowsInMemory()); + Assert.assertEquals(1000000, tuningConfig.getMaxRowsInMemory()); Assert.assertEquals(null, tuningConfig.getNumShards()); Assert.assertEquals(5000000, (int) tuningConfig.getTargetPartitionSize()); } @@ -195,6 +195,7 @@ public void testIndexTaskSerde() throws Exception 10000, 10, null, + null, 9999, null, indexSpec, @@ -280,6 +281,7 @@ public void testIndexTaskwithResourceSerde() throws Exception null, null, null, + null, indexSpec, 3, true, @@ -537,6 +539,7 @@ public Plumber findPlumber( new RealtimeTuningConfig( 1, + null, new Period("PT10M"), null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index bd23760153cb..091dff01dbaa 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -674,6 +674,7 @@ public void testIndexTask() throws Exception null, null, null, + null, indexSpec, 3, true, @@ -752,6 +753,7 @@ public void testIndexTaskFailure() throws Exception null, null, null, + null, indexSpec, 3, true, @@ -1137,6 +1139,7 @@ public void testResumeTasks() throws Exception null, null, null, + null, indexSpec, null, false, @@ -1260,6 +1263,7 @@ private RealtimeIndexTask newRealtimeIndexTask() ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( 1000, + null, new Period("P1Y"), null, //default window period of 10 minutes null, // base persist dir ignored by Realtime 
Index task diff --git a/processing/src/main/java/io/druid/segment/DimensionIndexer.java b/processing/src/main/java/io/druid/segment/DimensionIndexer.java index 444703abf659..4b762e5ba8f9 100644 --- a/processing/src/main/java/io/druid/segment/DimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DimensionIndexer.java @@ -127,6 +127,14 @@ public interface DimensionIndexer */ EncodedKeyComponentType processRowValsToUnsortedEncodedKeyComponent(Object dimValues, boolean reportParseExceptions); + /** + * Gives the estimated size in bytes for the given key + * + * @param key dimension value array from a TimeAndDims key + * + * @return the estimated size in bytes of the key + */ + long estimateEncodedKeyComponentSize(EncodedKeyComponentType key); /** * Given an encoded value that was ordered by associated actual value, return the equivalent diff --git a/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java b/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java index 7449e75df7a1..423f3a6424c0 100644 --- a/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java @@ -49,6 +49,12 @@ public Double processRowValsToUnsortedEncodedKeyComponent(Object dimValues, bool return ret == null ? DimensionHandlerUtils.ZERO_DOUBLE : ret; } + @Override + public long estimateEncodedKeyComponentSize(Double key) + { + return Double.BYTES; + } + @Override public Double getUnsortedEncodedValueFromSorted(Double sortedIntermediateValue) { diff --git a/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java b/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java index cdcc43188972..adca2802085f 100644 --- a/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java @@ -50,6 +50,12 @@ public Float processRowValsToUnsortedEncodedKeyComponent(Object dimValues, boole return ret == null ? DimensionHandlerUtils.ZERO_FLOAT : ret; } + @Override + public long estimateEncodedKeyComponentSize(Float key) + { + return Float.BYTES; + } + @Override public Float getUnsortedEncodedValueFromSorted(Float sortedIntermediateValue) { diff --git a/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java b/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java index 387fdf4d357f..4b7df0a20b47 100644 --- a/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java @@ -50,6 +50,12 @@ public Long processRowValsToUnsortedEncodedKeyComponent(Object dimValues, boolea return ret == null ? 
DimensionHandlerUtils.ZERO_LONG : ret; } + @Override + public long estimateEncodedKeyComponentSize(Long key) + { + return Long.BYTES; + } + @Override public Long getUnsortedEncodedValueFromSorted(Long sortedIntermediateValue) { diff --git a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java index ee8920102129..df28aa43f2e8 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java @@ -270,6 +270,20 @@ public int[] processRowValsToUnsortedEncodedKeyComponent(Object dimValues, boole return encodedDimensionValues; } + @Override + public long estimateEncodedKeyComponentSize(int[] key) + { + // String lengths are counted once per reference, per the dimension handler interface, even though each + // string is stored only once. This may overestimate the size a bit, but errs on the side of leaving + // more buffer, to be safe. + long estimatedSize = key.length * Integer.BYTES; + estimatedSize += Arrays.stream(key) + .filter(element -> dimLookup.getValue(element) != null) + .mapToLong(element -> dimLookup.getValue(element).length() * Character.BYTES) + .sum(); + return estimatedSize; + } + public Integer getSortedEncodedValueFromUnsorted(Integer unsortedIntermediateValue) { return sortedLookup().getSortedIdFromUnsortedId(unsortedIntermediateValue);
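As a concrete check of the estimate in estimateEncodedKeyComponentSize above: a key of three dictionary ids whose values are "a", "bb", and null costs 3 * Integer.BYTES = 12 bytes for the id array, plus Character.BYTES per character of each non-null value, for 18 bytes total. A self-contained sketch of the same arithmetic; the plain HashMap standing in for the indexer's dimLookup dictionary is an assumption for illustration:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Standalone illustration of the estimate above; a HashMap stands in for the
// indexer's id -> value dictionary.
class StringKeySizeExample
{
  static long estimate(int[] key, Map<Integer, String> dict)
  {
    long estimatedSize = key.length * Integer.BYTES;            // the int[] of ids
    estimatedSize += Arrays.stream(key)
                           .filter(id -> dict.get(id) != null)
                           .mapToLong(id -> dict.get(id).length() * (long) Character.BYTES)
                           .sum();                              // 2 bytes per char
    return estimatedSize;
  }

  public static void main(String[] args)
  {
    Map<Integer, String> dict = new HashMap<>();
    dict.put(0, "a");
    dict.put(1, "bb");
    dict.put(2, null); // e.g. an encoded null value
    // 3 ids * 4 bytes + ("a" = 2 bytes) + ("bb" = 4 bytes) = 18 bytes
    System.out.println(estimate(new int[]{0, 1, 2}, dict));
  }
}
```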
diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 315c9c58ad89..f496d53a7dec 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -93,6 +93,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** */ @@ -237,6 +238,7 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final List dimensionDescsList; private final Map columnCapabilities; private final AtomicInteger numEntries = new AtomicInteger(); + private final AtomicLong bytesInMemory = new AtomicLong(); // This is modified on add() in a critical section. private final ThreadLocal in = new ThreadLocal<>(); @@ -333,6 +335,7 @@ public static class Builder private boolean concurrentEventAdd; private boolean sortFacts; private int maxRowCount; + private long maxBytesInMemory; public Builder() { @@ -342,6 +345,7 @@ public Builder() concurrentEventAdd = false; sortFacts = true; maxRowCount = 0; + maxBytesInMemory = 0; } public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema) @@ -398,6 +402,13 @@ public Builder setMaxRowCount(final int maxRowCount) return this; } + //maxBytesInMemory only applies to OnHeapIncrementalIndex + public Builder setMaxBytesInMemory(final long maxBytesInMemory) + { + this.maxBytesInMemory = maxBytesInMemory; + return this; + } + public IncrementalIndex buildOnheap() { if (maxRowCount <= 0) { @@ -410,7 +421,8 @@ public IncrementalIndex buildOnheap() reportParseExceptions, concurrentEventAdd, sortFacts, - maxRowCount + maxRowCount, + maxBytesInMemory ); } @@ -457,6 +469,7 @@ protected abstract AddToFactsResult addToFacts( boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, + AtomicLong sizeInBytes, IncrementalIndexRow key, ThreadLocal rowContainer, Supplier rowSupplier, @@ -504,11 +517,17 @@ List getParseExceptionMessages() static class AddToFactsResult { private int rowCount; + private final long bytesInMemory; private List parseExceptionMessages; - AddToFactsResult(int rowCount, List parseExceptionMessages) + public AddToFactsResult( + int rowCount, + long bytesInMemory, + List parseExceptionMessages + ) { this.rowCount = rowCount; + this.bytesInMemory = bytesInMemory; this.parseExceptionMessages = parseExceptionMessages; } @@ -517,7 +536,12 @@ int getRowCount() return rowCount; } - List getParseExceptionMessages() + public long getBytesInMemory() + { + return bytesInMemory; + } + + public List getParseExceptionMessages() { return parseExceptionMessages; } @@ -571,6 +595,7 @@ public IncrementalIndexAddResult add(InputRow row, boolean skipMaxRowsInMemoryCh reportParseExceptions, row, numEntries, + bytesInMemory, incrementalIndexRowResult.getIncrementalIndexRow(), in, rowSupplier, @@ -582,7 +607,7 @@ public IncrementalIndexAddResult add(InputRow row, boolean skipMaxRowsInMemoryCh incrementalIndexRowResult.getParseExceptionMessages(), addToFactsResult.getParseExceptionMessages() ); - return new IncrementalIndexAddResult(addToFactsResult.getRowCount(), parseException); + return new IncrementalIndexAddResult(addToFactsResult.getRowCount(), addToFactsResult.getBytesInMemory(), parseException); } @VisibleForTesting @@ -597,6 +622,7 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) Object[] dims; List overflow = null; + long dimsKeySize = 0; List parseExceptionMessages = new ArrayList<>(); synchronized (dimensionDescs) { dims = new Object[dimensionDescs.size()]; @@ -635,7 +661,7 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) catch (ParseException pe) { parseExceptionMessages.add(pe.getMessage()); } - + dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey); // Set column capabilities as data is coming in if (!capabilities.hasMultipleValues() && dimsKey != null && @@ -679,10 +705,11 @@ IncrementalIndexRowResult toIncrementalIndexRow(InputRow row) if (row.getTimestamp() != null) { truncated = gran.bucketStart(row.getTimestamp()).getMillis(); } - IncrementalIndexRow incrementalIndexRow = new IncrementalIndexRow( + IncrementalIndexRow incrementalIndexRow = IncrementalIndexRow.createTimeAndDimswithDimsKeySize( Math.max(truncated, minTimestamp),
dims, - dimensionDescsList + dimensionDescsList, + dimsKeySize ); return new IncrementalIndexRowResult(incrementalIndexRow, parseExceptionMessages); } @@ -740,6 +767,11 @@ public int size() return numEntries.get(); } + public long getBytesInMemory() + { + return bytesInMemory.get(); + } + private long getMinTimeMillis() { return getFacts().getMinTimeMillis(); diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAddResult.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAddResult.java index 06c537a0aa36..e76d3c15a1ef 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAddResult.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAddResult.java @@ -26,16 +26,19 @@ public class IncrementalIndexAddResult { private final int rowCount; + private final long bytesInMemory; @Nullable private final ParseException parseException; public IncrementalIndexAddResult( int rowCount, + long bytesInMemory, @Nullable ParseException parseException ) { this.rowCount = rowCount; + this.bytesInMemory = bytesInMemory; this.parseException = parseException; } @@ -44,6 +47,11 @@ public int getRowCount() return rowCount; } + public long getBytesInMemory() + { + return bytesInMemory; + } + @Nullable public ParseException getParseException() { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRow.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRow.java index 3c0a5fdf8818..dd671267c4a6 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRow.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRow.java @@ -45,6 +45,7 @@ public final class IncrementalIndexRow * {@link IncrementalIndex.RollupFactsHolder} needs concurrent collections, that are not present in fastutil. */ private int rowIndex; + private long dimsKeySize; IncrementalIndexRow( long timestamp, @@ -68,6 +69,29 @@ public final class IncrementalIndexRow this.rowIndex = rowIndex; } + private IncrementalIndexRow( + long timestamp, + Object[] dims, + List dimensionDescsList, + long dimsKeySize + ) + { + this.timestamp = timestamp; + this.dims = dims; + this.dimensionDescsList = dimensionDescsList; + this.dimsKeySize = dimsKeySize; + } + + static IncrementalIndexRow createTimeAndDimswithDimsKeySize( + long timestamp, + Object[] dims, + List dimensionDescsList, + long dimsKeySize + ) + { + return new IncrementalIndexRow(timestamp, dims, dimensionDescsList, dimsKeySize); + } + public long getTimestamp() { return timestamp; @@ -88,6 +112,25 @@ void setRowIndex(int rowIndex) this.rowIndex = rowIndex; } + /** + * bytesInMemory estimates the size of IncrementalIndexRow key, it takes into account the timestamp(long), + * dims(Object Array) and dimensionDescsList(List). Each of these are calculated as follows: + *
+   *
+   * <ul>
+   * <li> timestamp : Long.BYTES
+   * <li> dims array : Integer.BYTES * array length + Long.BYTES (dims object) + dimsKeySize (passed via constructor)
+   * <li> dimensionDescList : Long.BYTES (shared pointer)
+   * <li> dimsKeySize : this value is passed in based on the key type (int, long, double, String etc.)
+   * </ul>
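+   *
+   * Worked example (the figures match IncrementalIndexRowSizeTest): a row with two single-valued String
+   * dims "A" and "B" has dimsKeySize = 2 * (Integer.BYTES + 1 * Character.BYTES) = 12, so the estimate is
+   * 8 (timestamp) + 4 * 2 (dims array) + 8 (dims object) + 8 (dimensionDescsList) + 12 = 44 bytes. This
+   * estimate backs the maxBytesInMemory limit, which is wired up through the index builder, e.g.:
+   * <pre>{@code
+   * IncrementalIndex index = new IncrementalIndex.Builder()
+   *     .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
+   *     .setMaxRowCount(10000)
+   *     .setMaxBytesInMemory(1000)
+   *     .buildOnheap();
+   * }</pre>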
+ * + * @return long estimated bytesInMemory + */ + public long estimateBytesInMemory() + { + long sizeInBytes = Long.BYTES + Integer.BYTES * dims.length + Long.BYTES + Long.BYTES; + sizeInBytes += dimsKeySize; + return sizeInBytes; + } + @Override public String toString() { diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 216a4cd706b8..ae2c25f5f611 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** */ @@ -144,6 +145,7 @@ protected AddToFactsResult addToFacts( boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, + AtomicLong sizeInBytes, // ignored, added to make abstract class method impl happy IncrementalIndexRow key, ThreadLocal rowContainer, Supplier rowSupplier, @@ -238,7 +240,7 @@ protected AddToFactsResult addToFacts( } } rowContainer.set(null); - return new AddToFactsResult(numEntries.get(), new ArrayList<>()); + return new AddToFactsResult(numEntries.get(), 0, new ArrayList<>()); } @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 0445d77b529d..cbe9ba48bc9b 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -38,22 +38,29 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** */ public class OnheapIncrementalIndex extends IncrementalIndex { private static final Logger log = new Logger(OnheapIncrementalIndex.class); - + /** + * overhead per {@link ConcurrentHashMap.Node} or {@link java.util.concurrent.ConcurrentSkipListMap.Node} object + */ + private static final int ROUGH_OVERHEAD_PER_MAP_ENTRY = Long.BYTES * 5 + Integer.BYTES; private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); private final FactsHolder facts; private final AtomicInteger indexIncrement = new AtomicInteger(0); + private final long maxBytesPerRowForAggregators; protected final int maxRowCount; + protected final long maxBytesInMemory; private volatile Map selectors; private String outOfRowsReason = null; @@ -64,14 +71,44 @@ public class OnheapIncrementalIndex extends IncrementalIndex boolean reportParseExceptions, boolean concurrentEventAdd, boolean sortFacts, - int maxRowCount + int maxRowCount, + long maxBytesInMemory ) { super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd); this.maxRowCount = maxRowCount; - + this.maxBytesInMemory = maxBytesInMemory == 0 ? -1 : maxBytesInMemory; this.facts = incrementalIndexSchema.isRollup() ? 
new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) : new PlainFactsHolder(sortFacts);
+    maxBytesPerRowForAggregators = getMaxBytesPerRowForAggregators(incrementalIndexSchema);
+  }
+
+  /**
+   * Gives the estimated max size per aggregator. It is assumed that every aggregator will have enough
+   * overhead for its own object header and for a pointer to a selector. We add an overhead-factor of 16
+   * additional bytes for each object. These 16 bytes (128 bits) are the object metadata of a 64-bit JVM
+   * process and consist of:
+   *
+   * <ul>
+   * <li> Class pointer which describes the object type: 64 bits
+   * <li> Flags which describe state of the object including hashcode: 64 bits
+   * </ul>
+   * total size estimation consists of:
+   * <ul>
+   * <li> metrics length : Integer.BYTES * len
+   * <li> maxAggregatorIntermediateSize : getMaxIntermediateSize per aggregator + overhead-factor (16 bytes)
+   * </ul>
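+   *
+   * Worked example: two aggregators (such as a count and a longSum) whose getMaxIntermediateSize() is
+   * Long.BYTES each are estimated at Integer.BYTES * 2 + 2 * (8 + 16) = 56 bytes, the aggregator figure
+   * asserted in AppenderatorTest.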
      + * + * @param incrementalIndexSchema + * + * @return long max aggregator size in bytes + */ + private static long getMaxBytesPerRowForAggregators(IncrementalIndexSchema incrementalIndexSchema) + { + long maxAggregatorIntermediateSize = Integer.BYTES * incrementalIndexSchema.getMetrics().length; + maxAggregatorIntermediateSize += Arrays.stream(incrementalIndexSchema.getMetrics()) + .mapToLong(aggregator -> aggregator.getMaxIntermediateSize() + Long.BYTES * 2) + .sum(); + return maxAggregatorIntermediateSize; } @Override @@ -109,6 +146,7 @@ protected AddToFactsResult addToFacts( boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, + AtomicLong sizeInBytes, IncrementalIndexRow key, ThreadLocal rowContainer, Supplier rowSupplier, @@ -132,14 +170,20 @@ protected AddToFactsResult addToFacts( concurrentSet(rowIndex, aggs); // Last ditch sanity checks - if (numEntries.get() >= maxRowCount + if ((numEntries.get() >= maxRowCount || (maxBytesInMemory > 0 && sizeInBytes.get() >= maxBytesInMemory)) && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX && !skipMaxRowsInMemoryCheck) { - throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); + throw new IndexSizeExceededException( + "Maximum number of rows [%d] or max size in bytes [%d] reached", + maxRowCount, + maxBytesInMemory + ); } final int prev = facts.putIfAbsent(key, rowIndex); if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) { numEntries.incrementAndGet(); + long estimatedRowSize = estimateRowSizeInBytes(key, maxBytesPerRowForAggregators); + sizeInBytes.addAndGet(estimatedRowSize); } else { // We lost a race aggs = concurrentGet(prev); @@ -150,7 +194,25 @@ protected AddToFactsResult addToFacts( } } - return new AddToFactsResult(numEntries.get(), parseExceptionMessages); + return new AddToFactsResult(numEntries.get(), sizeInBytes.get(), parseExceptionMessages); + } + + /** + * Gives an estimated size of row in bytes, it accounts for: + *
+   *
+   * <ul>
+   * <li> overhead per Map Entry
+   * <li> TimeAndDims key size
+   * <li> aggregator size
+   * </ul>
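+   *
+   * Worked example (the figures match AppenderatorTest): with ROUGH_OVERHEAD_PER_MAP_ENTRY = Long.BYTES * 5
+   * + Integer.BYTES = 44, a row with one 3-character String dim and the 56-byte aggregator estimate above
+   * comes to 44 (map entry) + 28 (IncrementalIndexRow) + 10 (dimsKeySize) + 56 (aggregators) = 138 bytes.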
      + * + * @param key TimeAndDims key + * @param maxBytesPerRowForAggregators max size per aggregator + * + * @return estimated size of row + */ + private long estimateRowSizeInBytes(IncrementalIndexRow key, long maxBytesPerRowForAggregators) + { + return ROUGH_OVERHEAD_PER_MAP_ENTRY + key.estimateBytesInMemory() + maxBytesPerRowForAggregators; } @Override @@ -238,10 +300,24 @@ protected void concurrentRemove(int offset) @Override public boolean canAppendRow() { - final boolean canAdd = size() < maxRowCount; - if (!canAdd) { - outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount); + final boolean countCheck = size() < maxRowCount; + // if maxBytesInMemory = -1, then ignore sizeCheck + final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory() < maxBytesInMemory; + final boolean canAdd = countCheck && sizeCheck; + if (!countCheck && !sizeCheck) { + outOfRowsReason = StringUtils.format( + "Maximum number of rows [%d] and maximum size in bytes [%d] reached", + maxRowCount, + maxBytesInMemory + ); + } else { + if (!countCheck) { + outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount); + } else if (!sizeCheck) { + outOfRowsReason = StringUtils.format("Maximum size in bytes [%d] reached", maxBytesInMemory); + } } + return canAdd; } diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexRowSizeTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexRowSizeTest.java new file mode 100644 index 000000000000..9d77bbb7693b --- /dev/null +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexRowSizeTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.segment.incremental; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.druid.data.input.MapBasedInputRow; +import io.druid.query.aggregation.CountAggregatorFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Map; + +/** + */ +public class IncrementalIndexRowSizeTest +{ + @Test + public void testIncrementalIndexRowSizeBasic() + { + IncrementalIndex index = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt")) + .setMaxRowCount(10000) + .setMaxBytesInMemory(1000) + .buildOnheap(); + long time = System.currentTimeMillis(); + IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(time, "billy", "A", "joe", "B")); + IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow(); + Assert.assertEquals(44, td1.estimateBytesInMemory()); + } + + @Test + public void testIncrementalIndexRowSizeArr() + { + IncrementalIndex index = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt")) + .setMaxRowCount(10000) + .setMaxBytesInMemory(1000) + .buildOnheap(); + long time = System.currentTimeMillis(); + IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow( + time + 1, + "billy", + "A", + "joe", + Arrays.asList("A", "B") + )); + IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow(); + Assert.assertEquals(50, td1.estimateBytesInMemory()); + } + + @Test + public void testIncrementalIndexRowSizeComplex() + { + IncrementalIndex index = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt")) + .setMaxRowCount(10000) + .setMaxBytesInMemory(1000) + .buildOnheap(); + long time = System.currentTimeMillis(); + IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow( + time + 1, + "billy", + "nelson", + "joe", + Arrays.asList("123", "abcdef") + )); + IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow(); + Assert.assertEquals(74, td1.estimateBytesInMemory()); + } + + private MapBasedInputRow toMapRow(long time, Object... 
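+  /* varargs of alternating dimension name / value pairs, e.g. ("billy", "A", "joe", "B") */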
dimAndVal) + { + Map data = Maps.newHashMap(); + for (int i = 0; i < dimAndVal.length; i += 2) { + data.put((String) dimAndVal[i], dimAndVal[i + 1]); + } + return new MapBasedInputRow(time, Lists.newArrayList(data.keySet()), data); + } +} diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index 94734aafad59..43264b959243 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -76,6 +76,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * Extending AbstractBenchmark means only runs if explicitly called @@ -118,7 +119,8 @@ public MapIncrementalIndex( boolean reportParseExceptions, boolean concurrentEventAdd, boolean sortFacts, - int maxRowCount + int maxRowCount, + long maxBytesInMemory ) { super( @@ -127,7 +129,8 @@ public MapIncrementalIndex( reportParseExceptions, concurrentEventAdd, sortFacts, - maxRowCount + maxRowCount, + maxBytesInMemory ); } @@ -135,20 +138,22 @@ public MapIncrementalIndex( long minTimestamp, Granularity gran, AggregatorFactory[] metrics, - int maxRowCount + int maxRowCount, + long maxBytesInMemory ) { super( new IncrementalIndexSchema.Builder() - .withMinTimestamp(minTimestamp) - .withQueryGranularity(gran) - .withMetrics(metrics) - .build(), - true, - true, - false, - true, - maxRowCount + .withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .build(), + true, + true, + false, + true, + maxRowCount, + maxBytesInMemory ); } @@ -172,6 +177,7 @@ protected AddToFactsResult addToFacts( boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, + AtomicLong sizeInBytes, IncrementalIndexRow key, ThreadLocal rowContainer, Supplier rowSupplier, @@ -202,12 +208,14 @@ protected AddToFactsResult addToFacts( // Last ditch sanity checks - if (numEntries.get() >= maxRowCount && getFacts().getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) { - throw new IndexSizeExceededException("Maximum number of rows reached"); + if ((numEntries.get() >= maxRowCount || sizeInBytes.get() >= maxBytesInMemory) + && getFacts().getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) { + throw new IndexSizeExceededException("Maximum number of rows or max bytes reached"); } final int prev = getFacts().putIfAbsent(key, rowIndex); if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) { numEntries.incrementAndGet(); + sizeInBytes.incrementAndGet(); } else { // We lost a race aggs = indexedMap.get(prev); @@ -235,7 +243,7 @@ protected AddToFactsResult addToFacts( rowContainer.set(null); - return new AddToFactsResult(numEntries.get(), new ArrayList<>()); + return new AddToFactsResult(numEntries.get(), sizeInBytes.get(), new ArrayList<>()); } @Override diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 179f3c7fc3fe..f3f3ae6a1d97 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -42,7 +42,7 @@ */ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig { - private static final 
int defaultMaxRowsInMemory = 75000; + private static final int defaultMaxRowsInMemory = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; private static final Period defaultIntermediatePersistPeriod = new Period("PT10M"); private static final Period defaultWindowPeriod = new Period("PT10M"); private static final VersioningPolicy defaultVersioningPolicy = new IntervalStartVersioningPolicy(); @@ -72,6 +72,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File { return new RealtimeTuningConfig( defaultMaxRowsInMemory, + 0L, defaultIntermediatePersistPeriod, defaultWindowPeriod, basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory, @@ -91,6 +92,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File } private final int maxRowsInMemory; + private final long maxBytesInMemory; private final Period intermediatePersistPeriod; private final Period windowPeriod; private final File basePersistDirectory; @@ -110,6 +112,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File @JsonCreator public RealtimeTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, @JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @@ -129,6 +132,9 @@ public RealtimeTuningConfig( ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; + // initializing this to 0, it will be lazily initialized to a value + // @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? 
defaultIntermediatePersistPeriod : intermediatePersistPeriod; @@ -163,6 +169,12 @@ public int getMaxRowsInMemory() return maxRowsInMemory; } + @Override + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + @Override @JsonProperty public Period getIntermediatePersistPeriod() @@ -268,6 +280,7 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( maxRowsInMemory, + maxBytesInMemory, intermediatePersistPeriod, windowPeriod, basePersistDirectory, @@ -290,6 +303,7 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir) { return new RealtimeTuningConfig( maxRowsInMemory, + maxBytesInMemory, intermediatePersistPeriod, windowPeriod, dir, diff --git a/server/src/main/java/io/druid/segment/indexing/TuningConfig.java b/server/src/main/java/io/druid/segment/indexing/TuningConfig.java index 7fd246d25732..e8cc02fc4503 100644 --- a/server/src/main/java/io/druid/segment/indexing/TuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/TuningConfig.java @@ -33,4 +33,9 @@ public interface TuningConfig boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false; int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE; int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0; + int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000; + // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only + // tracks active index and not the index being flushed to disk, to account for that + // we halved default to 1/6(max jvm memory) + long DEFAULT_MAX_BYTES_IN_MEMORY = Runtime.getRuntime().maxMemory() / 6; } diff --git a/server/src/main/java/io/druid/segment/indexing/TuningConfigs.java b/server/src/main/java/io/druid/segment/indexing/TuningConfigs.java new file mode 100644 index 000000000000..571005f4370c --- /dev/null +++ b/server/src/main/java/io/druid/segment/indexing/TuningConfigs.java @@ -0,0 +1,34 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.segment.indexing; + +public class TuningConfigs +{ + private TuningConfigs() + { + } + + public static long getMaxBytesInMemoryOrDefault(final long maxBytesInMemory) + { + // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting + // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on + // the actual task node's jvm memory. + return maxBytesInMemory == 0 ? 
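+           /* 0 is the "not set" sentinel written by the tuning config constructors; fall back to the
+              task JVM's default of one-sixth of max heap, per TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY */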
TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY : maxBytesInMemory; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java index 286470cae761..05758b1645a7 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -32,6 +32,8 @@ public interface AppenderatorConfig int getMaxRowsInMemory(); + long getMaxBytesInMemory(); + int getMaxPendingPersists(); Period getIntermediatePersistPeriod(); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 8f63392de8af..cd2ecd9ac691 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -66,6 +66,7 @@ import io.druid.segment.incremental.IncrementalIndexAddResult; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireHydrant; @@ -84,6 +85,7 @@ import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; @@ -97,6 +99,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -128,6 +131,7 @@ public class AppenderatorImpl implements Appenderator // This variable updated in add(), persist(), and drop() private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger(); private final AtomicInteger totalRows = new AtomicInteger(); + private final AtomicLong bytesCurrentlyInMemory = new AtomicLong(); // Synchronize persisting commitMetadata so that multiple persist threads (if present) // and abandon threads do not step over each other private final Lock commitLock = new ReentrantLock(); @@ -143,7 +147,7 @@ public class AppenderatorImpl implements Appenderator private volatile FileChannel basePersistDirLockChannel = null; private AtomicBoolean closed = new AtomicBoolean(false); - public AppenderatorImpl( + AppenderatorImpl( DataSchema schema, AppenderatorConfig tuningConfig, FireDepartmentMetrics metrics, @@ -219,11 +223,14 @@ public AppenderatorAddResult add( metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch()); final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory(); final int sinkRowsInMemoryAfterAdd; + final long bytesInMemoryBeforeAdd = sink.getBytesInMemory(); + final long bytesInMemoryAfterAdd; final IncrementalIndexAddResult addResult; try { addResult = sink.add(row, !allowIncrementalPersists); sinkRowsInMemoryAfterAdd = addResult.getRowCount(); + bytesInMemoryAfterAdd = addResult.getBytesInMemory(); } catch (IndexSizeExceededException e) { // Uh oh, we can't do anything about this! 
We can't persist (commit metadata would be out of sync) and we
@@ -240,19 +247,51 @@ public AppenderatorAddResult add(
     final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd;
     rowsCurrentlyInMemory.addAndGet(numAddedRows);
     totalRows.addAndGet(numAddedRows);
+    bytesCurrentlyInMemory.addAndGet(bytesInMemoryAfterAdd - bytesInMemoryBeforeAdd);

     boolean isPersistRequired = false;
-    if (!sink.canAppendRow()
-        || System.currentTimeMillis() > nextFlush
-        || rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
+    boolean persist = false;
+    List<String> persistReasons = new ArrayList<>();
+
+    if (!sink.canAppendRow()) {
+      persist = true;
+      persistReasons.add("No more rows can be appended to sink");
+    }
+    if (System.currentTimeMillis() > nextFlush) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "current time[%d] is greater than nextFlush[%d]",
+          System.currentTimeMillis(),
+          nextFlush
+      ));
+    }
+    if (rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "rowsCurrentlyInMemory[%d] is greater than or equal to maxRowsInMemory[%d]",
+          rowsCurrentlyInMemory.get(),
+          tuningConfig.getMaxRowsInMemory()
+      ));
+    }
+    if (tuningConfig.getMaxBytesInMemory() > 0
+        && bytesCurrentlyInMemory.get()
+           >= TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory())) {
+      persist = true;
+      persistReasons.add(StringUtils.format(
+          "bytesCurrentlyInMemory[%d] is greater than or equal to maxBytesInMemory[%d]",
+          bytesCurrentlyInMemory.get(),
+          TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory())
+      ));
+    }
+    if (persist) {
       if (allowIncrementalPersists) {
         // persistAll clears rowsCurrentlyInMemory, no need to update it.
+        log.info("Persisting rows in memory due to: [%s]", String.join(", ", persistReasons));
         persistAll(committerSupplier == null ? 
null : committerSupplier.get()); } else { isPersistRequired = true; } } - return new AppenderatorAddResult(identifier, sink.getNumRows(), isPersistRequired, addResult.getParseException()); } @@ -286,6 +325,24 @@ int getRowsInMemory() return rowsCurrentlyInMemory.get(); } + @VisibleForTesting + long getBytesCurrentlyInMemory() + { + return bytesCurrentlyInMemory.get(); + } + + @VisibleForTesting + long getBytesInMemory(SegmentIdentifier identifier) + { + final Sink sink = sinks.get(identifier); + + if (sink == null) { + throw new ISE("No such sink: %s", identifier); + } else { + return sink.getBytesInMemory(); + } + } + private Sink getOrCreateSink(final SegmentIdentifier identifier) { Sink retVal = sinks.get(identifier); @@ -297,6 +354,7 @@ private Sink getOrCreateSink(final SegmentIdentifier identifier) identifier.getShardSpec(), identifier.getVersion(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() ); @@ -397,6 +455,7 @@ public ListenableFuture persistAll(@Nullable final Committer committer) final Map currentHydrants = Maps.newHashMap(); final List> indexesToPersist = Lists.newArrayList(); int numPersistedRows = 0; + long bytesPersisted = 0L; for (SegmentIdentifier identifier : sinks.keySet()) { final Sink sink = sinks.get(identifier); if (sink == null) { @@ -405,6 +464,7 @@ public ListenableFuture persistAll(@Nullable final Committer committer) final List hydrants = Lists.newArrayList(sink); currentHydrants.put(identifier.getIdentifierAsString(), hydrants.size()); numPersistedRows += sink.getNumRowsInMemory(); + bytesPersisted += sink.getBytesInMemory(); final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size(); @@ -495,7 +555,7 @@ public Object doCall() // NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes. rowsCurrentlyInMemory.addAndGet(-numPersistedRows); - + bytesCurrentlyInMemory.addAndGet(-bytesPersisted); return future; } @@ -965,6 +1025,7 @@ public int compare(File o1, File o2) identifier.getShardSpec(), identifier.getVersion(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions(), hydrants ); @@ -1021,6 +1082,7 @@ private ListenableFuture abandonSegment( // Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks). rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory()); totalRows.addAndGet(-sink.getNumRows()); + bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory()); // Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread. 
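+    // Note on the accounting: add() applies the positive row/byte deltas, while persistAll() and this
+    // abandon path apply the matching negative ones, so rowsCurrentlyInMemory and bytesCurrentlyInMemory
+    // only ever count rows in active, not-yet-persisted indexes.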
return Futures.transform( diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index a99a5dfd256a..0521a502f509 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -65,6 +65,7 @@ import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireHydrant; @@ -255,6 +256,7 @@ private Sink getSink(long timestamp) config.getShardSpec(), versioningPolicy.getVersion(sinkInterval), config.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()), config.isReportParseExceptions() ); addSink(retVal); @@ -729,6 +731,7 @@ public int compare(File o1, File o2) config.getShardSpec(), versioningPolicy.getVersion(sinkInterval), config.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()), config.isReportParseExceptions(), hydrants ); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 92790d2f20ae..50d0cf2a1281 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -54,7 +54,7 @@ public class Sink implements Iterable { - private static final IncrementalIndexAddResult ADD_FAILED = new IncrementalIndexAddResult(-1, null); + private static final IncrementalIndexAddResult ADD_FAILED = new IncrementalIndexAddResult(-1, -1, null); private final Object hydrantLock = new Object(); private final Interval interval; @@ -62,6 +62,7 @@ public class Sink implements Iterable private final ShardSpec shardSpec; private final String version; private final int maxRowsInMemory; + private final long maxBytesInMemory; private final boolean reportParseExceptions; private final CopyOnWriteArrayList hydrants = new CopyOnWriteArrayList(); private final LinkedHashSet dimOrder = Sets.newLinkedHashSet(); @@ -75,6 +76,7 @@ public Sink( ShardSpec shardSpec, String version, int maxRowsInMemory, + long maxBytesInMemory, boolean reportParseExceptions ) { @@ -83,6 +85,7 @@ public Sink( this.interval = interval; this.version = version; this.maxRowsInMemory = maxRowsInMemory; + this.maxBytesInMemory = maxBytesInMemory; this.reportParseExceptions = reportParseExceptions; makeNewCurrIndex(interval.getStartMillis(), schema); @@ -94,6 +97,7 @@ public Sink( ShardSpec shardSpec, String version, int maxRowsInMemory, + long maxBytesInMemory, boolean reportParseExceptions, List hydrants ) @@ -103,6 +107,7 @@ public Sink( this.interval = interval; this.version = version; this.maxRowsInMemory = maxRowsInMemory; + this.maxBytesInMemory = maxBytesInMemory; this.reportParseExceptions = reportParseExceptions; int maxCount = -1; @@ -250,6 +255,18 @@ public int getNumRowsInMemory() } } + public long getBytesInMemory() + { + synchronized (hydrantLock) { + IncrementalIndex index = currHydrant.getIndex(); + if (index == null) { + return 0; + } + + return currHydrant.getIndex().getBytesInMemory(); + } + } + private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema 
schema) { final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() @@ -264,6 +281,7 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) .setIndexSchema(indexSchema) .setReportParseExceptions(reportParseExceptions) .setMaxRowCount(maxRowsInMemory) + .setMaxBytesInMemory(maxBytesInMemory) .buildOnheap(); final FireHydrant old; diff --git a/server/src/test/java/io/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/io/druid/segment/indexing/RealtimeTuningConfigTest.java index 5f1820b1b6da..f3aa521c5b0c 100644 --- a/server/src/test/java/io/druid/segment/indexing/RealtimeTuningConfigTest.java +++ b/server/src/test/java/io/druid/segment/indexing/RealtimeTuningConfigTest.java @@ -90,7 +90,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(NoneShardSpec.instance(), config.getShardSpec()); Assert.assertEquals(0, config.getMaxPendingPersists()); - Assert.assertEquals(75000, config.getMaxRowsInMemory()); + Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(0, config.getMergeThreadPriority()); Assert.assertEquals(0, config.getPersistThreadPriority()); Assert.assertEquals(new Period("PT10M"), config.getWindowPeriod()); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 0b5c420badc1..665ce3311faf 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -69,6 +69,7 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; @@ -199,6 +200,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws ParseExcept ); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( 1, + null, new Period("P1Y"), null, null, @@ -221,6 +223,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws ParseExcept tuningConfig.getShardSpec(), DateTimes.nowUtc().toString(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() )); @@ -241,6 +244,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws ParseExcept tuningConfig.getShardSpec(), DateTimes.nowUtc().toString(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() )); @@ -258,6 +262,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws ParseExcept tuningConfig_0 = new RealtimeTuningConfig( 1, + null, new Period("P1Y"), null, null, @@ -277,6 +282,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws ParseExcept tuningConfig_1 = new RealtimeTuningConfig( 1, + null, new Period("P1Y"), null, null, diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index d03bab533e2f..32dcb76b0c85 100644 --- 
a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -68,6 +68,7 @@ public AppenderatorPlumberTest() throws Exception null, null, null, + null, new IntervalStartVersioningPolicy(), new NoopRejectionPolicyFactory(), null, diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java index cfd9106f9261..a052583ed885 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java @@ -145,6 +145,118 @@ public SegmentIdentifier apply(DataSegment input) } } + @Test + public void testMaxBytesInMemory() throws Exception + { + try (final AppenderatorTester tester = new AppenderatorTester(100, 1024, true)) { + final Appenderator appenderator = tester.getAppenderator(); + final AtomicInteger eventCount = new AtomicInteger(0); + final Supplier committerSupplier = () -> { + final Object metadata = ImmutableMap.of(eventCount, eventCount.get()); + + return new Committer() + { + @Override + public Object getMetadata() + { + return metadata; + } + + @Override + public void run() + { + //Do nothing + } + }; + }; + + appenderator.startJob(); + appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier); + //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 10 (dimsKeySize) = 138 + Assert.assertEquals(138, ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))); + appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier); + Assert.assertEquals(138, ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))); + appenderator.close(); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + } + } + + @Test + public void testMaxBytesInMemoryInMultipleSinks() throws Exception + { + try (final AppenderatorTester tester = new AppenderatorTester(100, 1024, true)) { + final Appenderator appenderator = tester.getAppenderator(); + final AtomicInteger eventCount = new AtomicInteger(0); + final Supplier committerSupplier = () -> { + final Object metadata = ImmutableMap.of(eventCount, eventCount.get()); + + return new Committer() + { + @Override + public Object getMetadata() + { + return metadata; + } + + @Override + public void run() + { + //Do nothing + } + }; + }; + + appenderator.startJob(); + appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier); + //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 10 (dimsKeySize) = 138 + Assert.assertEquals(138, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()); + appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier); + Assert.assertEquals(276, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()); + appenderator.close(); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + } + } + + @Test + public void testIgnoreMaxBytesInMemory() throws Exception + { + try (final AppenderatorTester tester = new AppenderatorTester(100, -1, true)) { + final Appenderator appenderator = tester.getAppenderator(); + final AtomicInteger eventCount = new AtomicInteger(0); + final Supplier committerSupplier = () -> { + final Object metadata = 
ImmutableMap.of(eventCount, eventCount.get()); + + return new Committer() + { + @Override + public Object getMetadata() + { + return metadata; + } + + @Override + public void run() + { + //Do nothing + } + }; + }; + + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.startJob(); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier); + //we still calculate the size even when ignoring it to make persist decision + Assert.assertEquals(138, ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))); + Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier); + Assert.assertEquals(276, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()); + Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.close(); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + } + } + @Test public void testMaxRowsInMemory() throws Exception { @@ -288,7 +400,12 @@ public void run() appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 5), committerSupplier); appenderator.close(); - try (final AppenderatorTester tester2 = new AppenderatorTester(2, tuningConfig.getBasePersistDirectory(), true)) { + try (final AppenderatorTester tester2 = new AppenderatorTester( + 2, + -1, + tuningConfig.getBasePersistDirectory(), + true + )) { final Appenderator appenderator2 = tester2.getAppenderator(); Assert.assertEquals(ImmutableMap.of("eventCount", 4), appenderator2.startJob()); Assert.assertEquals(ImmutableList.of(IDENTIFIERS.get(0)), appenderator2.getSegments()); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 42462818fb82..e562e18f1911 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -89,7 +89,7 @@ public AppenderatorTester( final int maxRowsInMemory ) { - this(maxRowsInMemory, null, false); + this(maxRowsInMemory, -1, null, false); } public AppenderatorTester( @@ -97,11 +97,21 @@ public AppenderatorTester( final boolean enablePushFailure ) { - this(maxRowsInMemory, null, enablePushFailure); + this(maxRowsInMemory, -1, null, enablePushFailure); } public AppenderatorTester( final int maxRowsInMemory, + final long maxSizeInBytes, + final boolean enablePushFailure + ) + { + this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure); + } + + public AppenderatorTester( + final int maxRowsInMemory, + long maxSizeInBytes, final File basePersistDirectory, final boolean enablePushFailure ) @@ -131,9 +141,10 @@ public AppenderatorTester( null, objectMapper ); - + maxSizeInBytes = maxSizeInBytes == 0L ? 
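+        /* 0 selects the tester's default (one-third of total JVM memory, see getDefaultMaxBytesInMemory
+           below); the tests above pass -1 to effectively disable the byte limit */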
getDefaultMaxBytesInMemory() : maxSizeInBytes; tuningConfig = new RealtimeTuningConfig( maxRowsInMemory, + maxSizeInBytes, null, null, basePersistDirectory, @@ -267,6 +278,11 @@ public void unannounceSegments(Iterable segments) ); } + private long getDefaultMaxBytesInMemory() + { + return (Runtime.getRuntime().totalMemory()) / 3; + } + public DataSchema getSchema() { return schema; diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index 08157eaf7f61..b837bf5ee453 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -136,6 +136,7 @@ public int columnCacheSizeBytes() 75000, null, null, + null, temporaryFolder.newFolder(), null, null, diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 32900d8900fc..10e5bd1085a2 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -50,6 +50,7 @@ import io.druid.segment.TestHelper; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; @@ -199,6 +200,7 @@ public void setUp() throws Exception null, null, null, + null, new IntervalStartVersioningPolicy(), rejectionPolicy, null, @@ -269,6 +271,7 @@ private void testPersist(final Object commitMetadata) throws Exception tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() ); plumber.getSinks().put(0L, sink); @@ -313,6 +316,7 @@ public void testPersistFails() throws Exception tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() ); plumber.getSinks().put(0L, sink); @@ -367,6 +371,7 @@ private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Ex tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() ); plumber2.getSinks().put(0L, sink); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index dce3e7a89780..a8bf1be421e2 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -31,6 +31,7 @@ import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; +import 
io.druid.segment.indexing.TuningConfigs; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.FireHydrant; import org.joda.time.DateTime; @@ -61,6 +62,7 @@ public void testSwap() throws Exception final String version = DateTimes.nowUtc().toString(); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( 100, + null, new Period("P1Y"), null, null, @@ -83,6 +85,7 @@ public void testSwap() throws Exception tuningConfig.getShardSpec(), version, tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() ); diff --git a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java index 49518d6483dd..9417e9412904 100644 --- a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java @@ -169,6 +169,7 @@ public Plumber findPlumber( new RealtimeTuningConfig( 1, + null, new Period("PT10M"), null, null,