diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index dd385130c761..5388388498f2 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -181,6 +181,7 @@ public HadoopIndexTask createTask(Interval interval, String version, List baseSegments = Collections.singletonList( + new DataSegment( + "base", + Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), + "2015-01-03", + ImmutableMap.of(), + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("m1"), + new HashBasedNumberedShardSpec(0, 1, 0, 1, null, null, null), + 9, + 1024 + ) + ); + + HadoopIndexTask task = spec.createTask( + Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), + "2015-01-03", + baseSegments + ); + + Assert.assertNotNull(task); + } + @Test public void testSuspendedDoesntRun() { diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml index d192b6876d2f..7cc1ebf65bf0 100644 --- a/extensions-core/kafka-indexing-service/pom.xml +++ b/extensions-core/kafka-indexing-service/pom.xml @@ -203,6 +203,11 @@ assertj-core test + + nl.jqno.equalsverifier + equalsverifier + test + diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java index f23ee7b0fced..8dc9a2f06617 100644 --- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Period; @@ -33,6 +34,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon { @JsonCreator public KafkaIndexTaskTuningConfig( + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, @@ -55,6 +57,7 @@ public KafkaIndexTaskTuningConfig( ) { super( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, maxRowsPerSegment, @@ -81,6 +84,7 @@ public KafkaIndexTaskTuningConfig( public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) { return new KafkaIndexTaskTuningConfig( + getAppendableIndexSpec(), getMaxRowsInMemory(), getMaxBytesInMemory(), getMaxRowsPerSegment(), diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 2dca7f6edfc5..a9e0bfeeb92e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -23,7 +23,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; import org.apache.druid.segment.IndexSpec; -import org.apache.druid.segment.indexing.TuningConfigs; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Duration; import org.joda.time.Period; @@ -67,11 +67,13 @@ public static KafkaSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } public KafkaSupervisorTuningConfig( + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, @@ -100,6 +102,7 @@ public KafkaSupervisorTuningConfig( ) { super( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, maxRowsPerSegment, @@ -193,7 +196,7 @@ public String toString() "maxRowsInMemory=" + getMaxRowsInMemory() + ", maxRowsPerSegment=" + getMaxRowsPerSegment() + ", maxTotalRows=" + getMaxTotalRows() + - ", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) + + ", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() + ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + ", basePersistDirectory=" + getBasePersistDirectory() + ", maxPendingPersists=" + getMaxPendingPersists() + @@ -219,6 +222,7 @@ public String toString() public KafkaIndexTaskTuningConfig convertToTaskTuningConfig() { return new KafkaIndexTaskTuningConfig( + getAppendableIndexSpec(), getMaxRowsInMemory(), getMaxBytesInMemory(), getMaxRowsPerSegment(), diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index f4098fc95a22..9b5373522803 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2497,6 +2497,7 @@ private KafkaIndexTask createTask( ) throws JsonProcessingException { final KafkaIndexTaskTuningConfig tuningConfig = new KafkaIndexTaskTuningConfig( + null, 1000, null, maxRowsPerSegment, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 3a930dbeeadd..8888a69a0d89 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -21,11 +21,13 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.indexing.TuningConfig; import org.joda.time.Period; import org.junit.Assert; @@ -60,6 +62,7 @@ public void testSerdeWithDefaults() throws Exception ); 
Assert.assertNotNull(config.getBasePersistDirectory()); + Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec()); Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue()); Assert.assertNull(config.getMaxTotalRows()); @@ -85,7 +88,8 @@ public void testSerdeWithNonDefaults() throws Exception + " \"reportParseExceptions\": true,\n" + " \"handoffConditionTimeout\": 100,\n" + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" - + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n" + + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n" + "}"; KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue( @@ -99,6 +103,7 @@ public void testSerdeWithNonDefaults() throws Exception ); Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory()); + Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec()); Assert.assertEquals(100, config.getMaxRowsInMemory()); Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue()); Assert.assertNotEquals(null, config.getMaxTotalRows()); @@ -115,6 +120,7 @@ public void testSerdeWithNonDefaults() throws Exception public void testConvert() { KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig( + null, 1, null, 2, @@ -142,6 +148,7 @@ public void testConvert() ); KafkaIndexTaskTuningConfig copy = (KafkaIndexTaskTuningConfig) original.convertToTaskTuningConfig(); + Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec()); Assert.assertEquals(1, copy.getMaxRowsInMemory()); Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue()); Assert.assertNotEquals(null, copy.getMaxTotalRows()); @@ -158,6 +165,7 @@ public void testConvert() public void 
testSerdeWithModifiedTuningConfigAddedField() throws IOException { KafkaIndexTaskTuningConfig base = new KafkaIndexTaskTuningConfig( + null, 1, null, 2, @@ -183,6 +191,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException mapper.readValue(serialized, TestModifiedKafkaIndexTaskTuningConfig.class); Assert.assertEquals(null, deserialized.getExtra()); + Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec()); Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory()); Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory()); Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment()); @@ -206,6 +215,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException { TestModifiedKafkaIndexTaskTuningConfig base = new TestModifiedKafkaIndexTaskTuningConfig( + null, 1, null, 2, @@ -231,6 +241,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException KafkaIndexTaskTuningConfig deserialized = mapper.readValue(serialized, KafkaIndexTaskTuningConfig.class); + Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec()); Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory()); Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory()); Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment()); @@ -249,4 +260,12 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions()); Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(KafkaIndexTaskTuningConfig.class) + .usingGetClass() + 
.verify(); + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 905abd19e2ac..251d8376a690 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -270,6 +270,7 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException null, null, null, + null, null ), null @@ -3070,6 +3071,7 @@ public void testIsTaskCurrent() kafkaHost, dataSchema, new KafkaSupervisorTuningConfig( + null, 1000, null, 50000, @@ -3109,6 +3111,7 @@ public void testIsTaskCurrent() DataSchema modifiedDataSchema = getDataSchema("some other datasource"); KafkaSupervisorTuningConfig modifiedTuningConfig = new KafkaSupervisorTuningConfig( + null, 42, // This is different null, 50000, @@ -3404,6 +3407,7 @@ public KafkaIndexTaskClient build( }; final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig( + null, 1000, null, 50000, @@ -3514,6 +3518,7 @@ public KafkaIndexTaskClient build( }; final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig( + null, 1000, null, 50000, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java index 5859f9035b06..7151eb0681fb 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java +++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java @@ -25,6 +25,7 @@ import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.indexing.TuningConfig; import org.joda.time.Duration; import org.joda.time.Period; @@ -59,6 +60,7 @@ public void testSerdeWithDefaults() throws Exception ); Assert.assertNotNull(config.getBasePersistDirectory()); + Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec()); Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); @@ -94,7 +96,8 @@ public void testSerdeWithNonDefaults() throws Exception + " \"shutdownTimeout\": \"PT95S\",\n" + " \"offsetFetchPeriod\": \"PT20S\",\n" + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" - + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n" + + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n" + "}"; KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue( @@ -108,6 +111,7 @@ public void testSerdeWithNonDefaults() throws Exception ); Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory()); + Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec()); Assert.assertEquals(100, config.getMaxRowsInMemory()); Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java index 27e69e8e7e00..06550e159234 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Period; @@ -37,6 +38,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning @JsonCreator public TestModifiedKafkaIndexTaskTuningConfig( + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, @@ -60,6 +62,7 @@ public TestModifiedKafkaIndexTaskTuningConfig( ) { super( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, maxRowsPerSegment, diff --git a/extensions-core/kinesis-indexing-service/pom.xml b/extensions-core/kinesis-indexing-service/pom.xml index 6b08d50ea62d..0659bedc0f13 100644 --- a/extensions-core/kinesis-indexing-service/pom.xml +++ b/extensions-core/kinesis-indexing-service/pom.xml @@ -192,6 +192,11 @@ system-rules test + + nl.jqno.equalsverifier + equalsverifier + test + diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java index f033a6ddf5f8..428f54faefec 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Period; @@ -50,6 +51,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC @JsonCreator public KinesisIndexTaskTuningConfig( + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, @@ -78,6 +80,7 @@ public KinesisIndexTaskTuningConfig( ) { super( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, maxRowsPerSegment, @@ -154,6 +157,7 @@ public int getMaxRecordsPerPoll() public KinesisIndexTaskTuningConfig withBasePersistDirectory(File dir) { return new KinesisIndexTaskTuningConfig( + getAppendableIndexSpec(), getMaxRowsInMemory(), getMaxBytesInMemory(), getMaxRowsPerSegment(), diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 8c1a5fa16701..7cf49a32da8b 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -23,6 +23,7 @@ import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Duration; import org.joda.time.Period; @@ -75,11 +76,13 @@ public static KinesisSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } public KinesisSupervisorTuningConfig( + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, @@ -115,6 +118,7 @@ public KinesisSupervisorTuningConfig( ) { super( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, maxRowsPerSegment, @@ -248,6 +252,7 @@ public String toString() public KinesisIndexTaskTuningConfig convertToTaskTuningConfig() { return new KinesisIndexTaskTuningConfig( + getAppendableIndexSpec(), getMaxRowsInMemory(), getMaxBytesInMemory(), getMaxRowsPerSegment(), diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index 9ab058c3941e..f6b3582f96b1 100644 --- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -72,6 +72,7 @@ public class KinesisIndexTaskSerdeTest null, null, null, + null, null ); private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 4aff50b40ba4..8ff5b43ba9b3 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2739,6 +2739,7 @@ private KinesisIndexTask createTask( boolean resetOffsetAutomatically = false; int maxRowsInMemory = 1000; final KinesisIndexTaskTuningConfig tuningConfig = new KinesisIndexTaskTuningConfig( + null, maxRowsInMemory, null, maxRowsPerSegment, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 15e56c032dff..69a70a494f16 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -22,10 +22,12 @@ import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import 
nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig; import org.apache.druid.indexing.kinesis.test.TestModifiedKinesisIndexTaskTuningConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.indexing.TuningConfig; import org.hamcrest.CoreMatchers; import org.joda.time.Period; @@ -66,6 +68,7 @@ public void testSerdeWithDefaults() throws Exception ); Assert.assertNotNull(config.getBasePersistDirectory()); + Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec()); Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); @@ -102,7 +105,8 @@ public void testSerdeWithNonDefaults() throws Exception + " \"fetchSequenceNumberTimeout\": 6000,\n" + " \"resetOffsetAutomatically\": false,\n" + " \"skipSequenceNumberAvailabilityCheck\": true,\n" - + " \"fetchThreads\": 2\n" + + " \"fetchThreads\": 2,\n" + + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n" + "}"; KinesisIndexTaskTuningConfig config = (KinesisIndexTaskTuningConfig) mapper.readValue( @@ -116,6 +120,7 @@ public void testSerdeWithNonDefaults() throws Exception ); Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory()); + Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec()); Assert.assertEquals(100, config.getMaxRowsInMemory()); Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); @@ -136,6 +141,7 @@ public void testSerdeWithNonDefaults() throws Exception public void testSerdeWithModifiedTuningConfigAddedField() throws IOException { KinesisIndexTaskTuningConfig base 
= new KinesisIndexTaskTuningConfig( + null, 1, 3L, 2, @@ -168,6 +174,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException mapper.readValue(serialized, TestModifiedKinesisIndexTaskTuningConfig.class); Assert.assertEquals(null, deserialized.getExtra()); + Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec()); Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory()); Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory()); Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment()); @@ -195,6 +202,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException { KinesisIndexTaskTuningConfig base = new KinesisIndexTaskTuningConfig( + null, 1, 3L, 2, @@ -226,6 +234,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException KinesisIndexTaskTuningConfig deserialized = mapper.readValue(serialized, KinesisIndexTaskTuningConfig.class); + Assert.assertEquals(base.getAppendableIndexSpec(), deserialized.getAppendableIndexSpec()); Assert.assertEquals(base.getMaxRowsInMemory(), deserialized.getMaxRowsInMemory()); Assert.assertEquals(base.getMaxBytesInMemory(), deserialized.getMaxBytesInMemory()); Assert.assertEquals(base.getMaxRowsPerSegment(), deserialized.getMaxRowsPerSegment()); @@ -282,6 +291,7 @@ public void testResetOffsetAndSkipSequenceNotBothTrue() throws Exception public void testConvert() { KinesisSupervisorTuningConfig original = new KinesisSupervisorTuningConfig( + null, 1, (long) 3, 2, @@ -317,6 +327,7 @@ public void testConvert() ); KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig(); + Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec()); Assert.assertEquals(1, copy.getMaxRowsInMemory()); Assert.assertEquals(3, 
copy.getMaxBytesInMemory()); Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue()); @@ -338,4 +349,12 @@ public void testConvert() Assert.assertEquals(100, copy.getMaxRecordsPerPoll()); Assert.assertEquals(new Period().withDays(Integer.MAX_VALUE), copy.getIntermediateHandoffPeriod()); } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(KinesisIndexTaskTuningConfig.class) + .usingGetClass() + .verify(); + } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 135166250d99..3d57a6bbe1b2 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -167,6 +167,7 @@ public void setupTest() supervisorRecordSupplier = createMock(KinesisRecordSupplier.class); tuningConfig = new KinesisSupervisorTuningConfig( + null, 1000, null, 50000, @@ -3689,6 +3690,7 @@ public void testIsTaskCurrent() DataSchema modifiedDataSchema = getDataSchema("some other datasource"); KinesisSupervisorTuningConfig modifiedTuningConfig = new KinesisSupervisorTuningConfig( + null, 1000, null, 50000, @@ -4741,6 +4743,7 @@ public KinesisIndexTaskClient build( }; final KinesisSupervisorTuningConfig tuningConfig = new KinesisSupervisorTuningConfig( + null, 1000, null, 50000, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java index d50321043ce8..ff3edc46e429 100644 --- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java @@ -24,6 +24,7 @@ import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.segment.indexing.TuningConfig; import org.joda.time.Duration; import org.joda.time.Period; @@ -58,6 +59,7 @@ public void testSerdeWithDefaults() throws Exception ); Assert.assertNotNull(config.getBasePersistDirectory()); + Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec()); Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); @@ -92,7 +94,8 @@ public void testSerdeWithNonDefaults() throws Exception + " \"chatRetries\": 14,\n" + " \"httpTimeout\": \"PT15S\",\n" + " \"shutdownTimeout\": \"PT95S\",\n" - + " \"repartitionTransitionDuration\": \"PT500S\"\n" + + " \"repartitionTransitionDuration\": \"PT500S\",\n" + + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n" + "}"; KinesisSupervisorTuningConfig config = (KinesisSupervisorTuningConfig) mapper.readValue( @@ -106,6 +109,7 @@ public void testSerdeWithNonDefaults() throws Exception ); Assert.assertEquals(new File("/tmp/xxx"), config.getBasePersistDirectory()); + Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec()); Assert.assertEquals(100, config.getMaxRowsInMemory()); Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java index e45168db59a7..5b2e2bd2503c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.druid.indexing.kinesis.KinesisIndexTaskTuningConfig; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Period; @@ -37,6 +38,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu @JsonCreator public TestModifiedKinesisIndexTaskTuningConfig( + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, @@ -66,6 +68,7 @@ public TestModifiedKinesisIndexTaskTuningConfig( ) { super( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, maxRowsPerSegment, @@ -98,6 +101,7 @@ public TestModifiedKinesisIndexTaskTuningConfig( public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig base, String extra) { super( + base.getAppendableIndexSpec(), base.getMaxRowsInMemory(), base.getMaxBytesInMemory(), base.getMaxRowsPerSegment(), diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java 
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java index 2b80d28f101a..f1a8cc8adca0 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java @@ -29,6 +29,7 @@ import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.indexing.TuningConfig; import javax.annotation.Nullable; @@ -56,6 +57,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() DEFAULT_SHARD_SPECS, DEFAULT_INDEX_SPEC, DEFAULT_INDEX_SPEC, + DEFAULT_APPENDABLE_INDEX, DEFAULT_ROW_FLUSH_BOUNDARY, 0L, false, @@ -83,6 +85,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() private final Map> shardSpecs; private final IndexSpec indexSpec; private final IndexSpec indexSpecForIntermediatePersists; + private final AppendableIndexSpec appendableIndexSpec; private final int rowFlushBoundary; private final long maxBytesInMemory; private final boolean leaveIntermediate; @@ -108,6 +111,7 @@ public HadoopTuningConfig( final @JsonProperty("shardSpecs") @Nullable Map> shardSpecs, final @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, final @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, + final @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, final @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, final @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, @@ -140,8 +144,9 @@ public HadoopTuningConfig( this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null ? 
DEFAULT_ROW_FLUSH_BOUNDARY : maxRowsInMemoryCOMPAT : maxRowsInMemory; + this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; // initializing this to 0, it will be lazily initialized to a value - // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + // @see #getMaxBytesInMemoryOrDefault() this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.leaveIntermediate = leaveIntermediate; this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure; @@ -211,6 +216,13 @@ public IndexSpec getIndexSpecForIntermediatePersists() return indexSpecForIntermediatePersists; } + @JsonProperty + @Override + public AppendableIndexSpec getAppendableIndexSpec() + { + return appendableIndexSpec; + } + @JsonProperty("maxRowsInMemory") public int getRowFlushBoundary() { @@ -218,6 +230,7 @@ public int getRowFlushBoundary() } @JsonProperty + @Override public long getMaxBytesInMemory() { return maxBytesInMemory; @@ -327,6 +340,7 @@ public HadoopTuningConfig withWorkingPath(String path) shardSpecs, indexSpec, indexSpecForIntermediatePersists, + appendableIndexSpec, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, @@ -357,6 +371,7 @@ public HadoopTuningConfig withVersion(String ver) shardSpecs, indexSpec, indexSpecForIntermediatePersists, + appendableIndexSpec, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, @@ -387,6 +402,7 @@ public HadoopTuningConfig withShardSpecs(Map> specs specs, indexSpec, indexSpecForIntermediatePersists, + appendableIndexSpec, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index 91982fd036fd..abc0b3bc96f5 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ 
b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -50,7 +50,6 @@ import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; -import org.apache.druid.segment.indexing.TuningConfigs; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.ShardSpec; @@ -302,11 +301,12 @@ private static IncrementalIndex makeIncrementalIndex( .withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup()) .build(); - IncrementalIndex newIndex = new IncrementalIndex.Builder() + // Build the incremental-index according to the spec that was chosen by the user + IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder() .setIndexSchema(indexSchema) .setMaxRowCount(tuningConfig.getRowFlushBoundary()) - .setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory())) - .buildOnheap(); + .setMaxBytesInMemory(tuningConfig.getMaxBytesInMemoryOrDefault()) + .build(); if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) { newIndex.loadDimensionIterable(oldDimOrder, oldCapabilities); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java index 4b8d72e78847..644ae240262d 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java @@ -465,6 +465,7 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig( null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java 
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java index 93eda9434246..a127d519cc2b 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -219,6 +219,7 @@ public DetermineHashedPartitionsJobTest( null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java index 6906eaf5611a..047c1049c0ff 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java @@ -328,6 +328,7 @@ public DeterminePartitionsJobTest( null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java index c9937e0d98da..9eabd4187a32 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -241,6 +241,7 @@ HadoopIngestionSpec build() null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java index c1a7e954897d..fce828b64be1 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.jackson.DefaultObjectMapper; import 
org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.junit.Assert; import org.junit.Test; @@ -44,6 +45,7 @@ public void testSerde() throws Exception null, null, null, + null, 100, null, true, @@ -68,6 +70,7 @@ public void testSerde() throws Exception Assert.assertEquals("/tmp/workingpath", actual.getWorkingPath()); Assert.assertEquals("version", actual.getVersion()); + Assert.assertEquals(new OnheapIncrementalIndex.Spec(), actual.getAppendableIndexSpec()); Assert.assertNotNull(actual.getPartitionsSpec()); Assert.assertEquals(ImmutableMap.>of(), actual.getShardSpecs()); Assert.assertEquals(new IndexSpec(), actual.getIndexSpec()); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java index cd93236f1216..8d59554909b6 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java @@ -529,6 +529,7 @@ public void setUp() throws Exception null, null, null, + null, maxRowsInMemory, maxBytesInMemory, true, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java index 1cdb9b200281..3188762bc0b0 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java @@ -165,6 +165,7 @@ public void setup() throws Exception null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java index 4da9ee41b77d..68505be91cf1 100644 --- 
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java @@ -63,6 +63,7 @@ public class GranularityPathSpecTest null, null, null, + null, false, false, false, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index eec9b98d4973..1538e1581929 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -26,6 +26,7 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; @@ -37,9 +38,8 @@ import java.io.File; @JsonTypeName("realtime_appenderator") -public class RealtimeAppenderatorTuningConfig implements TuningConfig, AppenderatorConfig +public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig { - private static final int DEFAULT_MAX_ROWS_IN_MEMORY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new Period("PT10M"); private static final int DEFAULT_MAX_PENDING_PERSISTS = 0; private static final ShardSpec DEFAULT_SHARD_SPEC = new NumberedShardSpec(0, 1); @@ -53,6 +53,7 @@ private static File createNewBasePersistDirectory() return FileUtils.createTempDir("druid-realtime-persist"); } + private final AppendableIndexSpec appendableIndexSpec; private 
final int maxRowsInMemory; private final long maxBytesInMemory; private final DynamicPartitionsSpec partitionsSpec; @@ -74,6 +75,7 @@ private static File createNewBasePersistDirectory() @JsonCreator public RealtimeAppenderatorTuningConfig( + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, @@ -93,9 +95,10 @@ public RealtimeAppenderatorTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions ) { + this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; - // initializing this to 0, it will be lazily intialized to a value - // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + // initializing this to 0, it will be lazily initialized to a value + // @see #getMaxBytesInMemoryOrDefault() this.maxBytesInMemory = maxBytesInMemory == null ? 
0 : maxBytesInMemory; this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows); this.intermediatePersistPeriod = intermediatePersistPeriod == null @@ -135,6 +138,13 @@ public RealtimeAppenderatorTuningConfig( : logParseExceptions; } + @Override + @JsonProperty + public AppendableIndexSpec getAppendableIndexSpec() + { + return appendableIndexSpec; + } + @Override @JsonProperty public int getMaxRowsInMemory() @@ -260,6 +270,7 @@ public int getMaxSavedParseExceptions() public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir) { return new RealtimeAppenderatorTuningConfig( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, partitionsSpec.getMaxRowsPerSegment(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index ba2502f197af..afaf0cfb01e9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -219,6 +219,7 @@ static ParallelIndexTuningConfig getTuningConfig(TuningConfig tuningConfig) return new ParallelIndexTuningConfig( null, indexTuningConfig.getMaxRowsPerSegment(), + indexTuningConfig.getAppendableIndexSpec(), indexTuningConfig.getMaxRowsPerSegment(), indexTuningConfig.getMaxBytesInMemory(), indexTuningConfig.getMaxTotalRows(), diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 16e03f199624..f351a5db6c7b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -74,6 +74,7 @@ import org.apache.druid.java.util.common.logger.Logger; import 
org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.BatchIOConfig; @@ -1110,7 +1111,7 @@ public boolean isAppendToExisting() } } - public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig + public static class IndexTuningConfig implements AppenderatorConfig { private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); private static final int DEFAULT_MAX_PENDING_PERSISTS = 0; @@ -1118,6 +1119,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; private static final long DEFAULT_PUSH_TIMEOUT = 0; + private final AppendableIndexSpec appendableIndexSpec; private final int maxRowsInMemory; private final long maxBytesInMemory; @@ -1189,6 +1191,7 @@ private static PartitionsSpec getPartitionsSpec( public IndexTuningConfig( @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize, @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment, + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows, @@ -1211,6 +1214,7 @@ public IndexTuningConfig( ) { this( + appendableIndexSpec, maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility, maxBytesInMemory != null ? 
maxBytesInMemory : 0, getPartitionsSpec( @@ -1242,10 +1246,11 @@ public IndexTuningConfig( private IndexTuningConfig() { - this(null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private IndexTuningConfig( + @Nullable AppendableIndexSpec appendableIndexSpec, @Nullable Integer maxRowsInMemory, @Nullable Long maxBytesInMemory, @Nullable PartitionsSpec partitionsSpec, @@ -1262,9 +1267,10 @@ private IndexTuningConfig( @Nullable Integer maxSavedParseExceptions ) { + this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; // initializing this to 0, it will be lazily initialized to a value - // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + // @see #getMaxBytesInMemoryOrDefault() this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.partitionsSpec = partitionsSpec; this.indexSpec = indexSpec == null ? 
DEFAULT_INDEX_SPEC : indexSpec; @@ -1300,6 +1306,7 @@ private IndexTuningConfig( public IndexTuningConfig withBasePersistDirectory(File dir) { return new IndexTuningConfig( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, partitionsSpec, @@ -1320,6 +1327,7 @@ public IndexTuningConfig withBasePersistDirectory(File dir) public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) { return new IndexTuningConfig( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, partitionsSpec, @@ -1337,6 +1345,13 @@ public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) ); } + @JsonProperty + @Override + public AppendableIndexSpec getAppendableIndexSpec() + { + return appendableIndexSpec; + } + @JsonProperty @Override public int getMaxRowsInMemory() @@ -1514,7 +1529,8 @@ public boolean equals(Object o) return false; } IndexTuningConfig that = (IndexTuningConfig) o; - return maxRowsInMemory == that.maxRowsInMemory && + return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) && + maxRowsInMemory == that.maxRowsInMemory && maxBytesInMemory == that.maxBytesInMemory && maxPendingPersists == that.maxPendingPersists && forceGuaranteedRollup == that.forceGuaranteedRollup && @@ -1534,6 +1550,7 @@ public boolean equals(Object o) public int hashCode() { return Objects.hash( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, partitionsSpec, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 4a218a0b26e0..02df185835cd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -924,6 +924,7 @@ private static IndexTuningConfig 
convertToIndexTuningConfig(ParallelIndexTuningC return new IndexTuningConfig( null, null, + tuningConfig.getAppendableIndexSpec(), tuningConfig.getMaxRowsInMemory(), tuningConfig.getMaxBytesInMemory(), null, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 935eeb48c21e..ca7fdf13ffed 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -28,6 +28,7 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; import org.apache.druid.java.util.common.IAE; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Duration; import org.joda.time.Period; @@ -97,6 +98,7 @@ public static ParallelIndexTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -105,6 +107,7 @@ public static ParallelIndexTuningConfig defaultConfig() public ParallelIndexTuningConfig( @JsonProperty("targetPartitionSize") @Deprecated @Nullable Integer targetPartitionSize, @JsonProperty("maxRowsPerSegment") @Deprecated @Nullable Integer maxRowsPerSegment, + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows, @@ -134,6 +137,7 @@ public ParallelIndexTuningConfig( super( targetPartitionSize, maxRowsPerSegment, + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, maxTotalRows, @@ -248,6 +252,7 @@ 
public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpe return new ParallelIndexTuningConfig( null, null, + getAppendableIndexSpec(), getMaxRowsInMemory(), getMaxBytesInMemory(), null, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index c192703e2d62..b56d8e5f2ed6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; @@ -32,11 +33,12 @@ import java.io.File; import java.util.Objects; -public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfig, AppenderatorConfig +public abstract class SeekableStreamIndexTaskTuningConfig implements AppenderatorConfig { private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false; private static final boolean DEFAULT_SKIP_SEQUENCE_NUMBER_AVAILABILITY_CHECK = false; + private final AppendableIndexSpec appendableIndexSpec; private final int maxRowsInMemory; private final long maxBytesInMemory; private final DynamicPartitionsSpec partitionsSpec; @@ -59,6 +61,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi private final int maxSavedParseExceptions; public SeekableStreamIndexTaskTuningConfig( + @Nullable AppendableIndexSpec 
appendableIndexSpec, @Nullable Integer maxRowsInMemory, @Nullable Long maxBytesInMemory, @Nullable Integer maxRowsPerSegment, @@ -84,10 +87,11 @@ public SeekableStreamIndexTaskTuningConfig( // Cannot be a static because default basePersistDirectory is unique per-instance final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory); + this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory; this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows); // initializing this to 0, it will be lazily initialized to a value - // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + // @see #getMaxBytesInMemoryOrDefault() this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? 
defaults.getIntermediatePersistPeriod() @@ -130,6 +134,13 @@ public SeekableStreamIndexTaskTuningConfig( : logParseExceptions; } + @Override + @JsonProperty + public AppendableIndexSpec getAppendableIndexSpec() + { + return appendableIndexSpec; + } + @Override @JsonProperty public int getMaxRowsInMemory() @@ -281,7 +292,8 @@ public boolean equals(Object o) return false; } SeekableStreamIndexTaskTuningConfig that = (SeekableStreamIndexTaskTuningConfig) o; - return maxRowsInMemory == that.maxRowsInMemory && + return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) && + maxRowsInMemory == that.maxRowsInMemory && maxBytesInMemory == that.maxBytesInMemory && maxPendingPersists == that.maxPendingPersists && reportParseExceptions == that.reportParseExceptions && @@ -304,6 +316,7 @@ public boolean equals(Object o) public int hashCode() { return Objects.hash( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, partitionsSpec, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index a9d82b169a20..9a8ae20c1ad8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1394,6 +1394,7 @@ private AppenderatorDriverRealtimeIndexTask makeRealtimeTask( null ); RealtimeAppenderatorTuningConfig tuningConfig = new RealtimeAppenderatorTuningConfig( + null, 1000, null, maxRowsPerSegment, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index 0692c606b639..0daffc66bbe8 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -202,6 +202,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException .inputSpec(new CompactionIntervalSpec(Intervals.of("2019/2020"), "testSha256OfSortedSegmentIds")) .tuningConfig( new ParallelIndexTuningConfig( + null, null, null, 40000, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 08cd765a1fb1..1fce094d110f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -279,6 +279,7 @@ public void testRunWithHashPartitioning() throws Exception null, null, null, + null, new HashedPartitionsSpec(null, 3, null), null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index c82f1e7ece37..14c0840be4bd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -305,6 +305,7 @@ private static ParallelIndexTuningConfig createTuningConfig() return new ParallelIndexTuningConfig( null, null, // null to compute maxRowsPerSegment automatically + null, 500000, 1000000L, null, @@ -438,6 +439,7 @@ public void testSerdeWithOldTuningConfigSuccessfullyDeserializeToNewOne() throws new IndexTuningConfig( null, null, // null to compute maxRowsPerSegment automatically + null, 500000, 1000000L, null, @@ -583,6 +585,7 @@ 
public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( 100000, null, + null, 500000, 1000000L, null, @@ -648,6 +651,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, SegmentLoadingException { final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( + null, null, null, 500000, @@ -715,6 +719,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm public void testCreateIngestionSchemaWithNumShards() throws IOException, SegmentLoadingException { final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( + null, null, null, 500000, @@ -1103,6 +1108,7 @@ private void assertIngestionSchema( expectedMetricsSpec, expectedSegmentIntervals, new ParallelIndexTuningConfig( + null, null, null, 500000, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java index 23f5748a39a9..bf2e44428f29 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java @@ -57,6 +57,7 @@ public static void setup() public void testSerdeTuningConfigWithDynamicPartitionsSpec() throws IOException { final IndexTuningConfig tuningConfig = new IndexTuningConfig( + null, null, null, 100, @@ -90,6 +91,7 @@ public void testSerdeTuningConfigWithDynamicPartitionsSpec() throws IOException public void testSerdeTuningConfigWithHashedPartitionsSpec() throws IOException { final IndexTuningConfig tuningConfig = new IndexTuningConfig( + null, null, null, 100, @@ -125,6 +127,7 @@ public void testSerdeTuningConfigWithDeprecatedDynamicPartitionsSpec() 
throws IO final IndexTuningConfig tuningConfig = new IndexTuningConfig( null, 1000, + null, 100, 2000L, 3000L, @@ -156,6 +159,7 @@ public void testSerdeTuningConfigWithDeprecatedDynamicPartitionsSpec() throws IO public void testSerdeTuningConfigWithDeprecatedHashedPartitionsSpec() throws IOException { final IndexTuningConfig tuningConfig = new IndexTuningConfig( + null, null, null, 100, @@ -191,6 +195,7 @@ public void testForceGuaranteedRollupWithDynamicPartitionsSpec() expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("DynamicPartitionsSpec cannot be used for perfect rollup"); final IndexTuningConfig tuningConfig = new IndexTuningConfig( + null, null, null, 100, @@ -225,6 +230,7 @@ public void testBestEffortRollupWithHashedPartitionsSpec() expectedException.expect(IllegalArgumentException.class); expectedException.expectMessage("DynamicPartitionsSpec must be used for best-effort rollup"); final IndexTuningConfig tuningConfig = new IndexTuningConfig( + null, null, null, 100, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index fdd30962ea0d..0d4810acb47c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import com.google.common.io.Files; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.impl.CSVParseSpec; import org.apache.druid.data.input.impl.CsvInputFormat; @@ -1122,6 +1123,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception null, null, null, + null, new HashedPartitionsSpec(2, null, null), INDEX_SPEC, null, @@ -1250,6 +1252,7 @@ public void 
testMultipleParseExceptionsFailure() throws Exception null, null, null, + null, new DynamicPartitionsSpec(2, null), INDEX_SPEC, null, @@ -1370,6 +1373,7 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc null, null, null, + null, new HashedPartitionsSpec(2, null, null), INDEX_SPEC, null, @@ -1808,6 +1812,7 @@ static IndexTuningConfig createTuningConfig( return new IndexTuningConfig( null, maxRowsPerSegment, + null, maxRowsInMemory, maxBytesInMemory, maxTotalRows, @@ -1998,4 +2003,12 @@ private static IndexIngestionSpec createIngestionSpec( ); } } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(IndexTuningConfig.class) + .usingGetClass() + .verify(); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index b87dcf7e63f5..2038441acfa7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -830,6 +830,7 @@ private RealtimeIndexTask makeRealtimeTask( null ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( + null, 1000, null, new Period("P1Y"), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index 9dfbd4e66f66..aa3d33bf2506 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -247,6 +247,7 @@ public void testIndexTaskSerde() throws Exception ), new IndexIOConfig(null, new LocalInputSource(new File("lol"), "rofl"), new NoopInputFormat(), true), new IndexTuningConfig( + null, null, null, 10, 
@@ -327,6 +328,7 @@ public void testIndexTaskwithResourceSerde() throws Exception ), new IndexIOConfig(null, new LocalInputSource(new File("lol"), "rofl"), new NoopInputFormat(), true), new IndexTuningConfig( + null, null, null, 10, @@ -396,6 +398,7 @@ public void testRealtimeIndexTaskSerde() throws Exception ), new RealtimeTuningConfig( + null, 1, 10L, new Period("PT10M"), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index af221602aa84..ae3f92ceb441 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -157,6 +157,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase null, null, null, + null, 2, null, null, @@ -227,6 +228,7 @@ protected ParallelIndexTuningConfig newTuningConfig( null, null, null, + null, new MaxSizeSplitHintSpec(null, 1), partitionsSpec, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 37fb265b647d..fc193b0e1321 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -178,6 +178,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, numTotalSubTasks, null, null, diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 0a30bf727070..be6b0a86737e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -427,6 +427,7 @@ private TestSupervisorTask newTask( null, null, null, + null, NUM_SUB_TASKS, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index e6b144fc0217..a80e3a8452b7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -271,6 +271,7 @@ ParallelIndexIngestionSpec build() null, null, null, + null, partitionsSpec, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index ef1db8a31468..b043f85ef5a6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -193,6 +193,7 @@ public void 
testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupA appendToExisting ); final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( + null, null, null, 10, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java index 931fd24aeb13..c2f712320b91 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java @@ -158,6 +158,7 @@ ParallelIndexTuningConfig build() return new ParallelIndexTuningConfig( 1, null, + null, 3, 4L, 5L, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java index 101c97473485..a826ae9cd42d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; +import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; @@ -67,6 +68,7 @@ public void testSerdeWithMaxRowsPerSegment() throws IOException { final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( + null, null, null, 10, @@ -109,6 +111,7 @@ public void 
testSerdeWithMaxNumConcurrentSubTasks() throws IOException { final int maxNumConcurrentSubTasks = 250; final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( + null, null, null, 10, @@ -151,6 +154,7 @@ public void testSerdeWithMaxNumSubTasks() throws IOException { final int maxNumSubTasks = 250; final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( + null, null, null, 10, @@ -195,6 +199,7 @@ public void testSerdeWithMaxNumSubTasksAndMaxNumConcurrentSubTasks() expectedException.expectMessage("Can't use both maxNumSubTasks and maxNumConcurrentSubTasks"); final int maxNumSubTasks = 250; final ParallelIndexTuningConfig tuningConfig = new ParallelIndexTuningConfig( + null, null, null, 10, @@ -236,6 +241,7 @@ public void testConstructorWithHashedPartitionsSpecAndNonForceGuaranteedRollupFa expectedException.expectMessage("DynamicPartitionsSpec must be used for best-effort rollup"); final boolean forceGuaranteedRollup = false; new ParallelIndexTuningConfig( + null, null, null, 10, @@ -277,6 +283,7 @@ public void testConstructorWithSingleDimensionPartitionsSpecAndNonForceGuarantee expectedException.expectMessage("DynamicPartitionsSpec must be used for best-effort rollup"); final boolean forceGuaranteedRollup = false; new ParallelIndexTuningConfig( + null, null, null, 10, @@ -318,6 +325,7 @@ public void testConstructorWithDynamicPartitionsSpecAndForceGuaranteedRollupFail expectedException.expectMessage("cannot be used for perfect rollup"); final boolean forceGuaranteedRollup = true; new ParallelIndexTuningConfig( + null, null, null, 10, @@ -351,4 +359,12 @@ public void testConstructorWithDynamicPartitionsSpecAndForceGuaranteedRollupFail null ); } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(ParallelIndexTuningConfig.class) + .usingGetClass() + .verify(); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index b7f107d07255..e444f8091839 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -352,6 +352,7 @@ public void testWith1MaxNumConcurrentSubTasks() null, null, null, + null, 1, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 99ab9a480e9e..48f75d7a82c3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -746,6 +746,7 @@ public void testIndexTask() throws Exception new IndexTuningConfig( null, 10000, + null, 10, null, null, @@ -826,6 +827,7 @@ public void testIndexTaskFailure() throws Exception new IndexTuningConfig( null, 10000, + null, 10, null, null, @@ -1251,6 +1253,7 @@ public void testResumeTasks() throws Exception new IndexTuningConfig( null, 10000, + null, 10, null, null, @@ -1358,6 +1361,7 @@ public void testUnifiedAppenderatorsManagerCleanup() throws Exception new IndexTuningConfig( null, 10000, + null, 10, null, null, @@ -1468,6 +1472,7 @@ private RealtimeIndexTask newRealtimeIndexTask() // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( + null, 1000, null, new Period("P1Y"), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 6bfbd7f9983f..2cefebd6a227 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -898,6 +898,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, + null, null ) { diff --git a/processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java b/processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java new file mode 100644 index 000000000000..177c7a5f7847 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/jackson/AppendableIndexModule.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.jackson; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.apache.druid.segment.incremental.AppendableIndexSpec; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; + +public class AppendableIndexModule extends SimpleModule +{ + public AppendableIndexModule() + { + super("AppendableIndexFactories"); + + setMixInAnnotation(AppendableIndexSpec.class, AppendableIndexSpecMixin.class); + } + + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = OnheapIncrementalIndex.Spec.class) + @JsonSubTypes(value = { + @JsonSubTypes.Type(name = OnheapIncrementalIndex.Spec.TYPE, value = OnheapIncrementalIndex.Spec.class), + }) + public interface AppendableIndexSpecMixin + { + } +} diff --git a/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java b/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java index 235d46731d69..71cf30ff733c 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java +++ b/processing/src/main/java/org/apache/druid/jackson/DefaultObjectMapper.java @@ -49,6 +49,7 @@ public DefaultObjectMapper(JsonFactory factory) registerModule(new AggregatorsModule()); registerModule(new StringComparatorModule()); registerModule(new SegmentizerModule()); + registerModule(new AppendableIndexModule()); configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); configure(MapperFeature.AUTO_DETECT_GETTERS, false); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java index 877fd34ccf75..f0b2a9e1edde 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryHelper.java @@ 
-39,9 +39,12 @@ import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.PostAggregator; import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.incremental.AppendableIndexBuilder; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IndexSizeExceededException; +import org.apache.druid.segment.incremental.OffheapIncrementalIndex; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -117,24 +120,24 @@ public String apply(DimensionSpec input) .withMinTimestamp(granTimeStart.getMillis()) .build(); + + final AppendableIndexBuilder indexBuilder; + if (query.getContextValue("useOffheap", false)) { - index = new IncrementalIndex.Builder() - .setIndexSchema(indexSchema) - .setDeserializeComplexMetrics(false) - .setConcurrentEventAdd(true) - .setSortFacts(sortResults) - .setMaxRowCount(querySpecificConfig.getMaxResults()) - .buildOffheap(bufferPool); + indexBuilder = new OffheapIncrementalIndex.Builder() + .setBufferPool(bufferPool); } else { - index = new IncrementalIndex.Builder() - .setIndexSchema(indexSchema) - .setDeserializeComplexMetrics(false) - .setConcurrentEventAdd(true) - .setSortFacts(sortResults) - .setMaxRowCount(querySpecificConfig.getMaxResults()) - .buildOnheap(); + indexBuilder = new OnheapIncrementalIndex.Builder(); } + index = indexBuilder + .setIndexSchema(indexSchema) + .setDeserializeComplexMetrics(false) + .setConcurrentEventAdd(true) + .setSortFacts(sortResults) + .setMaxRowCount(querySpecificConfig.getMaxResults()) + .build(); + Accumulator accumulator = new Accumulator() { @Override diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java 
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java new file mode 100644 index 000000000000..220f0e3a6506 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexBuilder.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.incremental; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.aggregation.AggregatorFactory; + +import javax.annotation.Nullable; + +public abstract class AppendableIndexBuilder +{ + @Nullable + protected IncrementalIndexSchema incrementalIndexSchema = null; + protected boolean deserializeComplexMetrics = true; + protected boolean concurrentEventAdd = false; + protected boolean sortFacts = true; + protected int maxRowCount = 0; + protected long maxBytesInMemory = 0; + + protected final Logger log = new Logger(this.getClass().getName()); + + public AppendableIndexBuilder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema) + { + this.incrementalIndexSchema = incrementalIndexSchema; + return this; + } + + /** + * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note + * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in + * production settings. + * + * @param metrics variable array of {@link AggregatorFactory} metrics + * + * @return this + */ + @VisibleForTesting + public AppendableIndexBuilder setSimpleTestingIndexSchema(final AggregatorFactory... metrics) + { + return setSimpleTestingIndexSchema(null, metrics); + } + + + /** + * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the + * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you + * would use it in production settings. + * + * @param metrics variable array of {@link AggregatorFactory} metrics + * + * @return this + */ + @VisibleForTesting + public AppendableIndexBuilder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... 
metrics) + { + IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics); + this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build(); + return this; + } + + public AppendableIndexBuilder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics) + { + this.deserializeComplexMetrics = deserializeComplexMetrics; + return this; + } + + public AppendableIndexBuilder setConcurrentEventAdd(final boolean concurrentEventAdd) + { + this.concurrentEventAdd = concurrentEventAdd; + return this; + } + + public AppendableIndexBuilder setSortFacts(final boolean sortFacts) + { + this.sortFacts = sortFacts; + return this; + } + + public AppendableIndexBuilder setMaxRowCount(final int maxRowCount) + { + this.maxRowCount = maxRowCount; + return this; + } + + public AppendableIndexBuilder setMaxBytesInMemory(final long maxBytesInMemory) + { + this.maxBytesInMemory = maxBytesInMemory; + return this; + } + + public void validate() + { + if (maxRowCount <= 0) { + throw new IllegalArgumentException("Invalid max row count: " + maxRowCount); + } + + if (incrementalIndexSchema == null) { + throw new IllegalArgumentException("incrementalIndexSchema cannot be null"); + } + } + + public final IncrementalIndex build() + { + log.debug("Building appendable index."); + validate(); + return buildInner(); + } + + protected abstract IncrementalIndex buildInner(); +} diff --git a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfigs.java b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java similarity index 53% rename from server/src/main/java/org/apache/druid/segment/indexing/TuningConfigs.java rename to processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java index 9c470ab677d0..67cdabdf5673 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfigs.java +++
b/processing/src/main/java/org/apache/druid/segment/incremental/AppendableIndexSpec.java @@ -17,25 +17,19 @@ * under the License. */ -package org.apache.druid.segment.indexing; +package org.apache.druid.segment.incremental; -public class TuningConfigs +import org.apache.druid.guice.annotations.UnstableApi; + +/** + * AppendableIndexSpec describes the in-memory indexing method for data ingestion. + */ +@UnstableApi +public interface AppendableIndexSpec { - private TuningConfigs() - { - } + // Returns a builder of the appendable index. + AppendableIndexBuilder builder(); - public static long getMaxBytesInMemoryOrDefault(final long maxBytesInMemory) - { - // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting - // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on - // the actual task node's jvm memory. - long newMaxBytesInMemory = maxBytesInMemory; - if (maxBytesInMemory == 0) { - newMaxBytesInMemory = TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY; - } else if (maxBytesInMemory < 0) { - newMaxBytesInMemory = Long.MAX_VALUE; - } - return newMaxBytesInMemory; - } + // Returns the default max bytes in memory for this index. 
+ long getDefaultMaxBytesInMemory(); } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index a718b53646d9..cc338b9854d0 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -31,7 +31,6 @@ import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.google.errorprone.annotations.concurrent.GuardedBy; -import org.apache.druid.collections.NonBlockingPool; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedRow; @@ -79,7 +78,6 @@ import javax.annotation.Nullable; import java.io.Closeable; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -88,7 +86,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; @@ -319,126 +316,62 @@ protected IncrementalIndex( } } - public static class Builder + /** + * This class exists only for backward compatibility, to reduce the number of modified lines.
+ */ + public static class Builder extends OnheapIncrementalIndex.Builder { - @Nullable - private IncrementalIndexSchema incrementalIndexSchema; - private boolean deserializeComplexMetrics; - private boolean concurrentEventAdd; - private boolean sortFacts; - private int maxRowCount; - private long maxBytesInMemory; - - public Builder() - { - incrementalIndexSchema = null; - deserializeComplexMetrics = true; - concurrentEventAdd = false; - sortFacts = true; - maxRowCount = 0; - maxBytesInMemory = 0; - } - + @Override public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema) { - this.incrementalIndexSchema = incrementalIndexSchema; - return this; + return (Builder) super.setIndexSchema(incrementalIndexSchema); } - /** - * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note - * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in - * production settings. - * - * @param metrics variable array of {@link AggregatorFactory} metrics - * - * @return this - */ - @VisibleForTesting + @Override public Builder setSimpleTestingIndexSchema(final AggregatorFactory... metrics) { - return setSimpleTestingIndexSchema(null, metrics); + return (Builder) super.setSimpleTestingIndexSchema(metrics); } - - /** - * A helper method to set a simple index schema with controllable metrics and rollup, and default values for the - * other parameters. Note that this method is normally used for testing and benchmarking; it is unlikely that you - * would use it in production settings. - * - * @param metrics variable array of {@link AggregatorFactory} metrics - * - * @return this - */ - @VisibleForTesting + @Override public Builder setSimpleTestingIndexSchema(@Nullable Boolean rollup, final AggregatorFactory... 
metrics) { - IncrementalIndexSchema.Builder builder = new IncrementalIndexSchema.Builder().withMetrics(metrics); - this.incrementalIndexSchema = rollup != null ? builder.withRollup(rollup).build() : builder.build(); - return this; + return (Builder) super.setSimpleTestingIndexSchema(rollup, metrics); } + @Override public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics) { - this.deserializeComplexMetrics = deserializeComplexMetrics; - return this; + return (Builder) super.setDeserializeComplexMetrics(deserializeComplexMetrics); } + @Override public Builder setConcurrentEventAdd(final boolean concurrentEventAdd) { - this.concurrentEventAdd = concurrentEventAdd; - return this; + return (Builder) super.setConcurrentEventAdd(concurrentEventAdd); } + @Override public Builder setSortFacts(final boolean sortFacts) { - this.sortFacts = sortFacts; - return this; + return (Builder) super.setSortFacts(sortFacts); } + @Override public Builder setMaxRowCount(final int maxRowCount) { - this.maxRowCount = maxRowCount; - return this; + return (Builder) super.setMaxRowCount(maxRowCount); } - //maxBytesInMemory only applies to OnHeapIncrementalIndex + @Override public Builder setMaxBytesInMemory(final long maxBytesInMemory) { - this.maxBytesInMemory = maxBytesInMemory; - return this; + return (Builder) super.setMaxBytesInMemory(maxBytesInMemory); } public OnheapIncrementalIndex buildOnheap() { - if (maxRowCount <= 0) { - throw new IllegalArgumentException("Invalid max row count: " + maxRowCount); - } - - return new OnheapIncrementalIndex( - Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema is null"), - deserializeComplexMetrics, - concurrentEventAdd, - sortFacts, - maxRowCount, - maxBytesInMemory - ); - } - - public IncrementalIndex buildOffheap(final NonBlockingPool bufferPool) - { - if (maxRowCount <= 0) { - throw new IllegalArgumentException("Invalid max row count: " + maxRowCount); - } - - return new OffheapIncrementalIndex( - 
Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"), - deserializeComplexMetrics, - concurrentEventAdd, - sortFacts, - maxRowCount, - Objects.requireNonNull(bufferPool, "bufferPool is null") - ); + return (OnheapIncrementalIndex) build(); } } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java index 97da9945a06e..b3cdabcd5e36 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OffheapIncrementalIndex.java @@ -40,6 +40,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; /** @@ -115,6 +116,8 @@ protected BufferAggregator[] initAggs( selectors = new HashMap<>(); aggOffsetInBuffer = new int[metrics.length]; + int aggsCurOffsetInBuffer = 0; + for (int i = 0; i < metrics.length; i++) { AggregatorFactory agg = metrics[i]; @@ -129,15 +132,11 @@ protected BufferAggregator[] initAggs( new OnheapIncrementalIndex.CachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd) ); - if (i == 0) { - aggOffsetInBuffer[i] = 0; - } else { - aggOffsetInBuffer[i] = aggOffsetInBuffer[i - 1] + metrics[i - 1].getMaxIntermediateSizeWithNulls(); - } + aggOffsetInBuffer[i] = aggsCurOffsetInBuffer; + aggsCurOffsetInBuffer += agg.getMaxIntermediateSizeWithNulls(); } - aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - - 1].getMaxIntermediateSizeWithNulls(); + aggsTotalSize = aggsCurOffsetInBuffer; return new BufferAggregator[metrics.length]; } @@ -346,4 +345,38 @@ public void close() } aggBuffers.clear(); } + + public static class Builder extends AppendableIndexBuilder + { + @Nullable + NonBlockingPool bufferPool = null; + + public Builder setBufferPool(final 
NonBlockingPool bufferPool) + { + this.bufferPool = bufferPool; + return this; + } + + @Override + public void validate() + { + super.validate(); + if (bufferPool == null) { + throw new IllegalArgumentException("bufferPool cannot be null"); + } + } + + @Override + protected OffheapIncrementalIndex buildInner() + { + return new OffheapIncrementalIndex( + Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"), + deserializeComplexMetrics, + concurrentEventAdd, + sortFacts, + maxRowCount, + Objects.requireNonNull(bufferPool, "bufferPool is null") + ); + } + } } diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java index 9bbdf60dbf79..a72124a21533 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnheapIncrementalIndex.java @@ -32,6 +32,7 @@ import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.utils.JvmUtils; import javax.annotation.Nullable; import java.io.IOException; @@ -40,6 +41,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -434,4 +436,51 @@ public ColumnCapabilities getColumnCapabilities(String columnName) } } + public static class Builder extends AppendableIndexBuilder + { + @Override + protected OnheapIncrementalIndex buildInner() + { + return new OnheapIncrementalIndex( + Objects.requireNonNull(incrementalIndexSchema, "incrementIndexSchema is null"), + deserializeComplexMetrics, + concurrentEventAdd, + sortFacts, + maxRowCount, + maxBytesInMemory 
+ ); + } + } + + public static class Spec implements AppendableIndexSpec + { + public static final String TYPE = "onheap"; + + @Override + public AppendableIndexBuilder builder() + { + return new Builder(); + } + + @Override + public long getDefaultMaxBytesInMemory() + { + // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only + // tracks active index and not the index being flushed to disk, to account for that + // we halved default to 1/6(max jvm memory) + return JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6; + } + + @Override + public boolean equals(Object that) + { + return that.getClass().equals(this.getClass()); + } + + @Override + public int hashCode() + { + return Objects.hash(this.getClass()); + } + } } diff --git a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java index dfc3e533b289..534034b29bd8 100644 --- a/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/data/IncrementalIndexTest.java @@ -63,9 +63,10 @@ import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.incremental.IncrementalIndex; -import org.apache.druid.segment.incremental.IncrementalIndex.Builder; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.incremental.IndexSizeExceededException; +import org.apache.druid.segment.incremental.OffheapIncrementalIndex; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.testing.InitializedNullHandlingTest; import org.joda.time.Interval; import org.junit.AfterClass; @@ -130,10 +131,11 @@ public static Collection constructorFeeder() RESOURCE_CLOSER.register(pool1); params.add( new Object[] { - (IndexCreator) factories -> new Builder() + (IndexCreator) 
factories -> new OffheapIncrementalIndex.Builder() + .setBufferPool(pool1) .setSimpleTestingIndexSchema(factories) .setMaxRowCount(1000000) - .buildOffheap(pool1) + .build() } ); params.add(new Object[] {(IndexCreator) IncrementalIndexTest::createNoRollupIndex}); @@ -144,7 +146,8 @@ public static Collection constructorFeeder() RESOURCE_CLOSER.register(pool2); params.add( new Object[] { - (IndexCreator) factories -> new Builder() + (IndexCreator) factories -> new OffheapIncrementalIndex.Builder() + .setBufferPool(pool2) .setIndexSchema( new IncrementalIndexSchema.Builder() .withMetrics(factories) @@ -152,7 +155,7 @@ public static Collection constructorFeeder() .build() ) .setMaxRowCount(1000000) - .buildOffheap(pool2) + .build() } ); @@ -173,7 +176,7 @@ public static IncrementalIndex createIndex( aggregatorFactories = DEFAULT_AGGREGATOR_FACTORIES; } - return new IncrementalIndex.Builder() + return new OnheapIncrementalIndex.Builder() .setIndexSchema( new IncrementalIndexSchema.Builder() .withDimensionsSpec(dimensionsSpec) @@ -181,7 +184,7 @@ public static IncrementalIndex createIndex( .build() ) .setMaxRowCount(1000000) - .buildOnheap(); + .build(); } public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories) @@ -190,10 +193,10 @@ public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactori aggregatorFactories = DEFAULT_AGGREGATOR_FACTORIES; } - return new IncrementalIndex.Builder() + return new OnheapIncrementalIndex.Builder() .setSimpleTestingIndexSchema(aggregatorFactories) .setMaxRowCount(1000000) - .buildOnheap(); + .build(); } public static IncrementalIndex createNoRollupIndex(AggregatorFactory[] aggregatorFactories) @@ -202,10 +205,10 @@ public static IncrementalIndex createNoRollupIndex(AggregatorFactory[] aggregato aggregatorFactories = DEFAULT_AGGREGATOR_FACTORIES; } - return new IncrementalIndex.Builder() + return new OnheapIncrementalIndex.Builder() .setSimpleTestingIndexSchema(false, aggregatorFactories) 
.setMaxRowCount(1000000) - .buildOnheap(); + .build(); } public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException @@ -722,7 +725,7 @@ public void run() @Test public void testgetDimensions() { - final IncrementalIndex incrementalIndex = new IncrementalIndex.Builder() + final IncrementalIndex incrementalIndex = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder() .setIndexSchema( new IncrementalIndexSchema.Builder() .withMetrics(new CountAggregatorFactory("count")) @@ -736,7 +739,7 @@ public void testgetDimensions() .build() ) .setMaxRowCount(1000000) - .buildOnheap(); + .build(); closerRule.closeLater(incrementalIndex); Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames()); @@ -745,10 +748,10 @@ public void testgetDimensions() @Test public void testDynamicSchemaRollup() throws IndexSizeExceededException { - IncrementalIndex index = new IncrementalIndex.Builder() + IncrementalIndex index = (OnheapIncrementalIndex) new OnheapIncrementalIndex.Builder() .setSimpleTestingIndexSchema(/* empty */) .setMaxRowCount(10) - .buildOnheap(); + .build(); closerRule.closeLater(index); index.add( new MapBasedInputRow( diff --git a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java index 91863ff54344..c7b885aee783 100644 --- a/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexTest.java @@ -117,12 +117,12 @@ public static Collection constructorFeeder() @Override public IncrementalIndex createIndex() { - return new IncrementalIndex.Builder() + return new OnheapIncrementalIndex.Builder() .setIndexSchema(schema) .setDeserializeComplexMetrics(false) .setSortFacts(sortFacts) .setMaxRowCount(1000) - .buildOnheap(); + .build(); } }, Closer.create() @@ 
-141,11 +141,12 @@ public IncrementalIndex createIndex() @Override public IncrementalIndex createIndex() { - return new IncrementalIndex.Builder() + return new OffheapIncrementalIndex.Builder() + .setBufferPool(stupidPool) .setIndexSchema(schema) .setSortFacts(sortFacts) .setMaxRowCount(1000000) - .buildOffheap(stupidPool); + .build(); } }, poolCloser diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java index 728e2ffac57b..48fb592d9431 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java @@ -25,6 +25,7 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; import org.apache.druid.segment.realtime.plumber.IntervalStartVersioningPolicy; import org.apache.druid.segment.realtime.plumber.RejectionPolicyFactory; @@ -41,9 +42,8 @@ /** * */ -public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig +public class RealtimeTuningConfig implements AppenderatorConfig { - private static final int DEFAULT_MAX_ROWS_IN_MEMORY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; private static final Period DEFAULT_INTERMEDIATE_PERSIST_PERIOD = new Period("PT10M"); private static final Period DEFAULT_WINDOW_PERIOD = new Period("PT10M"); private static final VersioningPolicy DEFAULT_VERSIONING_POLICY = new IntervalStartVersioningPolicy(); @@ -65,6 +65,7 @@ private static File createNewBasePersistDirectory() public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File basePersistDirectory) { return new RealtimeTuningConfig( + DEFAULT_APPENDABLE_INDEX, 
DEFAULT_MAX_ROWS_IN_MEMORY, 0L, DEFAULT_INTERMEDIATE_PERSIST_PERIOD, @@ -87,6 +88,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File ); } + private final AppendableIndexSpec appendableIndexSpec; private final int maxRowsInMemory; private final long maxBytesInMemory; private final Period intermediatePersistPeriod; @@ -110,6 +112,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File @JsonCreator public RealtimeTuningConfig( + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, @@ -132,9 +135,10 @@ public RealtimeTuningConfig( @JsonProperty("dedupColumn") @Nullable String dedupColumn ) { + this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; // initializing this to 0, it will be lazily initialized to a value - // @see server.src.main.java.org.apache.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + // @see #getMaxBytesInMemoryOrDefault() this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? DEFAULT_INTERMEDIATE_PERSIST_PERIOD @@ -166,6 +170,13 @@ public RealtimeTuningConfig( this.dedupColumn = dedupColumn == null ? 
DEFAULT_DEDUP_COLUMN : dedupColumn; } + @Override + @JsonProperty + public AppendableIndexSpec getAppendableIndexSpec() + { + return appendableIndexSpec; + } + @Override @JsonProperty public int getMaxRowsInMemory() @@ -304,6 +315,7 @@ public String getDedupColumn() public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, intermediatePersistPeriod, @@ -330,6 +342,7 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) public RealtimeTuningConfig withBasePersistDirectory(File dir) { return new RealtimeTuningConfig( + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, intermediatePersistPeriod, diff --git a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java index 6d02ca42ac1a..e3a4e1fadd05 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/TuningConfig.java @@ -21,7 +21,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; -import org.apache.druid.utils.JvmUtils; +import org.apache.druid.segment.incremental.AppendableIndexSpec; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; /** */ @@ -32,11 +33,37 @@ public interface TuningConfig { boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false; + AppendableIndexSpec DEFAULT_APPENDABLE_INDEX = new OnheapIncrementalIndex.Spec(); int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE; int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0; int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000; - // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only - // tracks active index and not the index being flushed to disk, to account for that - // we halved default to 1/6(max jvm memory) - long DEFAULT_MAX_BYTES_IN_MEMORY = 
JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() / 6; + + /** + * The incremental index implementation to use + */ + AppendableIndexSpec getAppendableIndexSpec(); + + /** + * Maximum number of bytes (estimated) to store in memory before persisting to local storage + */ + long getMaxBytesInMemory(); + + /** + * Maximum number of bytes (estimated) to store in memory before persisting to local storage. + * If getMaxBytesInMemory() returns 0, the appendable index default will be used. + */ + default long getMaxBytesInMemoryOrDefault() + { + // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting + // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on + // the actual task node's jvm memory. + final long maxBytesInMemory = getMaxBytesInMemory(); + if (maxBytesInMemory > 0) { + return maxBytesInMemory; + } else if (maxBytesInMemory == 0) { + return getAppendableIndexSpec().getDefaultMaxBytesInMemory(); + } else { + return Long.MAX_VALUE; + } + } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index f8d09e0ec415..259e58c400a6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -21,13 +21,14 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Period; import javax.annotation.Nullable; import java.io.File; -public interface AppenderatorConfig +public interface AppenderatorConfig extends TuningConfig { boolean isReportParseExceptions(); @@ -36,11 +37,6 @@ public interface AppenderatorConfig */ 
int getMaxRowsInMemory(); - /** - * Maximum number of bytes (estimated) to store in memory before persisting to local storage - */ - long getMaxBytesInMemory(); - int getMaxPendingPersists(); /** diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index c65c2ad7cf0b..c2888376bd24 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -64,7 +64,6 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.segment.indexing.TuningConfigs; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.FireHydrant; @@ -199,7 +198,7 @@ public class AppenderatorImpl implements Appenderator this.sinkTimeline = sinkQuerySegmentWalker.getSinkTimeline(); } - maxBytesTuningConfig = TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()); + maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault(); } @Override @@ -404,6 +403,7 @@ private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier) schema, identifier.getShardSpec(), identifier.getVersion(), + tuningConfig.getAppendableIndexSpec(), tuningConfig.getMaxRowsInMemory(), maxBytesTuningConfig, null @@ -1121,6 +1121,7 @@ private Object bootstrapSinksFromDisk() schema, identifier.getShardSpec(), identifier.getVersion(), + tuningConfig.getAppendableIndexSpec(), tuningConfig.getMaxRowsInMemory(), maxBytesTuningConfig, null, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java 
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 7fae74c327ff..02f75c79900d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -49,6 +49,7 @@ import org.apache.druid.segment.IndexableAdapter; import org.apache.druid.segment.ProgressIndicator; import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; @@ -382,6 +383,12 @@ public boolean isReportParseExceptions() return baseConfig.isReportParseExceptions(); } + @Override + public AppendableIndexSpec getAppendableIndexSpec() + { + return baseConfig.getAppendableIndexSpec(); + } + @Override public int getMaxRowsInMemory() { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index bae600c8b310..2aec7587a1bc 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -61,7 +61,6 @@ import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; -import org.apache.druid.segment.indexing.TuningConfigs; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; @@ -260,8 +259,9 @@ private Sink getSink(long timestamp) schema, 
config.getShardSpec(), versioningPolicy.getVersion(sinkInterval), + config.getAppendableIndexSpec(), config.getMaxRowsInMemory(), - TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()), + config.getMaxBytesInMemoryOrDefault(), config.getDedupColumn() ); addSink(retVal); @@ -723,8 +723,9 @@ public int compare(File o1, File o2) schema, config.getShardSpec(), versioningPolicy.getVersion(sinkInterval), + config.getAppendableIndexSpec(), config.getMaxRowsInMemory(), - TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()), + config.getMaxBytesInMemoryOrDefault(), config.getDedupColumn(), hydrants ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java index d6cb2230f180..ccf5e4c5421b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/Sink.java @@ -30,6 +30,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexAddResult; import org.apache.druid.segment.incremental.IncrementalIndexSchema; @@ -63,6 +64,7 @@ public class Sink implements Iterable, Overshadowable private final DataSchema schema; private final ShardSpec shardSpec; private final String version; + private final AppendableIndexSpec appendableIndexSpec; private final int maxRowsInMemory; private final long maxBytesInMemory; private final CopyOnWriteArrayList hydrants = new CopyOnWriteArrayList<>(); @@ -79,6 +81,7 @@ public Sink( DataSchema schema, ShardSpec shardSpec, String version, + AppendableIndexSpec appendableIndexSpec, int maxRowsInMemory, long maxBytesInMemory, 
String dedupColumn @@ -89,6 +92,7 @@ public Sink( schema, shardSpec, version, + appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, dedupColumn, @@ -101,6 +105,7 @@ public Sink( DataSchema schema, ShardSpec shardSpec, String version, + AppendableIndexSpec appendableIndexSpec, int maxRowsInMemory, long maxBytesInMemory, String dedupColumn, @@ -111,6 +116,7 @@ public Sink( this.shardSpec = shardSpec; this.interval = interval; this.version = version; + this.appendableIndexSpec = appendableIndexSpec; this.maxRowsInMemory = maxRowsInMemory; this.maxBytesInMemory = maxBytesInMemory; this.dedupColumn = dedupColumn; @@ -322,11 +328,13 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) .withMetrics(schema.getAggregators()) .withRollup(schema.getGranularitySpec().isRollup()) .build(); - final IncrementalIndex newIndex = new IncrementalIndex.Builder() + + // Build the incremental-index according to the spec that was chosen by the user + final IncrementalIndex newIndex = appendableIndexSpec.builder() .setIndexSchema(indexSchema) .setMaxRowCount(maxRowsInMemory) .setMaxBytesInMemory(maxBytesInMemory) - .buildOnheap(); + .build(); final FireHydrant old; synchronized (hydrantLock) { diff --git a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java index ea3f7cf3f7bd..72f0b8bbfe91 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java @@ -23,6 +23,7 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.data.CompressionStrategy; +import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.hamcrest.CoreMatchers; import org.joda.time.Period; @@ -89,6 +90,7 @@ 
public void testSerdeWithDefaults() throws Exception ); Assert.assertNotNull(config.getBasePersistDirectory()); + Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec()); Assert.assertEquals(0, config.getHandoffConditionTimeout()); Assert.assertEquals(0, config.getAlertTimeout()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); @@ -119,7 +121,8 @@ public void testSerdeWithNonDefaults() throws Exception + " \"handoffConditionTimeout\": 100,\n" + " \"alertTimeout\": 70,\n" + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" - + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n" + + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n" + "}"; ObjectMapper mapper = TestHelper.makeJsonMapper(); @@ -134,6 +137,7 @@ public void testSerdeWithNonDefaults() throws Exception ); Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString()); + Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); Assert.assertEquals(70, config.getAlertTimeout()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index ee79dc1f2286..87df0e8d532c 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -62,6 +62,7 @@ public AppenderatorPlumberTest() throws Exception EasyMock.anyObject())).andReturn(true).anyTimes(); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( + null, 1, null, null, diff 
--git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java index 9e57d2831316..c77d42991dbc 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -162,6 +162,7 @@ public AppenderatorTester( objectMapper ); tuningConfig = new RealtimeTuningConfig( + null, maxRowsInMemory, maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes, null, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index 9e9d350a30ab..6e38c1cbe614 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -134,6 +134,7 @@ public int columnCacheSizeBytes() ); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( + null, 75000, null, null, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 0eb62e8ceca3..9b73cbe12cd3 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -48,7 +48,6 @@ import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; -import org.apache.druid.segment.indexing.TuningConfigs; import 
org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; @@ -200,6 +199,7 @@ public void setUp() throws Exception EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter); tuningConfig = new RealtimeTuningConfig( + null, 1, null, null, @@ -278,8 +278,9 @@ private void testPersist(final Object commitMetadata) throws Exception schema, tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), + tuningConfig.getAppendableIndexSpec(), tuningConfig.getMaxRowsInMemory(), - TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), + tuningConfig.getMaxBytesInMemoryOrDefault(), tuningConfig.getDedupColumn() ); plumber.getSinks().put(0L, sink); @@ -323,8 +324,9 @@ public void testPersistFails() throws Exception schema, tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), + tuningConfig.getAppendableIndexSpec(), tuningConfig.getMaxRowsInMemory(), - TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), + tuningConfig.getMaxBytesInMemoryOrDefault(), tuningConfig.getDedupColumn() ); plumber.getSinks().put(0L, sink); @@ -373,8 +375,9 @@ private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Ex schema2, tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), + tuningConfig.getAppendableIndexSpec(), tuningConfig.getMaxRowsInMemory(), - TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), + tuningConfig.getMaxBytesInMemoryOrDefault(), tuningConfig.getDedupColumn() ); plumber2.getSinks().put(0L, sink); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java index bf94b2f70c89..477c3bd9b693 100644 --- 
a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java @@ -34,7 +34,6 @@ import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; -import org.apache.druid.segment.indexing.TuningConfigs; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.realtime.FireHydrant; import org.apache.druid.testing.InitializedNullHandlingTest; @@ -66,6 +65,7 @@ public void testSwap() throws Exception final Interval interval = Intervals.of("2013-01-01/2013-01-02"); final String version = DateTimes.nowUtc().toString(); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( + null, 100, null, new Period("P1Y"), @@ -91,8 +91,9 @@ public void testSwap() throws Exception schema, tuningConfig.getShardSpec(), version, + tuningConfig.getAppendableIndexSpec(), tuningConfig.getMaxRowsInMemory(), - TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), + tuningConfig.getMaxBytesInMemoryOrDefault(), tuningConfig.getDedupColumn() ); @@ -220,6 +221,7 @@ public void testDedup() throws Exception final Interval interval = Intervals.of("2013-01-01/2013-01-02"); final String version = DateTimes.nowUtc().toString(); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( + null, 100, null, new Period("P1Y"), @@ -245,8 +247,9 @@ public void testDedup() throws Exception schema, tuningConfig.getShardSpec(), version, + tuningConfig.getAppendableIndexSpec(), tuningConfig.getMaxRowsInMemory(), - TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), + tuningConfig.getMaxBytesInMemoryOrDefault(), tuningConfig.getDedupColumn() ); diff --git a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java 
b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java index bb759b6ddf7c..8dec45341c35 100644 --- a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java @@ -156,6 +156,7 @@ public void testTaskValidator() throws Exception ), new RealtimeTuningConfig( + null, 1, null, new Period("PT10M"),