diff --git a/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java b/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java index 8132309ac865..3028a7260b73 100644 --- a/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java +++ b/api/src/main/java/io/druid/data/input/impl/DimensionsSpec.java @@ -44,10 +44,7 @@ public class DimensionsSpec private final Set dimensionExclusions; private final Map dimensionSchemaMap; - public static DimensionsSpec ofEmpty() - { - return new DimensionsSpec(null, null, null); - } + public static final DimensionsSpec EMPTY = new DimensionsSpec(null, null, null); public static List getDefaultSchemas(List dimNames) { diff --git a/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java index 50fcab83d985..227604244ce5 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/FilterPartitionBenchmark.java @@ -29,7 +29,6 @@ import io.druid.benchmark.datagen.BenchmarkSchemaInfo; import io.druid.benchmark.datagen.BenchmarkSchemas; import io.druid.data.input.InputRow; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.granularity.Granularities; @@ -72,8 +71,6 @@ import io.druid.segment.filter.OrFilter; import io.druid.segment.filter.SelectorFilter; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.apache.commons.io.FileUtils; import org.joda.time.Interval; @@ -228,17 +225,11 @@ public void tearDown() throws IOException private IncrementalIndex makeIncIndex() { - return new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) - .withMetrics(schemaInfo.getAggsArray()) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) - .build(), - true, - false, - true, - rowsPerSegment - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(schemaInfo.getAggsArray()) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } @Benchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/FilteredAggregatorBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/FilteredAggregatorBenchmark.java index 40a5180d5483..13c079f5b053 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/FilteredAggregatorBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/FilteredAggregatorBenchmark.java @@ -28,7 +28,6 @@ import io.druid.benchmark.datagen.BenchmarkSchemas; import io.druid.benchmark.query.QueryBenchmarkUtil; import io.druid.data.input.InputRow; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.granularity.Granularities; @@ -73,8 +72,6 @@ import io.druid.segment.QueryableIndexSegment; import io.druid.segment.column.ColumnConfig; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.apache.commons.io.FileUtils; import org.openjdk.jmh.annotations.Benchmark; @@ -239,17 +236,11 @@ public void 
tearDown() throws IOException private IncrementalIndex makeIncIndex(AggregatorFactory[] metrics) { - return new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) - .withMetrics(metrics) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) - .build(), - true, - false, - true, - rowsPerSegment - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(metrics) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } private static List runQuery(QueryRunnerFactory factory, QueryRunner runner, Query query) diff --git a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java index e4bf398dcb17..8cfc191aa28a 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/GroupByTypeInterfaceBenchmark.java @@ -36,7 +36,6 @@ import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; import io.druid.data.input.Row; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.granularity.Granularities; @@ -73,8 +72,6 @@ import io.druid.segment.QueryableIndexSegment; import io.druid.segment.column.ColumnConfig; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.apache.commons.io.FileUtils; import org.openjdk.jmh.annotations.Benchmark; @@ -433,17 +430,12 @@ public String getFormatString() private IncrementalIndex makeIncIndex() { - return new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) - .withMetrics(schemaInfo.getAggsArray()) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) - .build(), - true, - false, - true, - rowsPerSegment - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(schemaInfo.getAggsArray()) + .setReportParseExceptions(false) + .setConcurrentEventAdd(true) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } @TearDown(Level.Trial) diff --git a/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexRowTypeBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexRowTypeBenchmark.java index 4b900568552f..02c08a3b3f84 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexRowTypeBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexRowTypeBenchmark.java @@ -22,13 +22,11 @@ import com.google.common.collect.ImmutableMap; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; -import io.druid.java.util.common.granularity.Granularities; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Level; @@ -120,15 +118,12 @@ private MapBasedInputRow getStringRow(long timestamp, int rowID, int dimensionCo private 
IncrementalIndex makeIncIndex() { - return new OnheapIncrementalIndex( - 0, - Granularities.NONE, - aggs, - false, - false, - true, - maxRows - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(aggs) + .setDeserializeComplexMetrics(false) + .setReportParseExceptions(false) + .setMaxRowCount(maxRows) + .buildOnheap(); } @Setup diff --git a/benchmarks/src/main/java/io/druid/benchmark/TopNTypeInterfaceBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/TopNTypeInterfaceBenchmark.java index 48d312aed74a..542642962edc 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/TopNTypeInterfaceBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/TopNTypeInterfaceBenchmark.java @@ -30,7 +30,6 @@ import io.druid.collections.StupidPool; import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.granularity.Granularities; @@ -70,8 +69,6 @@ import io.druid.segment.QueryableIndexSegment; import io.druid.segment.column.ColumnConfig; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -314,17 +311,11 @@ public void setup() throws IOException private IncrementalIndex makeIncIndex() { - return new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) - .withMetrics(schemaInfo.getAggsArray()) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) - .build(), - true, - false, - true, - rowsPerSegment - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(schemaInfo.getAggsArray()) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } private static List runQuery(QueryRunnerFactory factory, QueryRunner runner, Query query) diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java index 431bb2a7ff36..57adb66c88a3 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java @@ -24,7 +24,6 @@ import io.druid.benchmark.datagen.BenchmarkSchemaInfo; import io.druid.benchmark.datagen.BenchmarkSchemas; import io.druid.data.input.InputRow; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequence; @@ -49,7 +48,6 @@ import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexStorageAdapter; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -124,18 +122,16 @@ public void setup() throws IOException private IncrementalIndex makeIncIndex() { - return new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) - 
.withMetrics(schemaInfo.getAggsArray()) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) - .withRollup(rollup) - .build(), - true, - false, - true, - rowsPerSegment - ); + return new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMetrics(schemaInfo.getAggsArray()) + .withRollup(rollup) + .build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } @Benchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java index 69b3b017f564..6ad20611586a 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexIngestionBenchmark.java @@ -23,14 +23,11 @@ import io.druid.benchmark.datagen.BenchmarkSchemaInfo; import io.druid.benchmark.datagen.BenchmarkSchemas; import io.druid.data.input.InputRow; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; -import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.logger.Logger; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; @@ -104,18 +101,16 @@ public void setup2() throws IOException private IncrementalIndex makeIncIndex() { - return new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) - .withMetrics(schemaInfo.getAggsArray()) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) - .withRollup(rollup) - .build(), - true, - false, - true, - rowsPerSegment * 2 - ); + return new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMetrics(schemaInfo.getAggsArray()) + .withRollup(rollup) + .build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment * 2) + .buildOnheap(); } @Benchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java index 6d8fdaac4d3b..91eb5e1d098d 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexMergeBenchmark.java @@ -25,10 +25,8 @@ import io.druid.benchmark.datagen.BenchmarkSchemaInfo; import io.druid.benchmark.datagen.BenchmarkSchemas; import io.druid.data.input.InputRow; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; -import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.logger.Logger; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.segment.IndexIO; @@ -39,7 +37,6 @@ import io.druid.segment.column.ColumnConfig; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.apache.commons.io.FileUtils; import org.openjdk.jmh.annotations.Benchmark; @@ -161,18 +158,16 
@@ public void tearDown() throws IOException private IncrementalIndex makeIncIndex() { - return new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) + return new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() .withMetrics(schemaInfo.getAggsArray()) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) .withRollup(rollup) - .build(), - true, - false, - true, - rowsPerSegment - ); + .build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } @Benchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java index 7bd825262815..0a1520372cfe 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/indexing/IndexPersistBenchmark.java @@ -25,10 +25,8 @@ import io.druid.benchmark.datagen.BenchmarkSchemaInfo; import io.druid.benchmark.datagen.BenchmarkSchemas; import io.druid.data.input.InputRow; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; -import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.logger.Logger; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.segment.IndexIO; @@ -38,7 +36,6 @@ import io.druid.segment.column.ColumnConfig; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.apache.commons.io.FileUtils; import org.openjdk.jmh.annotations.Benchmark; @@ -155,18 +152,16 @@ public void teardown() throws IOException private IncrementalIndex makeIncIndex() { - return new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) + return new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() .withMetrics(schemaInfo.getAggsArray()) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) .withRollup(rollup) - .build(), - true, - false, - true, - rowsPerSegment - ); + .build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } @Benchmark diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index 417cdfcc406f..6e028b99642b 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -36,7 +36,6 @@ import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; import io.druid.data.input.Row; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.granularity.Granularities; @@ -76,7 +75,6 @@ import io.druid.segment.column.ValueType; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.apache.commons.io.FileUtils; import org.openjdk.jmh.annotations.Benchmark; @@ -476,18 +474,17 @@ public String 
getFormatString() private IncrementalIndex makeIncIndex(boolean withRollup) { - return new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) + return new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() .withMetrics(schemaInfo.getAggsArray()) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) .withRollup(withRollup) - .build(), - true, - false, - true, - rowsPerSegment - ); + .build() + ) + .setReportParseExceptions(false) + .setConcurrentEventAdd(true) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } @TearDown(Level.Trial) diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SearchBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SearchBenchmark.java index 15119da67ca9..053d0730890a 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/SearchBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SearchBenchmark.java @@ -32,7 +32,6 @@ import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; import io.druid.data.input.Row; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.granularity.Granularities; @@ -76,8 +75,6 @@ import io.druid.segment.QueryableIndexSegment; import io.druid.segment.column.ColumnConfig; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.apache.commons.io.FileUtils; import org.openjdk.jmh.annotations.Benchmark; @@ -388,17 +385,11 @@ public void tearDown() throws IOException private IncrementalIndex makeIncIndex() { - return new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) - .withMetrics(schemaInfo.getAggsArray()) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) - .build(), - true, - false, - true, - rowsPerSegment - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(schemaInfo.getAggsArray()) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } private static List runQuery(QueryRunnerFactory factory, QueryRunner runner, Query query) diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SelectBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SelectBenchmark.java index c21af6bde443..426f2da0f2be 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/SelectBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SelectBenchmark.java @@ -31,7 +31,6 @@ import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; import io.druid.data.input.Row; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.granularity.Granularities; @@ -66,8 +65,6 @@ import io.druid.segment.QueryableIndexSegment; import io.druid.segment.column.ColumnConfig; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.apache.commons.io.FileUtils; import org.openjdk.jmh.annotations.Benchmark; @@ -252,17 +249,11 @@ public void tearDown() throws IOException private IncrementalIndex 
makeIncIndex() { - return new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) - .withMetrics(schemaInfo.getAggsArray()) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) - .build(), - true, - false, - true, - rowsPerSegment - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(schemaInfo.getAggsArray()) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } private static List runQuery(QueryRunnerFactory factory, QueryRunner runner, Query query) diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/TimeseriesBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/TimeseriesBenchmark.java index f0a6c7b36983..221e9381c8be 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/TimeseriesBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/TimeseriesBenchmark.java @@ -28,7 +28,6 @@ import io.druid.benchmark.datagen.BenchmarkSchemas; import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.granularity.Granularities; @@ -70,8 +69,6 @@ import io.druid.segment.column.Column; import io.druid.segment.column.ColumnConfig; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.apache.commons.io.FileUtils; import org.joda.time.Interval; @@ -312,17 +309,11 @@ public void tearDown() throws IOException private IncrementalIndex makeIncIndex() { - return new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) - .withMetrics(schemaInfo.getAggsArray()) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) - .build(), - true, - false, - true, - rowsPerSegment - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(schemaInfo.getAggsArray()) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } private static List runQuery(QueryRunnerFactory factory, QueryRunner runner, Query query) diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java index b2d337c6e4e0..29a10ae96f17 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java @@ -29,7 +29,6 @@ import io.druid.collections.StupidPool; import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.hll.HyperLogLogHash; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.granularity.Granularities; @@ -68,8 +67,6 @@ import io.druid.segment.QueryableIndexSegment; import io.druid.segment.column.ColumnConfig; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import org.apache.commons.io.FileUtils; import org.openjdk.jmh.annotations.Benchmark; @@ -292,17 +289,11 @@ public void tearDown() throws IOException private IncrementalIndex makeIncIndex() { - return new 
OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) - .withMetrics(schemaInfo.getAggsArray()) - .withDimensionsSpec(new DimensionsSpec(null, null, null)) - .build(), - true, - false, - true, - rowsPerSegment - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(schemaInfo.getAggsArray()) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } private static List runQuery(QueryRunnerFactory factory, QueryRunner runner, Query query) diff --git a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java index 36a8e378a0c7..5dfd1c176b1c 100644 --- a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java +++ b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountGroupByQueryTest.java @@ -25,7 +25,6 @@ import io.druid.data.input.Row; import io.druid.java.util.common.granularity.Granularities; import io.druid.query.QueryRunnerTestHelper; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.dimension.DefaultDimensionSpec; import io.druid.query.dimension.DimensionSpec; @@ -40,7 +39,7 @@ import io.druid.segment.Segment; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; import org.junit.Test; import java.util.Arrays; @@ -56,9 +55,17 @@ public void testGroupByWithDistinctCountAgg() throws Exception config.setMaxIntermediateRows(10000); final GroupByQueryRunnerFactory factory = GroupByQueryRunnerTest.makeQueryRunnerFactory(config); - IncrementalIndex index = new OnheapIncrementalIndex( - 0, Granularities.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000 - ); + IncrementalIndex index = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withQueryGranularity(Granularities.SECOND) + .withMetrics(new CountAggregatorFactory("cnt")) + .build() + ) + .setConcurrentEventAdd(true) + .setMaxRowCount(1000) + .buildOnheap(); + String visitor_id = "visitor_id"; String client_type = "client_type"; long timestamp = System.currentTimeMillis(); diff --git a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java index fbda378b0108..d56eeb69413e 100644 --- a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java +++ b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTimeseriesQueryTest.java @@ -27,15 +27,14 @@ import io.druid.query.Druids; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.timeseries.TimeseriesQuery; import io.druid.query.timeseries.TimeseriesQueryEngine; import io.druid.query.timeseries.TimeseriesResultValue; import 
io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexStorageAdapter; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; import org.junit.Test; @@ -50,9 +49,16 @@ public void testTopNWithDistinctCountAgg() throws Exception { TimeseriesQueryEngine engine = new TimeseriesQueryEngine(); - IncrementalIndex index = new OnheapIncrementalIndex( - 0, Granularities.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000 - ); + IncrementalIndex index = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withQueryGranularity(Granularities.SECOND) + .withMetrics(new CountAggregatorFactory("cnt")) + .build() + ) + .setMaxRowCount(1000) + .buildOnheap(); + String visitor_id = "visitor_id"; String client_type = "client_type"; DateTime time = new DateTime("2016-03-04T00:00:00.000Z"); diff --git a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java index 1e6b6d100681..190d79ed9bd7 100644 --- a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java +++ b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java @@ -29,7 +29,6 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.topn.TopNQuery; import io.druid.query.topn.TopNQueryBuilder; @@ -37,8 +36,8 @@ import io.druid.query.topn.TopNResultValue; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexStorageAdapter; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; import org.junit.Test; @@ -68,9 +67,16 @@ public ByteBuffer get() ) ); - IncrementalIndex index = new OnheapIncrementalIndex( - 0, Granularities.SECOND, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000 - ); + IncrementalIndex index = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withQueryGranularity(Granularities.SECOND) + .withMetrics(new CountAggregatorFactory("cnt")) + .build() + ) + .setMaxRowCount(1000) + .buildOnheap(); + String visitor_id = "visitor_id"; String client_type = "client_type"; DateTime time = new DateTime("2016-03-04T00:00:00.000Z"); diff --git a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java index 254f4dceda96..fd768f658f27 100644 --- a/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java +++ b/extensions-contrib/scan-query/src/test/java/io/druid/query/scan/MultiSegmentScanQueryTest.java @@ -39,7 +39,6 @@ import io.druid.segment.TestIndex; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import 
io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.apache.commons.io.IOUtils; @@ -150,7 +149,10 @@ private static IncrementalIndex newIndex(String minTimeStamp, int maxRowCount) .withQueryGranularity(Granularities.HOUR) .withMetrics(TestIndex.METRIC_AGGS) .build(); - return new OnheapIncrementalIndex(schema, true, maxRowCount); + return new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(maxRowCount) + .buildOnheap(); } @AfterClass diff --git a/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java b/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java index 11ef3c9fea8d..bb61149bc18e 100644 --- a/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java +++ b/extensions-contrib/virtual-columns/src/test/java/io/druid/segment/MapVirtualColumnTest.java @@ -30,7 +30,6 @@ import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.jackson.DefaultObjectMapper; -import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Druids; import io.druid.query.QueryRunner; @@ -46,7 +45,6 @@ import io.druid.query.select.SelectResultValue; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Test; @@ -87,9 +85,11 @@ public static Iterable constructorFeeder() throws IOException final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()) - .withQueryGranularity(Granularities.NONE) .build(); - final IncrementalIndex index = new OnheapIncrementalIndex(schema, true, 10000); + final IncrementalIndex index = new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(10000) + .buildOnheap(); final StringInputRowParser parser = new StringInputRowParser( new DelimitedParseSpec( diff --git a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java index 2c6fc22c64fb..ead58242e977 100644 --- a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java +++ b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java @@ -27,7 +27,6 @@ import io.druid.java.util.common.guava.Sequences; import io.druid.query.Druids; import io.druid.query.QueryContexts; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory; @@ -96,18 +95,16 @@ public void setUp() throws Exception .schema( new IncrementalIndexSchema.Builder() .withMetrics( - new AggregatorFactory[]{ - new CountAggregatorFactory("cnt"), - new DoubleSumAggregatorFactory("m1", "m1"), - new ApproximateHistogramAggregatorFactory( - "hist_m1", - "m1", - null, - null, - null, - null - ) - } + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1"), + new 
ApproximateHistogramAggregatorFactory( + "hist_m1", + "m1", + null, + null, + null, + null + ) ) .withRollup(false) .build() diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 2a07fa521590..f511071205ac 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -49,7 +49,6 @@ import io.druid.segment.column.ColumnCapabilitiesImpl; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NumberedShardSpec; import io.druid.timeline.partition.ShardSpec; @@ -237,11 +236,11 @@ private static IncrementalIndex makeIncrementalIndex( .withRollup(config.getSchema().getDataSchema().getGranularitySpec().isRollup()) .build(); - OnheapIncrementalIndex newIndex = new OnheapIncrementalIndex( - indexSchema, - !tuningConfig.isIgnoreInvalidRows(), - tuningConfig.getRowFlushBoundary() - ); + IncrementalIndex newIndex = new IncrementalIndex.Builder() + .setIndexSchema(indexSchema) + .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows()) + .setMaxRowCount(tuningConfig.getRowFlushBoundary()) + .buildOnheap(); if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) { newIndex.loadDimensionIterable(oldDimOrder, oldCapabilities); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 71b656e57b44..e98291228725 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -54,10 +54,8 @@ import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.TaskLockbox; import io.druid.indexing.overlord.supervisor.SupervisorManager; -import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.logger.Logger; import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.filter.SelectorDimFilter; @@ -65,8 +63,8 @@ import io.druid.segment.IndexMerger; import io.druid.segment.IndexMergerV9; import io.druid.segment.IndexSpec; +import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.DataSegmentMover; @@ -133,21 +131,17 @@ public static Collection constructorFeeder() throws IOException } ); final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) .withMinTimestamp(JodaUtils.MIN_INSTANT) .withDimensionsSpec(ROW_PARSER) .withMetrics( - new AggregatorFactory[]{ - new LongSumAggregatorFactory(METRIC_LONG_NAME, DIM_LONG_NAME), - new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME) - } + new 
LongSumAggregatorFactory(METRIC_LONG_NAME, DIM_LONG_NAME), + new DoubleSumAggregatorFactory(METRIC_FLOAT_NAME, DIM_FLOAT_NAME) ) .build(); - final OnheapIncrementalIndex index = new OnheapIncrementalIndex( - schema, - true, - MAX_ROWS * MAX_SHARD_NUMBER - ); + final IncrementalIndex index = new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(MAX_ROWS * MAX_SHARD_NUMBER) + .buildOnheap(); for (Integer i = 0; i < MAX_ROWS; ++i) { index.add(ROW_PARSER.parse(buildRow(i.longValue()))); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 7b9b1c972540..4fc2b7a63bc5 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -49,17 +49,15 @@ import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; -import io.druid.java.util.common.granularity.Granularities; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.filter.NoopDimFilter; import io.druid.segment.IndexIO; import io.druid.segment.IndexMerger; import io.druid.segment.IndexMergerV9; import io.druid.segment.IndexSpec; +import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.StorageLocationConfig; @@ -211,16 +209,14 @@ private static Map persist(File tmpDir, InputRow... 
rows) { final File persistDir = new File(tmpDir, UUID.randomUUID().toString()); final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) .withMinTimestamp(JodaUtils.MIN_INSTANT) .withDimensionsSpec(ROW_PARSER) - .withMetrics( - new AggregatorFactory[]{ - new LongSumAggregatorFactory(METRICS[0], METRICS[0]) - } - ) + .withMetrics(new LongSumAggregatorFactory(METRICS[0], METRICS[0])) .build(); - final OnheapIncrementalIndex index = new OnheapIncrementalIndex(schema, true, rows.length); + final IncrementalIndex index = new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(rows.length) + .buildOnheap(); for (InputRow row : rows) { try { diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java index 64f67e14a4c5..b9d0910d8dd5 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryHelper.java @@ -44,8 +44,6 @@ import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; -import io.druid.segment.incremental.OffheapIncrementalIndex; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; import java.nio.ByteBuffer; @@ -120,22 +118,21 @@ public String apply(DimensionSpec input) .build(); if (query.getContextValue("useOffheap", false)) { - index = new OffheapIncrementalIndex( - indexSchema, - false, - true, - sortResults, - querySpecificConfig.getMaxResults(), - bufferPool - ); + index = new IncrementalIndex.Builder() + .setIndexSchema(indexSchema) + .setDeserializeComplexMetrics(false) + .setConcurrentEventAdd(true) + .setSortFacts(sortResults) + .setMaxRowCount(querySpecificConfig.getMaxResults()) + .buildOffheap(bufferPool); } else { - index = new OnheapIncrementalIndex( - indexSchema, - false, - true, - sortResults, - querySpecificConfig.getMaxResults() - ); + index = new IncrementalIndex.Builder() + .setIndexSchema(indexSchema) + .setDeserializeComplexMetrics(false) + .setConcurrentEventAdd(true) + .setSortFacts(sortResults) + .setMaxRowCount(querySpecificConfig.getMaxResults()) + .buildOnheap(); } Accumulator accumulator = new Accumulator() diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 194d04f2e812..9e1c121abec8 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -30,6 +30,7 @@ import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; +import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; @@ -67,6 +68,7 @@ import javax.annotation.concurrent.GuardedBy; import java.io.Closeable; import java.lang.reflect.Array; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -75,6 +77,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import 
java.util.concurrent.ConcurrentMap; @@ -210,16 +213,21 @@ public ColumnCapabilities getColumnCapabilities(String columnName) * Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that * should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics. * + * Set concurrentEventAdd to true to indicate that adding of input rows should be thread-safe (for example, groupBy + * where multiple threads can add concurrently to the IncrementalIndex). + * * @param incrementalIndexSchema the schema to use for incremental index * @param deserializeComplexMetrics flag whether or not to call ComplexMetricExtractor.extractValue() on the input * value for aggregators that return metrics other than float. * @param reportParseExceptions flag whether or not to report ParseExceptions that occur while extracting values * from input rows + * @param concurrentEventAdd flag whether or not adding of input rows should be thread-safe */ - public IncrementalIndex( + protected IncrementalIndex( final IncrementalIndexSchema incrementalIndexSchema, final boolean deserializeComplexMetrics, - final boolean reportParseExceptions + final boolean reportParseExceptions, + final boolean concurrentEventAdd ) { this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); @@ -238,7 +246,7 @@ public IncrementalIndex( .setQueryGranularity(this.gran) .setRollup(this.rollup); - this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics); + this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics, concurrentEventAdd); this.metricDescs = Maps.newLinkedHashMap(); for (AggregatorFactory metric : metrics) { @@ -280,6 +288,113 @@ public IncrementalIndex( } } + public static class Builder + { + private IncrementalIndexSchema incrementalIndexSchema; + private boolean deserializeComplexMetrics; + private boolean reportParseExceptions; + private boolean concurrentEventAdd; + private boolean sortFacts; + private int maxRowCount; + + public Builder() + { + incrementalIndexSchema = null; + deserializeComplexMetrics = true; + reportParseExceptions = true; + concurrentEventAdd = false; + sortFacts = true; + maxRowCount = 0; + } + + public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema) + { + this.incrementalIndexSchema = incrementalIndexSchema; + return this; + } + + /** + * A helper method to set a simple index schema with only metrics and default values for the other parameters. Note + * that this method is normally used for testing and benchmarking; it is unlikely that you would use it in + * production settings. + * + * @param metrics variable array of {@link AggregatorFactory} metrics + * + * @return this + */ + @VisibleForTesting + public Builder setSimpleTestingIndexSchema(final AggregatorFactory... 
metrics) + { + this.incrementalIndexSchema = new IncrementalIndexSchema.Builder() + .withMetrics(metrics) + .build(); + return this; + } + + public Builder setDeserializeComplexMetrics(final boolean deserializeComplexMetrics) + { + this.deserializeComplexMetrics = deserializeComplexMetrics; + return this; + } + + public Builder setReportParseExceptions(final boolean reportParseExceptions) + { + this.reportParseExceptions = reportParseExceptions; + return this; + } + + public Builder setConcurrentEventAdd(final boolean concurrentEventAdd) + { + this.concurrentEventAdd = concurrentEventAdd; + return this; + } + + public Builder setSortFacts(final boolean sortFacts) + { + this.sortFacts = sortFacts; + return this; + } + + public Builder setMaxRowCount(final int maxRowCount) + { + this.maxRowCount = maxRowCount; + return this; + } + + public IncrementalIndex buildOnheap() + { + if (maxRowCount <= 0) { + throw new IllegalArgumentException("Invalid max row count: " + maxRowCount); + } + + return new OnheapIncrementalIndex( + Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"), + deserializeComplexMetrics, + reportParseExceptions, + concurrentEventAdd, + sortFacts, + maxRowCount + ); + } + + public IncrementalIndex buildOffheap(final StupidPool bufferPool) + { + if (maxRowCount <= 0) { + throw new IllegalArgumentException("Invalid max row count: " + maxRowCount); + } + + return new OffheapIncrementalIndex( + Objects.requireNonNull(incrementalIndexSchema, "incrementalIndexSchema is null"), + deserializeComplexMetrics, + reportParseExceptions, + concurrentEventAdd, + sortFacts, + maxRowCount, + Objects.requireNonNull(bufferPool, "bufferPool is null") + ); + } + } + public boolean isRollup() { return rollup; @@ -294,7 +409,8 @@ public boolean isRollup() protected abstract AggregatorType[] initAggs( AggregatorFactory[] metrics, Supplier rowSupplier, - boolean deserializeComplexMetrics + boolean deserializeComplexMetrics, + boolean concurrentEventAdd ); // Note: This method needs to be thread safe. diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexSchema.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexSchema.java index d9e138511787..5d583ceb2a26 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexSchema.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexSchema.java @@ -109,7 +109,7 @@ public Builder() this.minTimestamp = 0L; this.gran = Granularities.NONE; this.virtualColumns = VirtualColumns.EMPTY; - this.dimensionsSpec = new DimensionsSpec(null, null, null); + this.dimensionsSpec = DimensionsSpec.EMPTY; this.metrics = new AggregatorFactory[]{}; this.rollup = true; } @@ -152,7 +152,7 @@ public Builder withVirtualColumns(VirtualColumns virtualColumns) public Builder withDimensionsSpec(DimensionsSpec dimensionsSpec) { - this.dimensionsSpec = dimensionsSpec == null ? DimensionsSpec.ofEmpty() : dimensionsSpec; + this.dimensionsSpec = dimensionsSpec == null ? DimensionsSpec.EMPTY : dimensionsSpec; return this; } @@ -169,7 +169,7 @@ public Builder withDimensionsSpec(InputRowParser parser) return this; } - public Builder withMetrics(AggregatorFactory[] metrics) + public Builder withMetrics(AggregatorFactory... 
metrics) { this.metrics = metrics; return this; diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index d01f9969fb3b..598d454a27c3 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -26,7 +26,6 @@ import io.druid.data.input.InputRow; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; -import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.parsers.ParseException; import io.druid.query.aggregation.AggregatorFactory; @@ -66,16 +65,17 @@ public class OffheapIncrementalIndex extends IncrementalIndex private String outOfRowsReason = null; - public OffheapIncrementalIndex( + OffheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, boolean reportParseExceptions, + boolean concurrentEventAdd, boolean sortFacts, int maxRowCount, StupidPool bufferPool ) { - super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd); this.maxRowCount = maxRowCount; this.bufferPool = bufferPool; @@ -91,47 +91,6 @@ public OffheapIncrementalIndex( aggBuffers.add(bb); } - public OffheapIncrementalIndex( - long minTimestamp, - Granularity gran, - boolean rollup, - final AggregatorFactory[] metrics, - int maxRowCount, - StupidPool bufferPool - ) - { - this( - new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) - .withQueryGranularity(gran) - .withMetrics(metrics) - .withRollup(rollup) - .build(), - true, - true, - true, - maxRowCount, - bufferPool - ); - } - - public OffheapIncrementalIndex( - long minTimestamp, - Granularity gran, - final AggregatorFactory[] metrics, - int maxRowCount, - StupidPool bufferPool - ) - { - this( - minTimestamp, - gran, - IncrementalIndexSchema.DEFAULT_ROLLUP, - metrics, - maxRowCount, - bufferPool - ); - } - @Override public FactsHolder getFacts() { @@ -140,7 +99,10 @@ public FactsHolder getFacts() @Override protected BufferAggregator[] initAggs( - AggregatorFactory[] metrics, Supplier rowSupplier, boolean deserializeComplexMetrics + final AggregatorFactory[] metrics, + final Supplier rowSupplier, + final boolean deserializeComplexMetrics, + final boolean concurrentEventAdd ) { selectors = Maps.newHashMap(); @@ -157,7 +119,7 @@ protected BufferAggregator[] initAggs( selectors.put( agg.getName(), - new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory) + new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory, concurrentEventAdd) ); if (i == 0) { diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 8afec755ee50..059d954772cc 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -23,9 +23,7 @@ import com.google.common.base.Throwables; import com.google.common.collect.Maps; import io.druid.data.input.InputRow; -import io.druid.data.input.impl.DimensionsSpec; import io.druid.java.util.common.io.Closer; -import 
io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.parsers.ParseException; import io.druid.query.aggregation.Aggregator; @@ -40,9 +38,9 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; /** @@ -59,93 +57,22 @@ public class OnheapIncrementalIndex extends IncrementalIndex private String outOfRowsReason = null; - public OnheapIncrementalIndex( + OnheapIncrementalIndex( IncrementalIndexSchema incrementalIndexSchema, boolean deserializeComplexMetrics, boolean reportParseExceptions, + boolean concurrentEventAdd, boolean sortFacts, int maxRowCount ) { - super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions); + super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd); this.maxRowCount = maxRowCount; this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) : new PlainFactsHolder(sortFacts); } - public OnheapIncrementalIndex( - long minTimestamp, - Granularity gran, - final AggregatorFactory[] metrics, - boolean deserializeComplexMetrics, - boolean reportParseExceptions, - boolean sortFacts, - int maxRowCount - ) - { - this( - new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) - .withQueryGranularity(gran) - .withMetrics(metrics) - .withRollup(IncrementalIndexSchema.DEFAULT_ROLLUP) - .build(), - deserializeComplexMetrics, - reportParseExceptions, - sortFacts, - maxRowCount - ); - } - - public OnheapIncrementalIndex( - long minTimestamp, - Granularity gran, - boolean rollup, - DimensionsSpec dimensionsSpec, - AggregatorFactory[] metrics, - int maxRowCount - ) - { - this( - new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) - .withQueryGranularity(gran) - .withDimensionsSpec(dimensionsSpec) - .withMetrics(metrics) - .withRollup(rollup) - .build(), - true, - true, - true, - maxRowCount - ); - } - - public OnheapIncrementalIndex( - long minTimestamp, - Granularity gran, - final AggregatorFactory[] metrics, - int maxRowCount - ) - { - this( - minTimestamp, - gran, - IncrementalIndexSchema.DEFAULT_ROLLUP, - null, - metrics, - maxRowCount - ); - } - - public OnheapIncrementalIndex( - IncrementalIndexSchema incrementalIndexSchema, - boolean reportParseExceptions, - int maxRowCount - ) - { - this(incrementalIndexSchema, true, reportParseExceptions, true, maxRowCount); - } - @Override public FactsHolder getFacts() { @@ -154,14 +81,20 @@ public FactsHolder getFacts() @Override protected Aggregator[] initAggs( - AggregatorFactory[] metrics, Supplier rowSupplier, boolean deserializeComplexMetrics + final AggregatorFactory[] metrics, + final Supplier rowSupplier, + final boolean deserializeComplexMetrics, + final boolean concurrentEventAdd ) { selectors = Maps.newHashMap(); for (AggregatorFactory agg : metrics) { selectors.put( agg.getName(), - new ObjectCachingColumnSelectorFactory(makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)) + new ObjectCachingColumnSelectorFactory( + makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics), + concurrentEventAdd + ) ); } @@ -246,7 +179,7 @@ private void doAggregate( { rowContainer.set(row); - for (int i = 0 ; i < aggs.length ; i++) { + for (int i = 0; i < aggs.length; i++) { final Aggregator agg = 
aggs[i]; synchronized (agg) { try { @@ -363,17 +296,28 @@ public void close() // Caches references to selector objects for each column instead of creating a new object each time in order to save heap space. // In general the selectorFactory need not be thread-safe. - // here its made thread safe to support the special case of groupBy where the multiple threads can add concurrently to the IncrementalIndex. + // If required, set concurrentEventAdd to true to use a concurrent hash map instead of a vanilla hash map for thread-safe + // operations. static class ObjectCachingColumnSelectorFactory implements ColumnSelectorFactory { - private final ConcurrentMap<String, LongColumnSelector> longColumnSelectorMap = Maps.newConcurrentMap(); - private final ConcurrentMap<String, FloatColumnSelector> floatColumnSelectorMap = Maps.newConcurrentMap(); - private final ConcurrentMap<String, ObjectColumnSelector> objectColumnSelectorMap = Maps.newConcurrentMap(); + private final Map<String, LongColumnSelector> longColumnSelectorMap; + private final Map<String, FloatColumnSelector> floatColumnSelectorMap; + private final Map<String, ObjectColumnSelector> objectColumnSelectorMap; private final ColumnSelectorFactory delegate; - public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate) + public ObjectCachingColumnSelectorFactory(ColumnSelectorFactory delegate, boolean concurrentEventAdd) { this.delegate = delegate; + + if (concurrentEventAdd) { + longColumnSelectorMap = new ConcurrentHashMap<>(); + floatColumnSelectorMap = new ConcurrentHashMap<>(); + objectColumnSelectorMap = new ConcurrentHashMap<>(); + } else { + longColumnSelectorMap = new HashMap<>(); + floatColumnSelectorMap = new HashMap<>(); + objectColumnSelectorMap = new HashMap<>(); + } } @Override @@ -385,49 +329,31 @@ public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) @Override public FloatColumnSelector makeFloatColumnSelector(String columnName) { - FloatColumnSelector existing = floatColumnSelectorMap.get(columnName); + final FloatColumnSelector existing = floatColumnSelectorMap.get(columnName); if (existing != null) { return existing; - } else { - FloatColumnSelector newSelector = delegate.makeFloatColumnSelector(columnName); - FloatColumnSelector prev = floatColumnSelectorMap.putIfAbsent( - columnName, - newSelector - ); - return prev != null ? prev : newSelector; } + return floatColumnSelectorMap.computeIfAbsent(columnName, delegate::makeFloatColumnSelector); } @Override public LongColumnSelector makeLongColumnSelector(String columnName) { - LongColumnSelector existing = longColumnSelectorMap.get(columnName); + final LongColumnSelector existing = longColumnSelectorMap.get(columnName); if (existing != null) { return existing; - } else { - LongColumnSelector newSelector = delegate.makeLongColumnSelector(columnName); - LongColumnSelector prev = longColumnSelectorMap.putIfAbsent( - columnName, - newSelector - ); - return prev != null ? prev : newSelector; } + return longColumnSelectorMap.computeIfAbsent(columnName, delegate::makeLongColumnSelector); } @Override public ObjectColumnSelector makeObjectColumnSelector(String columnName) { - ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName); + final ObjectColumnSelector existing = objectColumnSelectorMap.get(columnName); if (existing != null) { return existing; - } else { - ObjectColumnSelector newSelector = delegate.makeObjectColumnSelector(columnName); - ObjectColumnSelector prev = objectColumnSelectorMap.putIfAbsent( - columnName, - newSelector - ); - return prev != null ?
prev : newSelector; } + return objectColumnSelectorMap.computeIfAbsent(columnName, delegate::makeObjectColumnSelector); } @Nullable diff --git a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java index 614d45ff18b0..3113ce5bafe0 100644 --- a/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java +++ b/processing/src/test/java/io/druid/query/MultiValuedDimensionTest.java @@ -60,7 +60,6 @@ import io.druid.segment.Segment; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.junit.AfterClass; @@ -113,17 +112,10 @@ public static Collection constructorFeeder() throws IOException @BeforeClass public static void setupClass() throws Exception { - incrementalIndex = new OnheapIncrementalIndex( - 0, - Granularities.NONE, - new AggregatorFactory[]{ - new CountAggregatorFactory("count") - }, - true, - true, - true, - 5000 - ); + incrementalIndex = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) + .setMaxRowCount(5000) + .buildOnheap(); StringInputRowParser parser = new StringInputRowParser( new CSVParseSpec( diff --git a/processing/src/test/java/io/druid/query/SchemaEvolutionTest.java b/processing/src/test/java/io/druid/query/SchemaEvolutionTest.java index b6f9c32e25ca..76b329b13241 100644 --- a/processing/src/test/java/io/druid/query/SchemaEvolutionTest.java +++ b/processing/src/test/java/io/druid/query/SchemaEvolutionTest.java @@ -150,7 +150,7 @@ public void setUp() throws IOException .tmpDir(temporaryFolder.newFolder()) .schema( new IncrementalIndexSchema.Builder() - .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("cnt")}) + .withMetrics(new CountAggregatorFactory("cnt")) .withRollup(false) .build() ) @@ -162,11 +162,11 @@ public void setUp() throws IOException .tmpDir(temporaryFolder.newFolder()) .schema( new IncrementalIndexSchema.Builder() - .withMetrics(new AggregatorFactory[]{ + .withMetrics( new CountAggregatorFactory("cnt"), new LongSumAggregatorFactory("c1", "c1"), new HyperUniquesAggregatorFactory("uniques", "c2") - }) + ) .withRollup(false) .build() ) @@ -178,11 +178,11 @@ public void setUp() throws IOException .tmpDir(temporaryFolder.newFolder()) .schema( new IncrementalIndexSchema.Builder() - .withMetrics(new AggregatorFactory[]{ + .withMetrics( new CountAggregatorFactory("cnt"), new DoubleSumAggregatorFactory("c1", "c1"), new HyperUniquesAggregatorFactory("uniques", "c2") - }) + ) .withRollup(false) .build() ) @@ -194,9 +194,7 @@ public void setUp() throws IOException .tmpDir(temporaryFolder.newFolder()) .schema( new IncrementalIndexSchema.Builder() - .withMetrics(new AggregatorFactory[]{ - new HyperUniquesAggregatorFactory("c2", "c2") - }) + .withMetrics(new HyperUniquesAggregatorFactory("c2", "c2")) .withRollup(false) .build() ) diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index d85979fd78b6..c0061ac1c858 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -77,7 +77,7 @@ import io.druid.segment.TestHelper; import io.druid.segment.column.ColumnConfig; import 
io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; import org.apache.commons.io.IOUtils; import org.apache.commons.io.LineIterator; import org.junit.rules.TemporaryFolder; @@ -413,7 +413,18 @@ public void createIndex( List toMerge = new ArrayList<>(); try { - index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount); + index = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .build() + ) + .setDeserializeComplexMetrics(deserializeComplexMetrics) + .setMaxRowCount(maxRowCount) + .buildOnheap(); + while (rows.hasNext()) { Object row = rows.next(); if (!index.canAppendRow()) { @@ -421,7 +432,17 @@ public void createIndex( toMerge.add(tmp); indexMerger.persist(index, tmp, new IndexSpec()); index.close(); - index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount); + index = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .build() + ) + .setDeserializeComplexMetrics(deserializeComplexMetrics) + .setMaxRowCount(maxRowCount) + .buildOnheap(); } if (row instanceof String && parser instanceof StringInputRowParser) { //Note: this is required because StringInputRowParser is InputRowParser as opposed to diff --git a/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java b/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java index f66d577ec12d..50f43754e5de 100644 --- a/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java +++ b/processing/src/test/java/io/druid/query/datasourcemetadata/DataSourceMetadataQueryTest.java @@ -27,7 +27,6 @@ import com.google.common.collect.MapMaker; import io.druid.data.input.MapBasedInputRow; import io.druid.jackson.DefaultObjectMapper; -import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequences; import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.Druids; @@ -38,11 +37,9 @@ import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerTestHelper; import io.druid.query.Result; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.IncrementalIndexSegment; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.timeline.LogicalSegment; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -114,10 +111,11 @@ public void testContextSerde() throws Exception @Test public void testMaxIngestedEventTime() throws Exception { - final IncrementalIndex rtIndex = new OnheapIncrementalIndex( - 0L, Granularities.NONE, new AggregatorFactory[]{new CountAggregatorFactory("count")}, 1000 - ); - ; + final IncrementalIndex rtIndex = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) + .setMaxRowCount(1000) + .buildOnheap(); + final QueryRunner runner = QueryRunnerTestHelper.makeQueryRunner( (QueryRunnerFactory) new DataSourceMetadataQueryRunnerFactory( new 
DataSourceQueryQueryToolChest(DefaultGenericQueryMetricsFactory.instance()), diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java index edda6b9db3cd..89937637fca8 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerFactoryTest.java @@ -46,7 +46,6 @@ import io.druid.segment.Segment; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.junit.Rule; import org.junit.Test; @@ -129,17 +128,11 @@ public Sequence run(QueryPlus queryPlus, Map responseContext) private Segment createSegment() throws Exception { - IncrementalIndex incrementalIndex = new OnheapIncrementalIndex( - 0, - Granularities.NONE, - new AggregatorFactory[]{ - new CountAggregatorFactory("count") - }, - true, - true, - true, - 5000 - ); + IncrementalIndex incrementalIndex = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) + .setConcurrentEventAdd(true) + .setMaxRowCount(5000) + .buildOnheap(); StringInputRowParser parser = new StringInputRowParser( new CSVParseSpec( diff --git a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java index 181d52de82c7..0b86a749872f 100644 --- a/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/search/SearchQueryRunnerTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.druid.data.input.MapBasedInputRow; -import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; @@ -61,7 +60,6 @@ import io.druid.segment.column.ValueType; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; @@ -745,13 +743,15 @@ public void testSearchOnFloatColumnWithExFn() @Test public void testSearchWithNullValueInDimension() throws Exception { - IncrementalIndex index = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withQueryGranularity(Granularities.NONE) - .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()).build(), - true, - 10 - ); + IncrementalIndex index = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()) + .build() + ) + .setMaxRowCount(10) + .buildOnheap(); + index.add( new MapBasedInputRow( 1481871600000L, diff --git a/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java b/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java index 24196d7d1868..d7b92dd00ad9 100644 --- a/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java +++ b/processing/src/test/java/io/druid/query/select/MultiSegmentSelectQueryTest.java @@ -43,7 +43,6 @@ import io.druid.segment.TestIndex; import 
io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -188,7 +187,10 @@ private static IncrementalIndex newIndex(String minTimeStamp, int maxRowCount) .withQueryGranularity(Granularities.HOUR) .withMetrics(TestIndex.METRIC_AGGS) .build(); - return new OnheapIncrementalIndex(schema, true, maxRowCount); + return new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(maxRowCount) + .buildOnheap(); } @AfterClass diff --git a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java index c0825f63fe97..34362fc5e6a2 100644 --- a/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeboundary/TimeBoundaryQueryRunnerTest.java @@ -38,7 +38,6 @@ import io.druid.segment.TestIndex; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -119,7 +118,10 @@ private static IncrementalIndex newIndex(String minTimeStamp, int maxRowCount) .withQueryGranularity(Granularities.HOUR) .withMetrics(TestIndex.METRIC_AGGS) .build(); - return new OnheapIncrementalIndex(schema, true, maxRowCount); + return new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(maxRowCount) + .buildOnheap(); } private static String makeIdentifier(IncrementalIndex index, String version) diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java index 7d9fc452b7f6..aba6b33fa6f0 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeseriesQueryRunnerBonusTest.java @@ -37,7 +37,7 @@ import io.druid.segment.IncrementalIndexSegment; import io.druid.segment.Segment; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; @@ -69,9 +69,14 @@ public TimeseriesQueryRunnerBonusTest(boolean descending) @Test public void testOneRowAtATime() throws Exception { - final IncrementalIndex oneRowIndex = new OnheapIncrementalIndex( - new DateTime("2012-01-01T00:00:00Z").getMillis(), Granularities.NONE, new AggregatorFactory[]{}, 1000 - ); + final IncrementalIndex oneRowIndex = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(new DateTime("2012-01-01T00:00:00Z").getMillis()) + .build() + ) + .setMaxRowCount(1000) + .buildOnheap(); List> results; diff --git a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java index 58c3f3a53142..73671652b097 100644 --- a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java +++ 
b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java @@ -22,12 +22,10 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import io.druid.collections.bitmap.ConciseBitmapFactory; -import io.druid.java.util.common.granularity.Granularities; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.column.Column; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexAdapter; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.commons.io.FileUtils; import org.joda.time.Interval; import org.junit.Assert; @@ -49,12 +47,11 @@ public void testEmptyIndex() throws Exception } try { - IncrementalIndex emptyIndex = new OnheapIncrementalIndex( - 0, - Granularities.NONE, - new AggregatorFactory[0], - 1000 - ); + IncrementalIndex emptyIndex = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(/* empty */) + .setMaxRowCount(1000) + .buildOnheap(); + IncrementalIndexAdapter emptyIndexAdapter = new IncrementalIndexAdapter( new Interval("2012-08-01/P3D"), emptyIndex, diff --git a/processing/src/test/java/io/druid/segment/IndexBuilder.java b/processing/src/test/java/io/druid/segment/IndexBuilder.java index 35d2611ba3ef..3ccd811ceb9d 100644 --- a/processing/src/test/java/io/druid/segment/IndexBuilder.java +++ b/processing/src/test/java/io/druid/segment/IndexBuilder.java @@ -30,7 +30,6 @@ import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; -import io.druid.segment.incremental.OnheapIncrementalIndex; import java.io.File; import java.io.IOException; @@ -47,9 +46,9 @@ public class IndexBuilder private static final int ROWS_PER_INDEX_FOR_MERGING = 1; private static final int DEFAULT_MAX_ROWS = Integer.MAX_VALUE; - private IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder().withMetrics(new AggregatorFactory[]{ - new CountAggregatorFactory("count") - }).build(); + private IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() + .withMetrics(new CountAggregatorFactory("count")) + .build(); private IndexMerger indexMerger = TestHelper.getTestIndexMerger(); private File tmpDir; private IndexSpec indexSpec = new IndexSpec(); @@ -203,11 +202,11 @@ private static IncrementalIndex buildIncrementalIndexWithRows( ) { Preconditions.checkNotNull(schema, "schema"); - final IncrementalIndex incrementalIndex = new OnheapIncrementalIndex( - schema, - true, - maxRows - ); + final IncrementalIndex incrementalIndex = new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(maxRows) + .buildOnheap(); + for (InputRow row : rows) { try { incrementalIndex.add(row); diff --git a/processing/src/test/java/io/druid/segment/IndexIOTest.java b/processing/src/test/java/io/druid/segment/IndexIOTest.java index d397b0a28144..c2b855341375 100644 --- a/processing/src/test/java/io/druid/segment/IndexIOTest.java +++ b/processing/src/test/java/io/druid/segment/IndexIOTest.java @@ -30,9 +30,7 @@ import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.DimensionsSpec; import io.druid.java.util.common.UOE; -import io.druid.java.util.common.granularity.Granularities; import io.druid.query.aggregation.Aggregator; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.data.CompressedObjectStrategy; import 
io.druid.segment.data.CompressionFactory; @@ -41,7 +39,6 @@ import io.druid.segment.incremental.IncrementalIndexAdapter; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; @@ -263,49 +260,39 @@ public IndexIOTest( this.exception = exception; } - final IncrementalIndex incrementalIndex1 = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis()) - .withQueryGranularity(Granularities.NONE) - .withMetrics( - new AggregatorFactory[]{ - new CountAggregatorFactory( - "count" - ) - } - ) - .withDimensionsSpec( - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")), - null, - null - ) - ) - .build(), - true, - 1000000 - ); - - final IncrementalIndex incrementalIndex2 = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis()) - .withQueryGranularity(Granularities.NONE) - .withMetrics( - new AggregatorFactory[]{ - new CountAggregatorFactory( - "count" - ) - } - ) - .withDimensionsSpec( - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")), - null, - null - ) - ) - .build(), - true, - 1000000 - ); + final IncrementalIndex incrementalIndex1 = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis()) + .withMetrics(new CountAggregatorFactory("count")) + .withDimensionsSpec( + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")), + null, + null + ) + ) + .build() + ) + .setMaxRowCount(1000000) + .buildOnheap(); + + final IncrementalIndex incrementalIndex2 = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DEFAULT_INTERVAL.getStart().getMillis()) + .withMetrics(new CountAggregatorFactory("count")) + .withDimensionsSpec( + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")), + null, + null + ) + ) + .build() + ) + .setMaxRowCount(1000000) + .buildOnheap(); IndexableAdapter adapter1; IndexableAdapter adapter2; diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index 565b26ec4d14..aad7daf04563 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -58,7 +58,6 @@ import io.druid.segment.incremental.IncrementalIndexAdapter; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; @@ -293,12 +292,10 @@ public void testPersistMerge() throws Exception IncrementalIndex toPersist1 = IncrementalIndexTest.createIndex(null); IncrementalIndexTest.populateIndex(timestamp, toPersist1); - IncrementalIndex toPersist2 = new OnheapIncrementalIndex( - 0L, - Granularities.NONE, - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - 1000 - ); + IncrementalIndex toPersist2 = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) + .setMaxRowCount(1000) + 
.buildOnheap(); toPersist2.add( new MapBasedInputRow( @@ -379,18 +376,16 @@ public void testPersistMerge() throws Exception @Test public void testPersistEmptyColumn() throws Exception { - final IncrementalIndex toPersist1 = new OnheapIncrementalIndex( - 0L, - Granularities.NONE, - new AggregatorFactory[]{}, - 10 - ); - final IncrementalIndex toPersist2 = new OnheapIncrementalIndex( - 0L, - Granularities.NONE, - new AggregatorFactory[]{}, - 10 - ); + final IncrementalIndex toPersist1 = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(/* empty */) + .setMaxRowCount(10) + .buildOnheap(); + + final IncrementalIndex toPersist2 = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(/* empty */) + .setMaxRowCount(10) + .buildOnheap(); + final File tmpDir1 = temporaryFolder.newFolder(); final File tmpDir2 = temporaryFolder.newFolder(); final File tmpDir3 = temporaryFolder.newFolder(); @@ -924,15 +919,22 @@ public void testMergeWithDimensionsList() throws Exception null, null )) - .withMinTimestamp(0L) - .withQueryGranularity(Granularities.NONE) - .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")}) + .withMetrics(new CountAggregatorFactory("count")) .build(); - IncrementalIndex toPersist1 = new OnheapIncrementalIndex(schema, true, 1000); - IncrementalIndex toPersist2 = new OnheapIncrementalIndex(schema, true, 1000); - IncrementalIndex toPersist3 = new OnheapIncrementalIndex(schema, true, 1000); + IncrementalIndex toPersist1 = new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(1000) + .buildOnheap(); + IncrementalIndex toPersist2 = new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(1000) + .buildOnheap(); + IncrementalIndex toPersist3 = new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(1000) + .buildOnheap(); addDimValuesToIndex(toPersist1, "dimA", Arrays.asList("1", "2")); addDimValuesToIndex(toPersist2, "dimA", Arrays.asList("1", "2")); @@ -1142,12 +1144,11 @@ public void testJointDimMerge() throws Exception // d8: 'has null' join 'no null' // d9: 'no null' join 'no null' - IncrementalIndex toPersistA = new OnheapIncrementalIndex( - 0L, - Granularities.NONE, - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - 1000 - ); + IncrementalIndex toPersistA = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) + .setMaxRowCount(1000) + .buildOnheap(); + toPersistA.add( new MapBasedInputRow( 1, @@ -1167,12 +1168,11 @@ public void testJointDimMerge() throws Exception ) ); - IncrementalIndex toPersistB = new OnheapIncrementalIndex( - 0L, - Granularities.NONE, - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - 1000 - ); + IncrementalIndex toPersistB = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) + .setMaxRowCount(1000) + .buildOnheap(); + toPersistB.add( new MapBasedInputRow( 3, @@ -1283,12 +1283,14 @@ public void testNoRollupMergeWithoutDuplicateRow() throws Exception // d9: 'no null' join 'no null' IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() - .withMinTimestamp(0L) - .withQueryGranularity(Granularities.NONE) - .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")}) + .withMetrics(new CountAggregatorFactory("count")) .withRollup(false) .build(); - IncrementalIndex toPersistA = new OnheapIncrementalIndex(indexSchema, true, 1000); + IncrementalIndex toPersistA = new IncrementalIndex.Builder() + 
.setIndexSchema(indexSchema) + .setMaxRowCount(1000) + .buildOnheap(); + toPersistA.add( new MapBasedInputRow( 1, @@ -1308,7 +1310,11 @@ public void testNoRollupMergeWithoutDuplicateRow() throws Exception ) ); - IncrementalIndex toPersistB = new OnheapIncrementalIndex(indexSchema, true, 1000); + IncrementalIndex toPersistB = new IncrementalIndex.Builder() + .setIndexSchema(indexSchema) + .setMaxRowCount(1000) + .buildOnheap(); + toPersistB.add( new MapBasedInputRow( 3, @@ -1418,12 +1424,14 @@ public void testNoRollupMergeWithDuplicateRow() throws Exception // 3. merge 2 indexes with duplicate rows IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() - .withMinTimestamp(0L) - .withQueryGranularity(Granularities.NONE) - .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")}) + .withMetrics(new CountAggregatorFactory("count")) .withRollup(false) .build(); - IncrementalIndex toPersistA = new OnheapIncrementalIndex(indexSchema, true, 1000); + IncrementalIndex toPersistA = new IncrementalIndex.Builder() + .setIndexSchema(indexSchema) + .setMaxRowCount(1000) + .buildOnheap(); + toPersistA.add( new MapBasedInputRow( 1, @@ -1443,7 +1451,11 @@ public void testNoRollupMergeWithDuplicateRow() throws Exception ) ); - IncrementalIndex toPersistB = new OnheapIncrementalIndex(indexSchema, true, 1000); + IncrementalIndex toPersistB = new IncrementalIndex.Builder() + .setIndexSchema(indexSchema) + .setMaxRowCount(1000) + .buildOnheap(); + toPersistB.add( new MapBasedInputRow( 1, @@ -1543,12 +1555,10 @@ public void testMergeWithSupersetOrdering() throws Exception IncrementalIndex toPersistBA = getSingleDimIndex("dimB", Arrays.asList("1", "2", "3")); addDimValuesToIndex(toPersistBA, "dimA", Arrays.asList("1", "2")); - IncrementalIndex toPersistBA2 = new OnheapIncrementalIndex( - 0L, - Granularities.NONE, - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - 1000 - ); + IncrementalIndex toPersistBA2 = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) + .setMaxRowCount(1000) + .buildOnheap(); toPersistBA2.add( new MapBasedInputRow( @@ -2117,14 +2127,14 @@ private IncrementalIndex getIndexWithNumericDims() throws Exception private IncrementalIndex getIndexWithDimsFromSchemata(List dims) { IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() - .withMinTimestamp(0L) - .withQueryGranularity(Granularities.NONE) .withDimensionsSpec(new DimensionsSpec(dims, null, null)) - .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")}) - .withRollup(true) + .withMetrics(new CountAggregatorFactory("count")) .build(); - return new OnheapIncrementalIndex(schema, true, 1000); + return new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(1000) + .buildOnheap(); } @@ -2174,12 +2184,10 @@ public void testPersistNullColumnSkipping() throws Exception private IncrementalIndex getIndexD3() throws Exception { - IncrementalIndex toPersist1 = new OnheapIncrementalIndex( - 0L, - Granularities.NONE, - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - 1000 - ); + IncrementalIndex toPersist1 = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) + .setMaxRowCount(1000) + .buildOnheap(); toPersist1.add( new MapBasedInputRow( @@ -2210,12 +2218,10 @@ private IncrementalIndex getIndexD3() throws Exception private IncrementalIndex getSingleDimIndex(String dimName, List values) throws Exception { - IncrementalIndex toPersist1 = new 
OnheapIncrementalIndex( - 0L, - Granularities.NONE, - new AggregatorFactory[]{new CountAggregatorFactory("count")}, - 1000 - ); + IncrementalIndex toPersist1 = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("count")) + .setMaxRowCount(1000) + .buildOnheap(); addDimValuesToIndex(toPersist1, dimName, values); return toPersist1; @@ -2237,14 +2243,14 @@ private void addDimValuesToIndex(IncrementalIndex index, String dimName, List dims) { IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() - .withMinTimestamp(0L) - .withQueryGranularity(Granularities.NONE) .withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null)) - .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("count")}) - .withRollup(true) + .withMetrics(new CountAggregatorFactory("count")) .build(); - return new OnheapIncrementalIndex(schema, true, 1000); + return new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(1000) + .buildOnheap(); } private AggregatorFactory[] getCombiningAggregators(AggregatorFactory[] aggregators) diff --git a/processing/src/test/java/io/druid/segment/IndexMergerV9CompatibilityTest.java b/processing/src/test/java/io/druid/segment/IndexMergerV9CompatibilityTest.java index fc629594bc21..63742ef3d504 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerV9CompatibilityTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerV9CompatibilityTest.java @@ -28,14 +28,13 @@ import io.druid.common.utils.JodaUtils; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; -import io.druid.java.util.common.granularity.Granularities; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.CompressionFactory; import io.druid.segment.data.ConciseBitmapSerdeFactory; import io.druid.segment.incremental.IncrementalIndex; -import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.junit.After; @@ -166,12 +165,16 @@ public IndexMergerV9CompatibilityTest( @Before public void setUp() throws IOException { - toPersist = new OnheapIncrementalIndex( - JodaUtils.MIN_INSTANT, - Granularities.NONE, - DEFAULT_AGG_FACTORIES, - 1000000 - ); + toPersist = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(JodaUtils.MIN_INSTANT) + .withMetrics(DEFAULT_AGG_FACTORIES) + .build() + ) + .setMaxRowCount(1000000) + .buildOnheap(); + toPersist.getMetadata().put("key", "value"); for (InputRow event : events) { toPersist.add(event); diff --git a/processing/src/test/java/io/druid/segment/IndexMergerV9WithSpatialIndexTest.java b/processing/src/test/java/io/druid/segment/IndexMergerV9WithSpatialIndexTest.java index 98d25fcfef9c..bc0b2ecba6f4 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerV9WithSpatialIndexTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerV9WithSpatialIndexTest.java @@ -45,7 +45,6 @@ import io.druid.query.timeseries.TimeseriesResultValue; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import 
org.joda.time.Interval; @@ -102,30 +101,33 @@ public static Collection constructorFeeder() throws IOException private static IncrementalIndex makeIncrementalIndex() throws IOException { - IncrementalIndex theIndex = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) - .withQueryGranularity(Granularities.DAY) - .withMetrics(METRIC_AGGS) - .withDimensionsSpec( - new DimensionsSpec( - null, - null, - Arrays.asList( - new SpatialDimensionSchema( - "dim.geo", - Arrays.asList("lat", "long") - ), - new SpatialDimensionSchema( - "spatialIsRad", - Arrays.asList("lat2", "long2") - ) - - ) - ) - ).build(), - false, - NUM_POINTS - ); + IncrementalIndex theIndex = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(Granularities.DAY) + .withMetrics(METRIC_AGGS) + .withDimensionsSpec( + new DimensionsSpec( + null, + null, + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Arrays.asList("lat", "long") + ), + new SpatialDimensionSchema( + "spatialIsRad", + Arrays.asList("lat2", "long2") + ) + + ) + ) + ).build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(NUM_POINTS) + .buildOnheap(); theIndex.add( new MapBasedInputRow( @@ -270,79 +272,89 @@ private static QueryableIndex makeQueryableIndex(IndexSpec indexSpec) throws IOE private static QueryableIndex makeMergedQueryableIndex(IndexSpec indexSpec) { try { - IncrementalIndex first = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) - .withQueryGranularity(Granularities.DAY) - .withMetrics(METRIC_AGGS) - .withDimensionsSpec( - new DimensionsSpec( - null, - null, - Arrays.asList( - new SpatialDimensionSchema( - "dim.geo", - Arrays.asList("lat", "long") - ), - new SpatialDimensionSchema( - "spatialIsRad", - Arrays.asList("lat2", "long2") - ) - - ) - ) - ).build(), - false, - 1000 - ); - IncrementalIndex second = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) - .withQueryGranularity(Granularities.DAY) - .withMetrics(METRIC_AGGS) - .withDimensionsSpec( - new DimensionsSpec( - null, - null, - Arrays.asList( - new SpatialDimensionSchema( - "dim.geo", - Arrays.asList("lat", "long") - ), - new SpatialDimensionSchema( - "spatialIsRad", - Arrays.asList("lat2", "long2") - ) - - ) - ) - ).build(), - false, - 1000 - ); - IncrementalIndex third = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) - .withQueryGranularity(Granularities.DAY) - .withMetrics(METRIC_AGGS) - .withDimensionsSpec( - new DimensionsSpec( - null, - null, - Arrays.asList( - new SpatialDimensionSchema( - "dim.geo", - Arrays.asList("lat", "long") - ), - new SpatialDimensionSchema( - "spatialIsRad", - Arrays.asList("lat2", "long2") - ) - - ) - ) - ).build(), - false, - NUM_POINTS - ); - + IncrementalIndex first = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(Granularities.DAY) + .withMetrics(METRIC_AGGS) + .withDimensionsSpec( + new DimensionsSpec( + null, + null, + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Arrays.asList("lat", "long") + ), + new SpatialDimensionSchema( + "spatialIsRad", + Arrays.asList("lat2", "long2") + ) + + ) + ) + ).build() + ) + 
.setReportParseExceptions(false) + .setMaxRowCount(1000) + .buildOnheap(); + + IncrementalIndex second = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(Granularities.DAY) + .withMetrics(METRIC_AGGS) + .withDimensionsSpec( + new DimensionsSpec( + null, + null, + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Arrays.asList("lat", "long") + ), + new SpatialDimensionSchema( + "spatialIsRad", + Arrays.asList("lat2", "long2") + ) + + ) + ) + ).build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(1000) + .buildOnheap(); + + IncrementalIndex third = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(Granularities.DAY) + .withMetrics(METRIC_AGGS) + .withDimensionsSpec( + new DimensionsSpec( + null, + null, + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Arrays.asList("lat", "long") + ), + new SpatialDimensionSchema( + "spatialIsRad", + Arrays.asList("lat2", "long2") + ) + + ) + ) + ).build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(NUM_POINTS) + .buildOnheap(); first.add( new MapBasedInputRow( diff --git a/processing/src/test/java/io/druid/segment/SchemalessIndexTest.java b/processing/src/test/java/io/druid/segment/SchemalessIndexTest.java index 724a609b6501..b0a57b123002 100644 --- a/processing/src/test/java/io/druid/segment/SchemalessIndexTest.java +++ b/processing/src/test/java/io/druid/segment/SchemalessIndexTest.java @@ -39,8 +39,8 @@ import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde; import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.VersionedIntervalTimeline; @@ -141,7 +141,16 @@ public static QueryableIndex getIncrementalIndex(int index1, int index2) final long timestamp = new DateTime(event.get(TIMESTAMP)).getMillis(); if (theIndex == null) { - theIndex = new OnheapIncrementalIndex(timestamp, Granularities.MINUTE, METRIC_AGGS, 1000); + theIndex = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(timestamp) + .withQueryGranularity(Granularities.MINUTE) + .withMetrics(METRIC_AGGS) + .build() + ) + .setMaxRowCount(1000) + .buildOnheap(); } final List dims = Lists.newArrayList(); @@ -350,9 +359,16 @@ private static void makeRowPersistedIndexes() } } - final IncrementalIndex rowIndex = new OnheapIncrementalIndex( - timestamp, Granularities.MINUTE, METRIC_AGGS, 1000 - ); + final IncrementalIndex rowIndex = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(timestamp) + .withQueryGranularity(Granularities.MINUTE) + .withMetrics(METRIC_AGGS) + .build() + ) + .setMaxRowCount(1000) + .buildOnheap(); rowIndex.add( new MapBasedInputRow(timestamp, dims, event) @@ -380,9 +396,16 @@ private static IncrementalIndex makeIncrementalIndex(final String resourceFilena String filename = resource.getFile(); log.info("Realtime loading index file[%s]", filename); - final IncrementalIndex retVal = new OnheapIncrementalIndex( - 
new DateTime("2011-01-12T00:00:00.000Z").getMillis(), Granularities.MINUTE, aggs, 1000 - ); + final IncrementalIndex retVal = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()) + .withQueryGranularity(Granularities.MINUTE) + .withMetrics(aggs) + .build() + ) + .setMaxRowCount(1000) + .buildOnheap(); try { final List events = jsonMapper.readValue(new File(filename), List.class); diff --git a/processing/src/test/java/io/druid/segment/StringDimensionHandlerTest.java b/processing/src/test/java/io/druid/segment/StringDimensionHandlerTest.java index e16c3586de68..226df78ec1b2 100644 --- a/processing/src/test/java/io/druid/segment/StringDimensionHandlerTest.java +++ b/processing/src/test/java/io/druid/segment/StringDimensionHandlerTest.java @@ -23,8 +23,6 @@ import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.DimensionsSpec; import io.druid.java.util.common.Pair; -import io.druid.java.util.common.granularity.Granularities; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.data.CompressedObjectStrategy; import io.druid.segment.data.CompressionFactory; @@ -32,7 +30,7 @@ import io.druid.segment.data.Indexed; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexAdapter; -import io.druid.segment.incremental.OnheapIncrementalIndex; +import io.druid.segment.incremental.IncrementalIndexSchema; import org.joda.time.Interval; import org.junit.Rule; import org.junit.Test; @@ -63,31 +61,27 @@ private static Pair getAdapter Map event1, Map event2 ) throws Exception { - IncrementalIndex incrementalIndex1 = new OnheapIncrementalIndex( - TEST_INTERVAL.getStartMillis(), - Granularities.NONE, - true, - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null), - new AggregatorFactory[]{ - new CountAggregatorFactory( - "count" - ) - }, - 1000 - ); - - IncrementalIndex incrementalIndex2 = new OnheapIncrementalIndex( - TEST_INTERVAL.getStartMillis(), - Granularities.NONE, - true, - new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null), - new AggregatorFactory[]{ - new CountAggregatorFactory( - "count" - ) - }, - 1000 - ); + IncrementalIndex incrementalIndex1 = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(TEST_INTERVAL.getStartMillis()) + .withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null)) + .withMetrics(new CountAggregatorFactory("count")) + .build() + ) + .setMaxRowCount(1000) + .buildOnheap(); + + IncrementalIndex incrementalIndex2 = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(TEST_INTERVAL.getStartMillis()) + .withDimensionsSpec(new DimensionsSpec(DimensionsSpec.getDefaultSchemas(dims), null, null)) + .withMetrics(new CountAggregatorFactory("count")) + .build() + ) + .setMaxRowCount(1000) + .buildOnheap(); incrementalIndex1.add(new MapBasedInputRow(TEST_INTERVAL.getStartMillis(), dims, event1)); incrementalIndex2.add(new MapBasedInputRow(TEST_INTERVAL.getStartMillis() + 3, dims, event2)); diff --git a/processing/src/test/java/io/druid/segment/TestIndex.java b/processing/src/test/java/io/druid/segment/TestIndex.java index d20da2a05b89..5f67cdae8100 100644 --- a/processing/src/test/java/io/druid/segment/TestIndex.java +++ 
b/processing/src/test/java/io/druid/segment/TestIndex.java @@ -33,7 +33,6 @@ import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.hll.HyperLogLogHash; -import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.logger.Logger; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleMaxAggregatorFactory; @@ -44,7 +43,6 @@ import io.druid.query.expression.TestExprMacroTable; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.serde.ComplexMetrics; import io.druid.segment.virtual.ExpressionVirtualColumn; import org.joda.time.DateTime; @@ -261,13 +259,15 @@ public static IncrementalIndex makeRealtimeIndex(final CharSource source, boolea final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() .withMinTimestamp(new DateTime("2011-01-12T00:00:00.000Z").getMillis()) .withTimestampSpec(new TimestampSpec("ds", "auto", null)) - .withQueryGranularity(Granularities.NONE) .withDimensionsSpec(DIMENSIONS_SPEC) .withVirtualColumns(VIRTUAL_COLUMNS) .withMetrics(METRIC_AGGS) .withRollup(rollup) .build(); - final IncrementalIndex retVal = new OnheapIncrementalIndex(schema, true, 10000); + final IncrementalIndex retVal = new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(10000) + .buildOnheap(); try { return loadIncrementalIndex(retVal, source); diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 93c4ce0b1360..866f34f0fc6a 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -64,8 +64,6 @@ import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; -import io.druid.segment.incremental.OffheapIncrementalIndex; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Rule; @@ -133,20 +131,22 @@ public IncrementalIndex createIndex(AggregatorFactory[] factories) @Override public IncrementalIndex createIndex(AggregatorFactory[] factories) { - return new OffheapIncrementalIndex( - 0L, Granularities.NONE, factories, 1000000, - new StupidPool( - "OffheapIncrementalIndex-bufferPool", - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(256 * 1024); - } - } - ) - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(factories) + .setMaxRowCount(1000000) + .buildOffheap( + new StupidPool( + "OffheapIncrementalIndex-bufferPool", + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(256 * 1024); + } + } + ) + ); } } }, @@ -166,20 +166,27 @@ public IncrementalIndex createIndex(AggregatorFactory[] factories) @Override public IncrementalIndex createIndex(AggregatorFactory[] factories) { - return new OffheapIncrementalIndex( - 0L, Granularities.NONE, false, factories, 1000000, - new StupidPool( - "OffheapIncrementalIndex-bufferPool", - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(256 * 1024); - } - } + return new IncrementalIndex.Builder() + 
.setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMetrics(factories) + .withRollup(false) + .build() ) - ); + .setMaxRowCount(1000000) + .buildOffheap( + new StupidPool( + "OffheapIncrementalIndex-bufferPool", + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(256 * 1024); + } + } + ) + ); } } } @@ -200,15 +207,22 @@ public static AggregatorFactory[] getDefaultCombiningAggregatorFactories() public static IncrementalIndex createIndex( AggregatorFactory[] aggregatorFactories, - DimensionsSpec dimensionsSpec) + DimensionsSpec dimensionsSpec + ) { if (null == aggregatorFactories) { aggregatorFactories = defaultAggregatorFactories; } - return new OnheapIncrementalIndex( - 0L, Granularities.NONE, true, dimensionsSpec, aggregatorFactories, 1000000 - ); + return new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withDimensionsSpec(dimensionsSpec) + .withMetrics(aggregatorFactories) + .build() + ) + .setMaxRowCount(1000000) + .buildOnheap(); } public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactories) @@ -217,9 +231,10 @@ public static IncrementalIndex createIndex(AggregatorFactory[] aggregatorFactori aggregatorFactories = defaultAggregatorFactories; } - return new OnheapIncrementalIndex( - 0L, Granularities.NONE, true, null, aggregatorFactories, 1000000 - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(aggregatorFactories) + .setMaxRowCount(1000000) + .buildOnheap(); } public static IncrementalIndex createNoRollupIndex(AggregatorFactory[] aggregatorFactories) @@ -228,9 +243,10 @@ public static IncrementalIndex createNoRollupIndex(AggregatorFactory[] aggregato aggregatorFactories = defaultAggregatorFactories; } - return new OnheapIncrementalIndex( - 0L, Granularities.NONE, false, null, aggregatorFactories, 1000000 - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(aggregatorFactories) + .setMaxRowCount(1000000) + .buildOnheap(); } public static void populateIndex(long timestamp, IncrementalIndex index) throws IndexSizeExceededException @@ -476,12 +492,12 @@ public void testSingleThreadedIndexingAndQuery() throws Exception for (int i = 0; i < dimensionCount; ++i) { Assert.assertEquals( String.format("Failed long sum on dimension %d", i), - 2*rows, + 2 * rows, result.getValue().getLongMetric(String.format("sumResult%s", i)).intValue() ); Assert.assertEquals( String.format("Failed double sum on dimension %d", i), - 2*rows, + 2 * rows, result.getValue().getDoubleMetric(String.format("doubleSumResult%s", i)).intValue() ); } @@ -757,26 +773,21 @@ public void run() @Test public void testgetDimensions() { - final IncrementalIndex incrementalIndex = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withQueryGranularity(Granularities.NONE) - .withMetrics( - new AggregatorFactory[]{ - new CountAggregatorFactory( - "count" - ) - } - ) - .withDimensionsSpec( - new DimensionsSpec( - DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")), - null, - null - ) - ) - .build(), - true, - 1000000 - ); + final IncrementalIndex incrementalIndex = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMetrics(new CountAggregatorFactory("count")) + .withDimensionsSpec( + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas(Arrays.asList("dim0", "dim1")), + null, + null + ) + ) + .build() + ) + .setMaxRowCount(1000000) + .buildOnheap(); closer.closeLater(incrementalIndex); 
Assert.assertEquals(Arrays.asList("dim0", "dim1"), incrementalIndex.getDimensionNames()); @@ -785,11 +796,10 @@ public void testgetDimensions() @Test public void testDynamicSchemaRollup() throws IndexSizeExceededException { - IncrementalIndex index = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withQueryGranularity(Granularities.NONE).build(), - true, - 10 - ); + IncrementalIndex index = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(/* empty */) + .setMaxRowCount(10) + .buildOnheap(); closer.closeLater(index); index.add( new MapBasedInputRow( diff --git a/processing/src/test/java/io/druid/segment/filter/FloatFilteringTest.java b/processing/src/test/java/io/druid/segment/filter/FloatFilteringTest.java index 1f6f1ec64a1f..7f500bb0996a 100644 --- a/processing/src/test/java/io/druid/segment/filter/FloatFilteringTest.java +++ b/processing/src/test/java/io/druid/segment/filter/FloatFilteringTest.java @@ -34,7 +34,6 @@ import io.druid.data.input.impl.TimestampSpec; import io.druid.java.util.common.Pair; import io.druid.js.JavaScriptConfig; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.extraction.MapLookupExtractor; import io.druid.query.filter.BoundDimFilter; @@ -108,11 +107,8 @@ public FloatFilteringTest( ROWS, indexBuilder.schema( new IncrementalIndexSchema.Builder() - .withMetrics( - new AggregatorFactory[]{ - new DoubleSumAggregatorFactory(FLOAT_COLUMN, FLOAT_COLUMN) - } - ).build() + .withMetrics(new DoubleSumAggregatorFactory(FLOAT_COLUMN, FLOAT_COLUMN)) + .build() ), finisher, cnf, diff --git a/processing/src/test/java/io/druid/segment/filter/InvalidFilteringTest.java b/processing/src/test/java/io/druid/segment/filter/InvalidFilteringTest.java index 57e73730486e..5c8f52bee0a5 100644 --- a/processing/src/test/java/io/druid/segment/filter/InvalidFilteringTest.java +++ b/processing/src/test/java/io/druid/segment/filter/InvalidFilteringTest.java @@ -29,7 +29,6 @@ import io.druid.data.input.impl.TimeAndDimsParseSpec; import io.druid.data.input.impl.TimestampSpec; import io.druid.java.util.common.Pair; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleMaxAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; @@ -95,11 +94,12 @@ public InvalidFilteringTest( private static IndexBuilder overrideIndexBuilderSchema(IndexBuilder indexBuilder) { - IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder().withMetrics(new AggregatorFactory[]{ - new CountAggregatorFactory("count"), - new HyperUniquesAggregatorFactory("hyperion", "dim1"), - new DoubleMaxAggregatorFactory("dmax", "dim0") - }).build(); + IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() + .withMetrics( + new CountAggregatorFactory("count"), + new HyperUniquesAggregatorFactory("hyperion", "dim1"), + new DoubleMaxAggregatorFactory("dmax", "dim0") + ).build(); return indexBuilder.schema(schema); } diff --git a/processing/src/test/java/io/druid/segment/filter/LongFilteringTest.java b/processing/src/test/java/io/druid/segment/filter/LongFilteringTest.java index 331880dcd79d..b20c7f7575e4 100644 --- a/processing/src/test/java/io/druid/segment/filter/LongFilteringTest.java +++ b/processing/src/test/java/io/druid/segment/filter/LongFilteringTest.java @@ -34,7 +34,6 @@ import io.druid.data.input.impl.TimestampSpec; import io.druid.java.util.common.Pair; 
import io.druid.js.JavaScriptConfig; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.extraction.MapLookupExtractor; import io.druid.query.filter.BoundDimFilter; @@ -112,11 +111,8 @@ public LongFilteringTest( ROWS, indexBuilder.schema( new IncrementalIndexSchema.Builder() - .withMetrics( - new AggregatorFactory[]{ - new LongSumAggregatorFactory(LONG_COLUMN, LONG_COLUMN) - } - ).build() + .withMetrics(new LongSumAggregatorFactory(LONG_COLUMN, LONG_COLUMN)) + .build() ), finisher, cnf, diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java index 8ac4fad7178f..86ab7c0f6013 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterBonusTest.java @@ -54,7 +54,6 @@ import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Test; @@ -116,25 +115,29 @@ public static Collection constructorFeeder() throws IOException private static IncrementalIndex makeIncrementalIndex() throws IOException { - IncrementalIndex theIndex = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) - .withQueryGranularity(Granularities.DAY) - .withMetrics(METRIC_AGGS) - .withDimensionsSpec( - new DimensionsSpec( - null, - null, - Arrays.asList( - new SpatialDimensionSchema( - "dim.geo", - Lists.newArrayList() - ) - ) - ) - ).build(), - false, - NUM_POINTS - ); + IncrementalIndex theIndex = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(Granularities.DAY) + .withMetrics(METRIC_AGGS) + .withDimensionsSpec( + new DimensionsSpec( + null, + null, + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Lists.newArrayList() + ) + ) + ) + ).build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(NUM_POINTS) + .buildOnheap(); + theIndex.add( new MapBasedInputRow( new DateTime("2013-01-01").getMillis(), @@ -255,66 +258,76 @@ private static QueryableIndex makeQueryableIndex(IndexSpec indexSpec) throws IOE private static QueryableIndex makeMergedQueryableIndex(final IndexSpec indexSpec) { try { - IncrementalIndex first = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) - .withQueryGranularity(Granularities.DAY) - .withMetrics(METRIC_AGGS) - .withDimensionsSpec( - new DimensionsSpec( - null, - null, - Arrays.asList( - new SpatialDimensionSchema( - "dim.geo", - Lists.newArrayList() - ) - ) - ) - - ).build(), - false, - NUM_POINTS - ); - IncrementalIndex second = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) - .withQueryGranularity(Granularities.DAY) - .withMetrics(METRIC_AGGS) - .withDimensionsSpec( - new DimensionsSpec( - null, - null, - Arrays.asList( - new SpatialDimensionSchema( - "dim.geo", - Lists.newArrayList() - ) - ) - ) - ).build(), - false, - NUM_POINTS - ); - IncrementalIndex third = new OnheapIncrementalIndex( - new 
IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) - .withQueryGranularity(Granularities.DAY) - .withMetrics(METRIC_AGGS) - .withDimensionsSpec( - new DimensionsSpec( - null, - null, - Arrays.asList( - new SpatialDimensionSchema( - "dim.geo", - Lists.newArrayList() - ) - ) - ) - - ).build(), - false, - NUM_POINTS - ); - + IncrementalIndex first = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(Granularities.DAY) + .withMetrics(METRIC_AGGS) + .withDimensionsSpec( + new DimensionsSpec( + null, + null, + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Lists.newArrayList() + ) + ) + ) + + ).build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(NUM_POINTS) + .buildOnheap(); + + IncrementalIndex second = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(Granularities.DAY) + .withMetrics(METRIC_AGGS) + .withDimensionsSpec( + new DimensionsSpec( + null, + null, + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Lists.newArrayList() + ) + ) + ) + ).build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(NUM_POINTS) + .buildOnheap(); + + IncrementalIndex third = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(Granularities.DAY) + .withMetrics(METRIC_AGGS) + .withDimensionsSpec( + new DimensionsSpec( + null, + null, + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Lists.newArrayList() + ) + ) + ) + + ).build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(NUM_POINTS) + .buildOnheap(); first.add( new MapBasedInputRow( diff --git a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java index f2fe2cd886e1..f28321e732b8 100644 --- a/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java +++ b/processing/src/test/java/io/druid/segment/filter/SpatialFilterTest.java @@ -53,7 +53,6 @@ import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Test; @@ -109,30 +108,33 @@ public static Collection constructorFeeder() throws IOException private static IncrementalIndex makeIncrementalIndex() throws IOException { - IncrementalIndex theIndex = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) - .withQueryGranularity(Granularities.DAY) - .withMetrics(METRIC_AGGS) - .withDimensionsSpec( - new DimensionsSpec( - null, - null, - Arrays.asList( - new SpatialDimensionSchema( - "dim.geo", - Arrays.asList("lat", "long") - ), - new SpatialDimensionSchema( - "spatialIsRad", - Arrays.asList("lat2", "long2") - ) - - ) - ) - ).build(), - false, - NUM_POINTS - ); + IncrementalIndex theIndex = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(Granularities.DAY) + .withMetrics(METRIC_AGGS) + .withDimensionsSpec( + new DimensionsSpec( + null, + null, + Arrays.asList( + new 
SpatialDimensionSchema( + "dim.geo", + Arrays.asList("lat", "long") + ), + new SpatialDimensionSchema( + "spatialIsRad", + Arrays.asList("lat2", "long2") + ) + + ) + ) + ).build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(NUM_POINTS) + .buildOnheap(); theIndex.add( new MapBasedInputRow( @@ -273,79 +275,86 @@ private static QueryableIndex makeQueryableIndex(IndexSpec indexSpec) throws IOE private static QueryableIndex makeMergedQueryableIndex(IndexSpec indexSpec) { try { - IncrementalIndex first = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) - .withQueryGranularity(Granularities.DAY) - .withMetrics(METRIC_AGGS) - .withDimensionsSpec( - new DimensionsSpec( - null, - null, - Arrays.asList( - new SpatialDimensionSchema( - "dim.geo", - Arrays.asList("lat", "long") - ), - new SpatialDimensionSchema( - "spatialIsRad", - Arrays.asList("lat2", "long2") - ) - - ) - ) - ).build(), - false, - 1000 - ); - IncrementalIndex second = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) - .withQueryGranularity(Granularities.DAY) - .withMetrics(METRIC_AGGS) - .withDimensionsSpec( - new DimensionsSpec( - null, - null, - Arrays.asList( - new SpatialDimensionSchema( - "dim.geo", - Arrays.asList("lat", "long") - ), - new SpatialDimensionSchema( - "spatialIsRad", - Arrays.asList("lat2", "long2") - ) - - ) - ) - ).build(), - false, - 1000 - ); - IncrementalIndex third = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) - .withQueryGranularity(Granularities.DAY) - .withMetrics(METRIC_AGGS) - .withDimensionsSpec( - new DimensionsSpec( - null, - null, - Arrays.asList( - new SpatialDimensionSchema( - "dim.geo", - Arrays.asList("lat", "long") - ), - new SpatialDimensionSchema( - "spatialIsRad", - Arrays.asList("lat2", "long2") - ) - - ) - ) - ).build(), - false, - NUM_POINTS - ); - + IncrementalIndex first = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(Granularities.DAY) + .withMetrics(METRIC_AGGS) + .withDimensionsSpec( + new DimensionsSpec( + null, + null, + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Arrays.asList("lat", "long") + ), + new SpatialDimensionSchema( + "spatialIsRad", + Arrays.asList("lat2", "long2") + ) + ) + ) + ).build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(1000) + .buildOnheap(); + + IncrementalIndex second = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(Granularities.DAY) + .withMetrics(METRIC_AGGS) + .withDimensionsSpec( + new DimensionsSpec( + null, + null, + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Arrays.asList("lat", "long") + ), + new SpatialDimensionSchema( + "spatialIsRad", + Arrays.asList("lat2", "long2") + ) + ) + ) + ).build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(1000) + .buildOnheap(); + + IncrementalIndex third = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(Granularities.DAY) + .withMetrics(METRIC_AGGS) + .withDimensionsSpec( + new DimensionsSpec( + null, + null, + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Arrays.asList("lat", 
"long") + ), + new SpatialDimensionSchema( + "spatialIsRad", + Arrays.asList("lat2", "long2") + ) + ) + ) + ).build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(NUM_POINTS) + .buildOnheap(); first.add( new MapBasedInputRow( diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java index b78914cfda1a..a5fcff133316 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexMultiValueSpecTest.java @@ -77,7 +77,10 @@ public Object get(Object key) return null; } }; - IncrementalIndex index = new OnheapIncrementalIndex(schema, true, 10000); + IncrementalIndex index = new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setMaxRowCount(10000) + .buildOnheap(); index.add( new MapBasedInputRow( 0, Arrays.asList( diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 902ec89be498..97e14777d31a 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -97,9 +97,10 @@ public static Collection constructorFeeder() throws IOException @Override public IncrementalIndex createIndex() { - return new OnheapIncrementalIndex( - 0, Granularities.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000 - ); + return new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt")) + .setMaxRowCount(1000) + .buildOnheap(); } } } diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java index 02e7a180e03c..afac0e2a6d3a 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java @@ -87,11 +87,9 @@ public static Collection constructorFeeder() throws IOException ) }; final IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder() - .withMinTimestamp(0) .withQueryGranularity(Granularities.MINUTE) .withDimensionsSpec(dimensions) .withMetrics(metrics) - .withRollup(true) .build(); final List constructors = Lists.newArrayList(); @@ -103,7 +101,12 @@ public static Collection constructorFeeder() throws IOException @Override public IncrementalIndex createIndex() { - return new OnheapIncrementalIndex(schema, false, true, sortFacts, 1000); + return new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setDeserializeComplexMetrics(false) + .setSortFacts(sortFacts) + .setMaxRowCount(1000) + .buildOnheap(); } } } @@ -115,24 +118,23 @@ public IncrementalIndex createIndex() @Override public IncrementalIndex createIndex() { - return new OffheapIncrementalIndex( - schema, - true, - true, - sortFacts, - 1000000, - new StupidPool( - "OffheapIncrementalIndex-bufferPool", - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(256 * 1024); - } - } - ) - ); + return new IncrementalIndex.Builder() + .setIndexSchema(schema) + .setSortFacts(sortFacts) + .setMaxRowCount(1000000) + .buildOffheap( + new StupidPool( + 
"OffheapIncrementalIndex-bufferPool", + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(256 * 1024); + } + } + ) + ); } } } diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index 53d26b4526da..8449c3fd03e3 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -110,6 +110,25 @@ private static final class MapIncrementalIndex extends OnheapIncrementalIndex private final AtomicInteger indexIncrement = new AtomicInteger(0); ConcurrentHashMap indexedMap = new ConcurrentHashMap(); + public MapIncrementalIndex( + IncrementalIndexSchema incrementalIndexSchema, + boolean deserializeComplexMetrics, + boolean reportParseExceptions, + boolean concurrentEventAdd, + boolean sortFacts, + int maxRowCount + ) + { + super( + incrementalIndexSchema, + deserializeComplexMetrics, + reportParseExceptions, + concurrentEventAdd, + sortFacts, + maxRowCount + ); + } + public MapIncrementalIndex( long minTimestamp, Granularity gran, @@ -117,7 +136,18 @@ public MapIncrementalIndex( int maxRowCount ) { - super(minTimestamp, gran, metrics, maxRowCount); + super( + new IncrementalIndexSchema.Builder() + .withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .build(), + true, + true, + false, + true, + maxRowCount + ); } @Override @@ -254,12 +284,21 @@ public void testConcurrentAddRead() final int concurrentThreads = 3; final int elementsPerThread = 1 << 15; - final OnheapIncrementalIndex incrementalIndex = this.incrementalIndex.getConstructor( - Long.TYPE, - Granularity.class, - AggregatorFactory[].class, + final IncrementalIndex incrementalIndex = this.incrementalIndex.getConstructor( + IncrementalIndexSchema.class, + Boolean.TYPE, + Boolean.TYPE, + Boolean.TYPE, + Boolean.TYPE, Integer.TYPE - ).newInstance(0, Granularities.NONE, factories, elementsPerThread * taskCount); + ).newInstance( + new IncrementalIndexSchema.Builder().withMetrics(factories).build(), + true, + true, + false, + true, + elementsPerThread * taskCount + ); final ArrayList queryAggregatorFactories = new ArrayList<>(dimensionCount + 1); queryAggregatorFactories.add(new CountAggregatorFactory("rows")); for (int i = 0; i < dimensionCount; ++i) { diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexTest.java index b788de84a1a5..4021f1e3025c 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexTest.java @@ -24,7 +24,6 @@ import io.druid.data.input.MapBasedInputRow; import io.druid.java.util.common.granularity.Granularities; import io.druid.query.aggregation.Aggregator; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongMaxAggregator; import io.druid.query.aggregation.LongMaxAggregatorFactory; import org.easymock.EasyMock; @@ -42,12 +41,15 @@ public class OnheapIncrementalIndexTest @Test public void testMultithreadAddFacts() throws Exception { - final OnheapIncrementalIndex index = new OnheapIncrementalIndex( - 0, - Granularities.MINUTE, - new AggregatorFactory[]{new LongMaxAggregatorFactory("max", 
"max")}, - MAX_ROWS - ); + final IncrementalIndex index = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withQueryGranularity(Granularities.MINUTE) + .withMetrics(new LongMaxAggregatorFactory("max", "max")) + .build() + ) + .setMaxRowCount(MAX_ROWS) + .buildOnheap(); final Random random = new Random(); final int addThreadCount = 2; @@ -108,12 +110,15 @@ public void testOnHeapIncrementalIndexClose() throws Exception mockedAggregator.close(); EasyMock.expectLastCall().times(1); - final OnheapIncrementalIndex index = new OnheapIncrementalIndex( - 0, - Granularities.MINUTE, - new AggregatorFactory[]{new LongMaxAggregatorFactory("max", "max")}, - MAX_ROWS - ); + final OnheapIncrementalIndex index = (OnheapIncrementalIndex) new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withQueryGranularity(Granularities.MINUTE) + .withMetrics(new LongMaxAggregatorFactory("max", "max")) + .build() + ) + .setMaxRowCount(MAX_ROWS) + .buildOnheap(); index.add(new MapBasedInputRow( 0, diff --git a/processing/src/test/java/io/druid/segment/incremental/TimeAndDimsCompTest.java b/processing/src/test/java/io/druid/segment/incremental/TimeAndDimsCompTest.java index 474986cd937c..8677dc186281 100644 --- a/processing/src/test/java/io/druid/segment/incremental/TimeAndDimsCompTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/TimeAndDimsCompTest.java @@ -22,8 +22,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.druid.data.input.MapBasedInputRow; -import io.druid.java.util.common.granularity.Granularities; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import org.junit.Assert; import org.junit.Test; @@ -41,9 +39,10 @@ public class TimeAndDimsCompTest @Test public void testBasic() throws IndexSizeExceededException { - IncrementalIndex index = new OnheapIncrementalIndex( - 0, Granularities.NONE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000 - ); + IncrementalIndex index = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt")) + .setMaxRowCount(1000) + .buildOnheap(); long time = System.currentTimeMillis(); TimeAndDims td1 = index.toTimeAndDims(toMapRow(time, "billy", "A", "joe", "B")); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index ddebc567c282..07282e27686d 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -36,7 +36,6 @@ import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; -import io.druid.segment.incremental.OnheapIncrementalIndex; import io.druid.segment.indexing.DataSchema; import io.druid.segment.realtime.FireHydrant; import io.druid.timeline.DataSegment; @@ -254,7 +253,11 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) .withMetrics(schema.getAggregators()) .withRollup(schema.getGranularitySpec().isRollup()) .build(); - final IncrementalIndex newIndex = new OnheapIncrementalIndex(indexSchema, reportParseExceptions, maxRowsInMemory); + final IncrementalIndex newIndex = new IncrementalIndex.Builder() + .setIndexSchema(indexSchema) + .setReportParseExceptions(reportParseExceptions) + 
.setMaxRowCount(maxRowsInMemory) + .buildOnheap(); final FireHydrant old; synchronized (hydrantLock) { diff --git a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java index 01a90fa8b8bc..623c9035fd4c 100644 --- a/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java +++ b/server/src/test/java/io/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java @@ -31,7 +31,6 @@ import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.hll.HyperLogLogCollector; -import io.druid.java.util.common.granularity.Granularities; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; @@ -46,7 +45,6 @@ import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IncrementalIndexStorageAdapter; -import io.druid.segment.incremental.OnheapIncrementalIndex; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Assert; @@ -105,15 +103,15 @@ public void testReadFromIndexAndWriteAnotherIndex() throws Exception try ( final QueryableIndex qi = indexIO.loadIndex(segmentDir); - final IncrementalIndex index = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withDimensionsSpec(DIMENSIONS_SPEC_REINDEX) - .withQueryGranularity(Granularities.NONE) - .withMetrics(AGGREGATORS_REINDEX.toArray(new AggregatorFactory[]{})) - .build(), - true, - 5000 - ) + final IncrementalIndex index = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withDimensionsSpec(DIMENSIONS_SPEC_REINDEX) + .withMetrics(AGGREGATORS_REINDEX.toArray(new AggregatorFactory[]{})) + .build() + ) + .setMaxRowCount(5000) + .buildOnheap(); ) { final StorageAdapter sa = new QueryableIndexStorageAdapter(qi); final WindowedStorageAdapter wsa = new WindowedStorageAdapter(sa, sa.getInterval()); @@ -193,15 +191,15 @@ private void createTestIndex(File segmentDir) throws Exception ); try ( - final IncrementalIndex index = new OnheapIncrementalIndex( - new IncrementalIndexSchema.Builder() - .withDimensionsSpec(parser.getParseSpec().getDimensionsSpec()) - .withQueryGranularity(Granularities.NONE) - .withMetrics(AGGREGATORS.toArray(new AggregatorFactory[]{})) - .build(), - true, - 5000 - ) + final IncrementalIndex index = new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withDimensionsSpec(parser.getParseSpec().getDimensionsSpec()) + .withMetrics(AGGREGATORS.toArray(new AggregatorFactory[]{})) + .build() + ) + .setMaxRowCount(5000) + .buildOnheap(); ) { for (String line : rows) { index.add(parser.parse(line)); diff --git a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java index 982c5badba62..990f674c1f5e 100644 --- a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.druid.data.input.InputRow; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import 
io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -93,11 +92,9 @@ public void setUp() throws Exception .schema( new IncrementalIndexSchema.Builder() .withMetrics( - new AggregatorFactory[]{ - new CountAggregatorFactory("cnt"), - new DoubleSumAggregatorFactory("m1", "m1"), - new HyperUniquesAggregatorFactory("unique_dim1", "dim1") - } + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1"), + new HyperUniquesAggregatorFactory("unique_dim1", "dim1") ) .withRollup(false) .build() @@ -110,11 +107,7 @@ public void setUp() throws Exception .indexMerger(TestHelper.getTestIndexMergerV9()) .schema( new IncrementalIndexSchema.Builder() - .withMetrics( - new AggregatorFactory[]{ - new LongSumAggregatorFactory("m1", "m1") - } - ) + .withMetrics(new LongSumAggregatorFactory("m1", "m1")) .withRollup(false) .build() ) diff --git a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java index dbf18b746587..be7514a8f113 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java @@ -46,7 +46,6 @@ import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerTestHelper; -import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; @@ -281,11 +280,9 @@ public int getNumMergeBuffers() private static final IncrementalIndexSchema INDEX_SCHEMA = new IncrementalIndexSchema.Builder() .withMetrics( - new AggregatorFactory[]{ - new CountAggregatorFactory("cnt"), - new DoubleSumAggregatorFactory("m1", "m1"), - new HyperUniquesAggregatorFactory("unique_dim1", "dim1") - } + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1"), + new HyperUniquesAggregatorFactory("unique_dim1", "dim1") ) .withRollup(false) .build();
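
Taken together, the hunks above converge on a single construction pattern: an IncrementalIndexSchema assembled via IncrementalIndexSchema.Builder (whose withMetrics now takes varargs instead of an AggregatorFactory[]) is handed to IncrementalIndex.Builder, which replaces the removed OnheapIncrementalIndex / OffheapIncrementalIndex constructors. The sketch below is illustrative only: it uses just the builder setters that appear in these hunks, and the wrapper class, granularity, aggregator, and row limits are placeholder values rather than anything taken from a particular call site.

import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;

public class IncrementalIndexBuilderSketch
{
  public static void main(String[] args)
  {
    // Full form: an explicit schema plus builder options, as in the Sink and filter-test hunks.
    IncrementalIndex onheap = new IncrementalIndex.Builder()
        .setIndexSchema(
            new IncrementalIndexSchema.Builder()
                .withQueryGranularity(Granularities.MINUTE)      // placeholder granularity
                .withMetrics(new CountAggregatorFactory("cnt"))  // varargs; no AggregatorFactory[] wrapper
                .build()
        )
        .setReportParseExceptions(false)
        .setMaxRowCount(10000)                                   // placeholder row limit
        .buildOnheap();

    // Shorthand used by several benchmarks and tests: a simple testing schema built
    // directly from the aggregators.
    IncrementalIndex simple = new IncrementalIndex.Builder()
        .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
        .setMaxRowCount(10000)
        .buildOnheap();

    onheap.close();
    simple.close();
  }
}

The off-heap path is the same builder with the buffer pool passed to buildOffheap(...), as shown in the IncrementalIndexTest hunk above.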