diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md
index 6893cab217ae..3855cf60e6b8 100644
--- a/.github/pull_request_template.md
+++ b/.github/pull_request_template.md
@@ -44,7 +44,7 @@ This PR has:
- [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
- [ ] added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
-- [ ] added unit tests or modified existing tests to cover new code paths.
+- [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
- [ ] added integration tests.
- [ ] been tested in a test Druid cluster.
diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index cd32fb67f732..2da1cb7f2e13 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -29,6 +29,7 @@
+
@@ -387,8 +388,8 @@
-
-
+
+
diff --git a/.travis.yml b/.travis.yml
index 6dd1adf6629e..9b3c755107c6 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -166,7 +166,8 @@ jobs:
project_files="$(echo "${all_files}" | grep "${regex}" || [[ $? == 1 ]])";
fi
- for f in ${project_files}; do echo $f; done # for debugging
- # Check diff code coverage for the maven projects being tested (retry install in case of network error)
+ # Check diff code coverage for the maven projects being tested (retry install in case of network error).
+ # Currently, the function coverage check is not reliable, so it is disabled.
- >
if [ -n "${project_files}" ]; then
travis_retry npm install @connectis/diff-test-coverage@1.5.3
@@ -174,9 +175,13 @@ jobs:
| node_modules/.bin/diff-test-coverage
--coverage "**/target/site/jacoco/jacoco.xml"
--type jacoco
- --line-coverage 65
- --branch-coverage 65
- --function-coverage 80
+ --line-coverage 50
+ --branch-coverage 50
+ --function-coverage 0
+ --log-template "coverage-lines-complete"
+ --log-template "coverage-files-complete"
+ --log-template "totals-complete"
+ --log-template "errors"
--
|| { printf "\nDiff code coverage check failed. To view coverage report, run 'mvn clean test jacoco:report' and open 'target/site/jacoco/index.html'\n" && false; }
fi
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/BoundFilterBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/BoundFilterBenchmark.java
index 6c0b6d940ed4..f9a920a3d39f 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/BoundFilterBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/BoundFilterBenchmark.java
@@ -33,6 +33,7 @@
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.column.BitmapIndex;
+import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.data.GenericIndexed;
@@ -195,7 +196,7 @@ public CloseableIndexed<String> getDimensionValues(String dimension)
}
@Override
- public boolean hasMultipleValues(final String dimension)
+ public ColumnCapabilities.Capable hasMultipleValues(final String dimension)
{
throw new UnsupportedOperationException();
}
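
Note: the hasMultipleValues() change above (repeated in DimensionPredicateFilterBenchmark and LikeFilterBenchmark below) swaps a boolean for the tri-state ColumnCapabilities.Capable, which lets a column report "unknown" as well as true/false. A minimal caller-side sketch, assuming Capable is the usual TRUE/FALSE/UNKNOWN enum with an isMaybeTrue() convenience (verify against the actual Druid class):

    import org.apache.druid.segment.column.ColumnCapabilities;

    public class CapableUsageSketch
    {
      // Hedged sketch, not code from this patch: treat UNKNOWN conservatively,
      // planning for multi-value input unless the column is known single-valued.
      static boolean planForMultipleValues(ColumnCapabilities.Capable hasMultipleValues)
      {
        return hasMultipleValues.isMaybeTrue();
      }
    }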
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/DimensionPredicateFilterBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/DimensionPredicateFilterBenchmark.java
index 8dd177f1e831..a845bb1a2233 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/DimensionPredicateFilterBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/DimensionPredicateFilterBenchmark.java
@@ -35,6 +35,7 @@
import org.apache.druid.query.filter.DruidLongPredicate;
import org.apache.druid.query.filter.DruidPredicateFactory;
import org.apache.druid.segment.column.BitmapIndex;
+import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.data.GenericIndexed;
@@ -166,7 +167,7 @@ public CloseableIndexed<String> getDimensionValues(String dimension)
}
@Override
- public boolean hasMultipleValues(final String dimension)
+ public ColumnCapabilities.Capable hasMultipleValues(final String dimension)
{
throw new UnsupportedOperationException();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionAggregationBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionAggregationBenchmark.java
index 7d7c8c45f81a..83a13e9a0a67 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionAggregationBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionAggregationBenchmark.java
@@ -21,8 +21,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import org.apache.druid.benchmark.datagen.BenchmarkColumnSchema;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
@@ -42,6 +40,8 @@
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.generator.GeneratorColumnSchema;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.openjdk.jmh.annotations.Benchmark;
@@ -90,10 +90,10 @@ public void setup()
{
this.closer = Closer.create();
- final BenchmarkSchemaInfo schemaInfo = new BenchmarkSchemaInfo(
+ final GeneratorSchemaInfo schemaInfo = new GeneratorSchemaInfo(
ImmutableList.of(
- BenchmarkColumnSchema.makeNormal("x", ValueType.FLOAT, false, 1, 0d, 0d, 10000d, false),
- BenchmarkColumnSchema.makeNormal("y", ValueType.FLOAT, false, 1, 0d, 0d, 10000d, false)
+ GeneratorColumnSchema.makeNormal("x", ValueType.FLOAT, false, 1, 0d, 0d, 10000d, false),
+ GeneratorColumnSchema.makeNormal("y", ValueType.FLOAT, false, 1, 0d, 0d, 10000d, false)
),
ImmutableList.of(),
Intervals.of("2000/P1D"),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java
index 6b96d6137357..ea7ac3bdac8f 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionFilterBenchmark.java
@@ -20,8 +20,6 @@
package org.apache.druid.benchmark;
import com.google.common.collect.ImmutableList;
-import org.apache.druid.benchmark.datagen.BenchmarkColumnSchema;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
@@ -39,6 +37,8 @@
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.generator.GeneratorColumnSchema;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.openjdk.jmh.annotations.Benchmark;
@@ -86,9 +86,9 @@ public void setup()
{
this.closer = Closer.create();
- final BenchmarkSchemaInfo schemaInfo = new BenchmarkSchemaInfo(
+ final GeneratorSchemaInfo schemaInfo = new GeneratorSchemaInfo(
ImmutableList.of(
- BenchmarkColumnSchema.makeEnumerated(
+ GeneratorColumnSchema.makeEnumerated(
"x",
ValueType.STRING,
false,
@@ -97,7 +97,7 @@ public void setup()
Arrays.asList("Apple", "Orange", "Xylophone", "Corundum", null),
Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
),
- BenchmarkColumnSchema.makeEnumerated(
+ GeneratorColumnSchema.makeEnumerated(
"y",
ValueType.STRING,
false,
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java
index d3406a469051..0ee1db3669c7 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/ExpressionSelectorBenchmark.java
@@ -20,8 +20,6 @@
package org.apache.druid.benchmark;
import com.google.common.collect.ImmutableList;
-import org.apache.druid.benchmark.datagen.BenchmarkColumnSchema;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
@@ -41,6 +39,8 @@
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.generator.GeneratorColumnSchema;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -86,9 +86,9 @@ public void setup()
{
this.closer = Closer.create();
- final BenchmarkSchemaInfo schemaInfo = new BenchmarkSchemaInfo(
+ final GeneratorSchemaInfo schemaInfo = new GeneratorSchemaInfo(
ImmutableList.of(
- BenchmarkColumnSchema.makeZipf(
+ GeneratorColumnSchema.makeZipf(
"n",
ValueType.LONG,
false,
@@ -98,7 +98,7 @@ public void setup()
10000,
3d
),
- BenchmarkColumnSchema.makeZipf(
+ GeneratorColumnSchema.makeZipf(
"s",
ValueType.STRING,
false,
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
index 512f1eb932db..5efe00e5b6aa 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
@@ -22,9 +22,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -69,6 +66,9 @@
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.filter.OrFilter;
import org.apache.druid.segment.filter.SelectorFilter;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -125,7 +125,7 @@ public class FilterPartitionBenchmark
private Filter timeFilterHalf;
private Filter timeFilterAll;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
private static String JS_FN = "function(str) { return 'super-' + str; }";
private static ExtractionFn JS_EXTRACTION_FN = new JavaScriptExtractionFn(JS_FN, false, JavaScriptConfig.getEnabledInstance());
@@ -153,9 +153,9 @@ public void setup() throws IOException
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
- BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+ DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
schemaInfo.getDataInterval(),
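
Note: the BenchmarkDataGenerator → DataGenerator rename recurs throughout this patch. A minimal sketch of the call pattern the setup() methods rely on, assuming DataGenerator keeps the old nextRow() entry point returning an InputRow (rowsPerSegment and incIndex stand in for benchmark state):

    DataGenerator gen = new DataGenerator(
        schemaInfo.getColumnSchemas(),
        RNG_SEED,
        schemaInfo.getDataInterval(),
        rowsPerSegment
    );
    for (int i = 0; i < rowsPerSegment; i++) {
      InputRow row = gen.nextRow();
      incIndex.add(row); // feed the IncrementalIndex under benchmark
    }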
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
index f60708596a8c..ef4ca7bc1925 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
@@ -21,9 +21,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
@@ -67,6 +64,9 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -125,7 +125,7 @@ public class FilteredAggregatorBenchmark
private DimFilter filter;
private List<InputRow> inputRows;
private QueryRunnerFactory factory;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
private TimeseriesQuery query;
private File tmpDir;
@@ -152,9 +152,9 @@ public void setup() throws IOException
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
- BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+ DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
schemaInfo.getDataInterval(),
@@ -202,7 +202,7 @@ public void setup() throws IOException
QueryBenchmarkUtil.NOOP_QUERYWATCHER
);
- BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
List<AggregatorFactory> queryAggs = new ArrayList<>();
queryAggs.add(filteredMetrics[0]);
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java
index 1570619b9029..424f2977104c 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/FloatCompressionBenchmarkFileGenerator.java
@@ -20,14 +20,14 @@
package org.apache.druid.benchmark;
import com.google.common.collect.ImmutableList;
-import org.apache.druid.benchmark.datagen.BenchmarkColumnSchema;
-import org.apache.druid.benchmark.datagen.BenchmarkColumnValueGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.ColumnarFloatsSerializer;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.generator.ColumnValueGenerator;
+import org.apache.druid.segment.generator.GeneratorColumnSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import java.io.BufferedReader;
@@ -65,7 +65,7 @@ public static void main(String[] args) throws IOException
dirPath = args[0];
}
- BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated(
+ GeneratorColumnSchema enumeratedSchema = GeneratorColumnSchema.makeEnumerated(
"",
ValueType.FLOAT,
true,
@@ -86,7 +86,7 @@ public static void main(String[] args) throws IOException
0.0001
)
);
- BenchmarkColumnSchema zipfLowSchema = BenchmarkColumnSchema.makeZipf(
+ GeneratorColumnSchema zipfLowSchema = GeneratorColumnSchema.makeZipf(
"",
ValueType.FLOAT,
true,
@@ -96,7 +96,7 @@ public static void main(String[] args) throws IOException
1000,
1d
);
- BenchmarkColumnSchema zipfHighSchema = BenchmarkColumnSchema.makeZipf(
+ GeneratorColumnSchema zipfHighSchema = GeneratorColumnSchema.makeZipf(
"",
ValueType.FLOAT,
true,
@@ -106,7 +106,7 @@ public static void main(String[] args) throws IOException
1000,
3d
);
- BenchmarkColumnSchema sequentialSchema = BenchmarkColumnSchema.makeSequential(
+ GeneratorColumnSchema sequentialSchema = GeneratorColumnSchema.makeSequential(
"",
ValueType.FLOAT,
true,
@@ -115,7 +115,7 @@ public static void main(String[] args) throws IOException
1470187671,
2000000000
);
- BenchmarkColumnSchema uniformSchema = BenchmarkColumnSchema.makeContinuousUniform(
+ GeneratorColumnSchema uniformSchema = GeneratorColumnSchema.makeContinuousUniform(
"",
ValueType.FLOAT,
true,
@@ -125,18 +125,18 @@ public static void main(String[] args) throws IOException
1000
);
- Map<String, BenchmarkColumnValueGenerator> generators = new HashMap<>();
- generators.put("enumerate", new BenchmarkColumnValueGenerator(enumeratedSchema, 1));
- generators.put("zipfLow", new BenchmarkColumnValueGenerator(zipfLowSchema, 1));
- generators.put("zipfHigh", new BenchmarkColumnValueGenerator(zipfHighSchema, 1));
- generators.put("sequential", new BenchmarkColumnValueGenerator(sequentialSchema, 1));
- generators.put("uniform", new BenchmarkColumnValueGenerator(uniformSchema, 1));
+ Map<String, ColumnValueGenerator> generators = new HashMap<>();
+ generators.put("enumerate", new ColumnValueGenerator(enumeratedSchema, 1));
+ generators.put("zipfLow", new ColumnValueGenerator(zipfLowSchema, 1));
+ generators.put("zipfHigh", new ColumnValueGenerator(zipfHighSchema, 1));
+ generators.put("sequential", new ColumnValueGenerator(sequentialSchema, 1));
+ generators.put("uniform", new ColumnValueGenerator(uniformSchema, 1));
File dir = new File(dirPath);
dir.mkdir();
// create data files using BenchmarkColumnValueGenerator
- for (Map.Entry<String, BenchmarkColumnValueGenerator> entry : generators.entrySet()) {
+ for (Map.Entry<String, ColumnValueGenerator> entry : generators.entrySet()) {
final File dataFile = new File(dir, entry.getKey());
dataFile.delete();
try (Writer writer = Files.newBufferedWriter(dataFile.toPath(), StandardCharsets.UTF_8)) {
@@ -147,7 +147,7 @@ public static void main(String[] args) throws IOException
}
// create compressed files using all combinations of CompressionStrategy and FloatEncoding provided
- for (Map.Entry<String, BenchmarkColumnValueGenerator> entry : generators.entrySet()) {
+ for (Map.Entry<String, ColumnValueGenerator> entry : generators.entrySet()) {
for (CompressionStrategy compression : COMPRESSIONS) {
String name = entry.getKey() + "-" + compression;
log.info("%s: ", name);
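
Note: a sketch of how the generator map above is drained into per-distribution data files, mirroring the loop in this file; it assumes ColumnValueGenerator keeps the old generateRowValue() entry point, and ROW_NUM stands in for the file-size constant:

    for (Map.Entry<String, ColumnValueGenerator> entry : generators.entrySet()) {
      File dataFile = new File(dir, entry.getKey());
      try (Writer writer = Files.newBufferedWriter(dataFile.toPath(), StandardCharsets.UTF_8)) {
        for (int i = 0; i < ROW_NUM; i++) {
          writer.write(entry.getValue().generateRowValue() + "\n");
        }
      }
    }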
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index 647a787f67b6..d12ff8784f2e 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -23,9 +23,6 @@
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
@@ -70,6 +67,9 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -144,7 +144,7 @@ public class GroupByTypeInterfaceBenchmark
private QueryRunnerFactory factory;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
private GroupByQuery stringQuery;
private GroupByQuery longFloatQuery;
private GroupByQuery floatQuery;
@@ -172,7 +172,7 @@ private void setupQueries()
{
// queries for the basic schema
Map<String, GroupByQuery> basicQueries = new LinkedHashMap<>();
- BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
{ // basic.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
@@ -277,13 +277,13 @@ public void setup() throws IOException
String schemaName = "basic";
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
stringQuery = SCHEMA_QUERY_MAP.get(schemaName).get("string");
longFloatQuery = SCHEMA_QUERY_MAP.get(schemaName).get("longFloat");
longQuery = SCHEMA_QUERY_MAP.get(schemaName).get("long");
floatQuery = SCHEMA_QUERY_MAP.get(schemaName).get("float");
- final BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(
+ final DataGenerator dataGenerator = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + 1,
schemaInfo.getDataInterval(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/JoinAndLookupBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/JoinAndLookupBenchmark.java
index 9098953fbe97..132a3215927b 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/JoinAndLookupBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/JoinAndLookupBenchmark.java
@@ -38,6 +38,7 @@
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnConfig;
@@ -48,8 +49,8 @@
import org.apache.druid.segment.join.JoinTestHelper;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.segment.join.JoinableClause;
-import org.apache.druid.segment.join.filter.JoinFilterAnalyzer;
-import org.apache.druid.segment.join.filter.JoinFilterPreAnalysis;
+import org.apache.druid.segment.join.filter.rewrite.JoinFilterPreAnalysisGroup;
+import org.apache.druid.segment.join.filter.rewrite.JoinFilterRewriteConfig;
import org.apache.druid.segment.join.lookup.LookupJoinable;
import org.apache.druid.segment.join.table.IndexedTableJoinable;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
@@ -139,19 +140,20 @@ public void setup() throws IOException
)
)
);
- JoinFilterPreAnalysis preAnalysisLookupStringKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
- joinableClausesLookupStringKey,
- VirtualColumns.EMPTY,
- null,
- false,
- false,
- false,
- 0
+ JoinFilterPreAnalysisGroup preAnalysisGroupLookupStringKey = new JoinFilterPreAnalysisGroup(
+ new JoinFilterRewriteConfig(
+ false,
+ false,
+ false,
+ 0
+ ),
+ true
);
+
hashJoinLookupStringKeySegment = new HashJoinSegment(
- baseSegment,
+ ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesLookupStringKey,
- preAnalysisLookupStringKey
+ preAnalysisGroupLookupStringKey
);
List<JoinableClause> joinableClausesLookupLongKey = ImmutableList.of(
@@ -166,19 +168,20 @@ public void setup() throws IOException
)
)
);
- JoinFilterPreAnalysis preAnalysisLookupLongKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
- joinableClausesLookupLongKey,
- VirtualColumns.EMPTY,
- null,
- false,
- false,
- false,
- 0
+
+ JoinFilterPreAnalysisGroup preAnalysisGroupLookupLongKey = new JoinFilterPreAnalysisGroup(
+ new JoinFilterRewriteConfig(
+ false,
+ false,
+ false,
+ 0
+ ),
+ true
);
hashJoinLookupLongKeySegment = new HashJoinSegment(
- baseSegment,
+ ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesLookupLongKey,
- preAnalysisLookupLongKey
+ preAnalysisGroupLookupLongKey
);
List<JoinableClause> joinableClausesIndexedTableStringKey = ImmutableList.of(
@@ -193,19 +196,20 @@ public void setup() throws IOException
)
)
);
- JoinFilterPreAnalysis preAnalysisIndexedTableStringKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
- joinableClausesIndexedTableStringKey,
- VirtualColumns.EMPTY,
- null,
- false,
- false,
- false,
- 0
+
+ JoinFilterPreAnalysisGroup preAnalysisGroupIndexedStringKey = new JoinFilterPreAnalysisGroup(
+ new JoinFilterRewriteConfig(
+ false,
+ false,
+ false,
+ 0
+ ),
+ true
);
hashJoinIndexedTableStringKeySegment = new HashJoinSegment(
- baseSegment,
+ ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesIndexedTableStringKey,
- preAnalysisIndexedTableStringKey
+ preAnalysisGroupIndexedStringKey
);
List<JoinableClause> joinableClausesIndexedTableLonggKey = ImmutableList.of(
@@ -220,19 +224,19 @@ public void setup() throws IOException
)
)
);
- JoinFilterPreAnalysis preAnalysisIndexedTableLongKey = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(
- joinableClausesIndexedTableLonggKey,
- VirtualColumns.EMPTY,
- null,
- false,
- false,
- false,
- 0
+ JoinFilterPreAnalysisGroup preAnalysisGroupIndexedLongKey = new JoinFilterPreAnalysisGroup(
+ new JoinFilterRewriteConfig(
+ false,
+ false,
+ false,
+ 0
+ ),
+ true
);
hashJoinIndexedTableLongKeySegment = new HashJoinSegment(
- baseSegment,
+ ReferenceCountingSegment.wrapRootGenerationSegment(baseSegment),
joinableClausesIndexedTableLonggKey,
- preAnalysisIndexedTableLongKey
+ preAnalysisGroupIndexedLongKey
);
final Map<String, String> countryCodeToNameMap = JoinTestHelper.createCountryIsoCodeToNameLookup().getMap();
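
Note: the four positional arguments passed to JoinFilterRewriteConfig at each call site above are hard to read; a hedged annotation of the same construction (the parameter names are assumptions about the config's semantics, not confirmed signatures):

    JoinFilterRewriteConfig noRewrites = new JoinFilterRewriteConfig(
        false, // enable filter push-down (assumed name)
        false, // enable filter rewrites (assumed name)
        false, // enable value-column filter rewrites (assumed name)
        0      // rewrite max size (assumed name)
    );
    // The trailing 'true' matches the second constructor argument used in this patch.
    JoinFilterPreAnalysisGroup group = new JoinFilterPreAnalysisGroup(noRewrites, true);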
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/LikeFilterBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/LikeFilterBenchmark.java
index 83551146eb24..5c4a0f35a60e 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/LikeFilterBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/LikeFilterBenchmark.java
@@ -35,6 +35,7 @@
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.query.ordering.StringComparators;
import org.apache.druid.segment.column.BitmapIndex;
+import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.data.BitmapSerdeFactory;
import org.apache.druid.segment.data.CloseableIndexed;
import org.apache.druid.segment.data.GenericIndexed;
@@ -166,7 +167,7 @@ public CloseableIndexed<String> getDimensionValues(String dimension)
}
@Override
- public boolean hasMultipleValues(final String dimension)
+ public ColumnCapabilities.Capable hasMultipleValues(final String dimension)
{
throw new UnsupportedOperationException();
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java
index ead3dd0e5cc7..b9bca954de45 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/LongCompressionBenchmarkFileGenerator.java
@@ -20,14 +20,14 @@
package org.apache.druid.benchmark;
import com.google.common.collect.ImmutableList;
-import org.apache.druid.benchmark.datagen.BenchmarkColumnSchema;
-import org.apache.druid.benchmark.datagen.BenchmarkColumnValueGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.ColumnarLongsSerializer;
import org.apache.druid.segment.data.CompressionFactory;
import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.generator.ColumnValueGenerator;
+import org.apache.druid.segment.generator.GeneratorColumnSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMedium;
import java.io.BufferedReader;
@@ -66,7 +66,7 @@ public static void main(String[] args) throws IOException
dirPath = args[0];
}
- BenchmarkColumnSchema enumeratedSchema = BenchmarkColumnSchema.makeEnumerated(
+ GeneratorColumnSchema enumeratedSchema = GeneratorColumnSchema.makeEnumerated(
"",
ValueType.LONG,
true,
@@ -87,8 +87,8 @@ public static void main(String[] args) throws IOException
0.0001
)
);
- BenchmarkColumnSchema zipfLowSchema = BenchmarkColumnSchema.makeZipf("", ValueType.LONG, true, 1, 0d, -1, 1000, 1d);
- BenchmarkColumnSchema zipfHighSchema = BenchmarkColumnSchema.makeZipf(
+ GeneratorColumnSchema zipfLowSchema = GeneratorColumnSchema.makeZipf("", ValueType.LONG, true, 1, 0d, -1, 1000, 1d);
+ GeneratorColumnSchema zipfHighSchema = GeneratorColumnSchema.makeZipf(
"",
ValueType.LONG,
true,
@@ -98,7 +98,7 @@ public static void main(String[] args) throws IOException
1000,
3d
);
- BenchmarkColumnSchema sequentialSchema = BenchmarkColumnSchema.makeSequential(
+ GeneratorColumnSchema sequentialSchema = GeneratorColumnSchema.makeSequential(
"",
ValueType.LONG,
true,
@@ -107,7 +107,7 @@ public static void main(String[] args) throws IOException
1470187671,
2000000000
);
- BenchmarkColumnSchema uniformSchema = BenchmarkColumnSchema.makeDiscreteUniform(
+ GeneratorColumnSchema uniformSchema = GeneratorColumnSchema.makeDiscreteUniform(
"",
ValueType.LONG,
true,
@@ -117,18 +117,18 @@ public static void main(String[] args) throws IOException
1000
);
- Map<String, BenchmarkColumnValueGenerator> generators = new HashMap<>();
- generators.put("enumerate", new BenchmarkColumnValueGenerator(enumeratedSchema, 1));
- generators.put("zipfLow", new BenchmarkColumnValueGenerator(zipfLowSchema, 1));
- generators.put("zipfHigh", new BenchmarkColumnValueGenerator(zipfHighSchema, 1));
- generators.put("sequential", new BenchmarkColumnValueGenerator(sequentialSchema, 1));
- generators.put("uniform", new BenchmarkColumnValueGenerator(uniformSchema, 1));
+ Map<String, ColumnValueGenerator> generators = new HashMap<>();
+ generators.put("enumerate", new ColumnValueGenerator(enumeratedSchema, 1));
+ generators.put("zipfLow", new ColumnValueGenerator(zipfLowSchema, 1));
+ generators.put("zipfHigh", new ColumnValueGenerator(zipfHighSchema, 1));
+ generators.put("sequential", new ColumnValueGenerator(sequentialSchema, 1));
+ generators.put("uniform", new ColumnValueGenerator(uniformSchema, 1));
File dir = new File(dirPath);
dir.mkdir();
// create data files using BenchmarkColumnValueGenerator
- for (Map.Entry<String, BenchmarkColumnValueGenerator> entry : generators.entrySet()) {
+ for (Map.Entry<String, ColumnValueGenerator> entry : generators.entrySet()) {
final File dataFile = new File(dir, entry.getKey());
dataFile.delete();
try (Writer writer = Files.newBufferedWriter(dataFile.toPath(), StandardCharsets.UTF_8)) {
@@ -139,7 +139,7 @@ public static void main(String[] args) throws IOException
}
// create compressed files using all combinations of CompressionStrategy and LongEncoding provided
- for (Map.Entry<String, BenchmarkColumnValueGenerator> entry : generators.entrySet()) {
+ for (Map.Entry<String, ColumnValueGenerator> entry : generators.entrySet()) {
for (CompressionStrategy compression : COMPRESSIONS) {
for (CompressionFactory.LongEncodingStrategy encoding : ENCODINGS) {
String name = entry.getKey() + "-" + compression + "-" + encoding;
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
index 69f48664555f..bf7733db180e 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
@@ -20,9 +20,6 @@
package org.apache.druid.benchmark;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.common.config.NullHandling;
@@ -66,6 +63,9 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -125,7 +125,7 @@ public class TopNTypeInterfaceBenchmark
private List<QueryableIndex> qIndexes;
private QueryRunnerFactory factory;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
private TopNQueryBuilder queryBuilder;
private TopNQuery stringQuery;
private TopNQuery longQuery;
@@ -153,7 +153,7 @@ private void setupQueries()
{
// queries for the basic schema
Map<String, TopNQueryBuilder> basicQueries = new LinkedHashMap<>();
- BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
{ // basic.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
@@ -165,7 +165,7 @@ private void setupQueries()
queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf"));
queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper"));
- // Use an IdentityExtractionFn to force usage of DimExtractionTopNAlgorithm
+ // Use an IdentityExtractionFn to force usage of HeapBasedTopNAlgorithm
TopNQueryBuilder queryBuilderString = new TopNQueryBuilder()
.dataSource("blah")
.granularity(Granularities.ALL)
@@ -174,7 +174,7 @@ private void setupQueries()
.intervals(intervalSpec)
.aggregators(queryAggs);
- // DimExtractionTopNAlgorithm is always used for numeric columns
+ // HeapBasedTopNAlgorithm is always used for numeric columns
TopNQueryBuilder queryBuilderLong = new TopNQueryBuilder()
.dataSource("blah")
.granularity(Granularities.ALL)
@@ -241,7 +241,7 @@ public void setup() throws IOException
setupQueries();
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
queryBuilder = SCHEMA_QUERY_MAP.get("basic").get("string");
queryBuilder.threshold(threshold);
stringQuery = queryBuilder.build();
@@ -258,7 +258,7 @@ public void setup() throws IOException
for (int i = 0; i < numSegments; i++) {
log.info("Generating rows for segment " + i);
- BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+ DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + i,
schemaInfo.getDataInterval(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java b/benchmarks/src/test/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java
deleted file mode 100644
index 142843753fdc..000000000000
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/datagen/BenchmarkSchemas.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.benchmark.datagen;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.math.expr.ExprMacroTable;
-import org.apache.druid.query.aggregation.AggregatorFactory;
-import org.apache.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.druid.query.aggregation.DoubleMinAggregatorFactory;
-import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
-import org.apache.druid.query.aggregation.LongMaxAggregatorFactory;
-import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
-import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
-import org.apache.druid.segment.column.ValueType;
-import org.joda.time.Interval;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-public class BenchmarkSchemas
-{
- public static final Map<String, BenchmarkSchemaInfo> SCHEMA_MAP = new LinkedHashMap<>();
-
- static { // basic schema
- List<BenchmarkColumnSchema> basicSchemaColumns = ImmutableList.of(
- // dims
- BenchmarkColumnSchema.makeSequential("dimSequential", ValueType.STRING, false, 1, null, 0, 1000),
- BenchmarkColumnSchema.makeZipf("dimZipf", ValueType.STRING, false, 1, null, 1, 101, 1.0),
- BenchmarkColumnSchema.makeDiscreteUniform("dimUniform", ValueType.STRING, false, 1, null, 1, 100000),
- BenchmarkColumnSchema.makeSequential("dimSequentialHalfNull", ValueType.STRING, false, 1, 0.5, 0, 1000),
- BenchmarkColumnSchema.makeEnumerated(
- "dimMultivalEnumerated",
- ValueType.STRING,
- false,
- 4,
- null,
- Arrays.asList("Hello", "World", "Foo", "Bar", "Baz"),
- Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
- ),
- BenchmarkColumnSchema.makeEnumerated(
- "dimMultivalEnumerated2",
- ValueType.STRING,
- false,
- 3,
- null,
- Arrays.asList("Apple", "Orange", "Xylophone", "Corundum", null),
- Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
- ),
- BenchmarkColumnSchema.makeSequential("dimMultivalSequentialWithNulls", ValueType.STRING, false, 8, 0.15, 1, 11),
- BenchmarkColumnSchema.makeSequential("dimHyperUnique", ValueType.STRING, false, 1, null, 0, 100000),
- BenchmarkColumnSchema.makeSequential("dimNull", ValueType.STRING, false, 1, 1.0, 0, 1),
-
- // metrics
- BenchmarkColumnSchema.makeSequential("metLongSequential", ValueType.LONG, true, 1, null, 0, 10000),
- BenchmarkColumnSchema.makeDiscreteUniform("metLongUniform", ValueType.LONG, true, 1, null, 0, 500),
- BenchmarkColumnSchema.makeNormal("metFloatNormal", ValueType.FLOAT, true, 1, null, 5000.0, 1.0, true),
- BenchmarkColumnSchema.makeZipf("metFloatZipf", ValueType.FLOAT, true, 1, null, 0, 1000, 1.0)
- );
-
- List<AggregatorFactory> basicSchemaIngestAggs = new ArrayList<>();
- basicSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
- basicSchemaIngestAggs.add(new LongSumAggregatorFactory("sumLongSequential", "metLongSequential"));
- basicSchemaIngestAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform"));
- basicSchemaIngestAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "metFloatNormal"));
- basicSchemaIngestAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
- basicSchemaIngestAggs.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
-
- List<AggregatorFactory> basicSchemaIngestAggsExpression = new ArrayList<>();
- basicSchemaIngestAggsExpression.add(new CountAggregatorFactory("rows"));
- basicSchemaIngestAggsExpression.add(new LongSumAggregatorFactory("sumLongSequential", null, "if(sumLongSequential>0 && dimSequential>100 || dimSequential<10 || metLongSequential>3000,sumLongSequential,0)", ExprMacroTable.nil()));
- basicSchemaIngestAggsExpression.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform"));
- basicSchemaIngestAggsExpression.add(new DoubleSumAggregatorFactory("sumFloatNormal", null, "if(sumFloatNormal>0 && dimSequential>100 || dimSequential<10 || metLongSequential>3000,sumFloatNormal,0)", ExprMacroTable.nil()));
- basicSchemaIngestAggsExpression.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
- basicSchemaIngestAggsExpression.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
-
- Interval basicSchemaDataInterval = Intervals.of("2000-01-01/P1D");
-
- BenchmarkSchemaInfo basicSchema = new BenchmarkSchemaInfo(
- basicSchemaColumns,
- basicSchemaIngestAggs,
- basicSchemaDataInterval,
- true
- );
-
- BenchmarkSchemaInfo basicSchemaExpression = new BenchmarkSchemaInfo(
- basicSchemaColumns,
- basicSchemaIngestAggsExpression,
- basicSchemaDataInterval,
- true
- );
-
- SCHEMA_MAP.put("basic", basicSchema);
- SCHEMA_MAP.put("expression", basicSchemaExpression);
- }
-
- static { // simple single string column and count agg schema, no rollup
- List<BenchmarkColumnSchema> basicSchemaColumns = ImmutableList.of(
- // dims
- BenchmarkColumnSchema.makeSequential("dimSequential", ValueType.STRING, false, 1, null, 0, 1000000)
- );
-
- List<AggregatorFactory> basicSchemaIngestAggs = new ArrayList<>();
- basicSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
-
- Interval basicSchemaDataInterval = Intervals.utc(0, 1000000);
-
- BenchmarkSchemaInfo basicSchema = new BenchmarkSchemaInfo(
- basicSchemaColumns,
- basicSchemaIngestAggs,
- basicSchemaDataInterval,
- false
- );
- SCHEMA_MAP.put("simple", basicSchema);
- }
-
- static { // simple single long column and count agg schema, no rollup
- List<BenchmarkColumnSchema> basicSchemaColumns = ImmutableList.of(
- // dims, ingest as a metric for now with rollup off, until numeric dims at ingestion are supported
- BenchmarkColumnSchema.makeSequential("dimSequential", ValueType.LONG, true, 1, null, 0, 1000000)
- );
-
- List<AggregatorFactory> basicSchemaIngestAggs = new ArrayList<>();
- basicSchemaIngestAggs.add(new LongSumAggregatorFactory("dimSequential", "dimSequential"));
- basicSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
-
- Interval basicSchemaDataInterval = Intervals.utc(0, 1000000);
-
- BenchmarkSchemaInfo basicSchema = new BenchmarkSchemaInfo(
- basicSchemaColumns,
- basicSchemaIngestAggs,
- basicSchemaDataInterval,
- false
- );
- SCHEMA_MAP.put("simpleLong", basicSchema);
- }
-
- static { // simple single float column and count agg schema, no rollup
- List<BenchmarkColumnSchema> basicSchemaColumns = ImmutableList.of(
- // dims, ingest as a metric for now with rollup off, until numeric dims at ingestion are supported
- BenchmarkColumnSchema.makeSequential("dimSequential", ValueType.FLOAT, true, 1, null, 0, 1000000)
- );
-
- List<AggregatorFactory> basicSchemaIngestAggs = new ArrayList<>();
- basicSchemaIngestAggs.add(new DoubleSumAggregatorFactory("dimSequential", "dimSequential"));
- basicSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
-
- Interval basicSchemaDataInterval = Intervals.utc(0, 1000000);
-
- BenchmarkSchemaInfo basicSchema = new BenchmarkSchemaInfo(
- basicSchemaColumns,
- basicSchemaIngestAggs,
- basicSchemaDataInterval,
- false
- );
- SCHEMA_MAP.put("simpleFloat", basicSchema);
- }
-
- static { // schema with high opportunity for rollup
- List<BenchmarkColumnSchema> rolloColumns = ImmutableList.of(
- // dims
- BenchmarkColumnSchema.makeEnumerated(
- "dimEnumerated",
- ValueType.STRING,
- false,
- 1,
- null,
- Arrays.asList("Hello", "World", "Foo", "Bar", "Baz"),
- Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
- ),
- BenchmarkColumnSchema.makeEnumerated(
- "dimEnumerated2",
- ValueType.STRING,
- false,
- 1,
- null,
- Arrays.asList("Apple", "Orange", "Xylophone", "Corundum", null),
- Arrays.asList(0.2, 0.25, 0.15, 0.10, 0.3)
- ),
- BenchmarkColumnSchema.makeZipf("dimZipf", ValueType.STRING, false, 1, null, 1, 100, 2.0),
- BenchmarkColumnSchema.makeDiscreteUniform("dimUniform", ValueType.STRING, false, 1, null, 1, 100),
-
- // metrics
- BenchmarkColumnSchema.makeZipf("metLongZipf", ValueType.LONG, true, 1, null, 0, 10000, 2.0),
- BenchmarkColumnSchema.makeDiscreteUniform("metLongUniform", ValueType.LONG, true, 1, null, 0, 500),
- BenchmarkColumnSchema.makeNormal("metFloatNormal", ValueType.FLOAT, true, 1, null, 5000.0, 1.0, true),
- BenchmarkColumnSchema.makeZipf("metFloatZipf", ValueType.FLOAT, true, 1, null, 0, 1000, 1.5)
- );
-
- List<AggregatorFactory> rolloSchemaIngestAggs = new ArrayList<>();
- rolloSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
- rolloSchemaIngestAggs.add(new LongSumAggregatorFactory("sumLongSequential", "metLongSequential"));
- rolloSchemaIngestAggs.add(new LongMaxAggregatorFactory("maxLongUniform", "metLongUniform"));
- rolloSchemaIngestAggs.add(new DoubleSumAggregatorFactory("sumFloatNormal", "metFloatNormal"));
- rolloSchemaIngestAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "metFloatZipf"));
- rolloSchemaIngestAggs.add(new HyperUniquesAggregatorFactory("hyper", "dimHyperUnique"));
-
- Interval basicSchemaDataInterval = Intervals.utc(0, 1000000);
-
- BenchmarkSchemaInfo rolloSchema = new BenchmarkSchemaInfo(
- rolloColumns,
- rolloSchemaIngestAggs,
- basicSchemaDataInterval,
- true
- );
- SCHEMA_MAP.put("rollo", rolloSchema);
- }
-
- static { // simple schema with null valued rows, no aggs on numeric columns
- List<BenchmarkColumnSchema> nullsSchemaColumns = ImmutableList.of(
- // string dims with nulls
- BenchmarkColumnSchema.makeZipf("stringZipf", ValueType.STRING, false, 1, 0.8, 1, 101, 1.5),
- BenchmarkColumnSchema.makeDiscreteUniform("stringUniform", ValueType.STRING, false, 1, 0.3, 1, 100000),
- BenchmarkColumnSchema.makeSequential("stringSequential", ValueType.STRING, false, 1, 0.5, 0, 1000),
-
- // numeric dims with nulls
- BenchmarkColumnSchema.makeSequential("longSequential", ValueType.LONG, false, 1, 0.45, 0, 10000),
- BenchmarkColumnSchema.makeDiscreteUniform("longUniform", ValueType.LONG, false, 1, 0.25, 0, 500),
- BenchmarkColumnSchema.makeZipf("doubleZipf", ValueType.DOUBLE, false, 1, 0.1, 0, 1000, 2.0),
- BenchmarkColumnSchema.makeZipf("floatZipf", ValueType.FLOAT, false, 1, 0.1, 0, 1000, 2.0)
- );
-
- List<AggregatorFactory> simpleNullsSchemaIngestAggs = new ArrayList<>();
- simpleNullsSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
-
- Interval nullsSchemaDataInterval = Intervals.of("2000-01-01/P1D");
-
- BenchmarkSchemaInfo nullsSchema = new BenchmarkSchemaInfo(
- nullsSchemaColumns,
- simpleNullsSchemaIngestAggs,
- nullsSchemaDataInterval,
- false
- );
-
- SCHEMA_MAP.put("nulls", nullsSchema);
- }
-
- static { // simple schema with null valued rows, no aggs on numeric columns
- List<BenchmarkColumnSchema> nullsSchemaColumns = ImmutableList.of(
- // string dims
- BenchmarkColumnSchema.makeZipf("stringZipf", ValueType.STRING, false, 1, null, 1, 101, 1.5),
- BenchmarkColumnSchema.makeDiscreteUniform("stringUniform", ValueType.STRING, false, 1, null, 1, 100000),
- BenchmarkColumnSchema.makeSequential("stringSequential", ValueType.STRING, false, 1, null, 0, 1000),
-
- // numeric dims
- BenchmarkColumnSchema.makeSequential("longSequential", ValueType.LONG, false, 1, null, 0, 10000),
- BenchmarkColumnSchema.makeDiscreteUniform("longUniform", ValueType.LONG, false, 1, null, 0, 500),
- BenchmarkColumnSchema.makeZipf("doubleZipf", ValueType.DOUBLE, false, 1, null, 0, 1000, 2.0),
- BenchmarkColumnSchema.makeZipf("floatZipf", ValueType.FLOAT, false, 1, null, 0, 1000, 2.0),
-
- // string dims with nulls
- BenchmarkColumnSchema.makeZipf("stringZipfWithNulls", ValueType.STRING, false, 1, 0.8, 1, 101, 1.5),
- BenchmarkColumnSchema.makeDiscreteUniform("stringUniformWithNulls", ValueType.STRING, false, 1, 0.3, 1, 100000),
- BenchmarkColumnSchema.makeSequential("stringSequentialWithNulls", ValueType.STRING, false, 1, 0.5, 0, 1000),
-
- // numeric dims with nulls
- BenchmarkColumnSchema.makeSequential("longSequentialWithNulls", ValueType.LONG, false, 1, 0.45, 0, 10000),
- BenchmarkColumnSchema.makeDiscreteUniform("longUniformWithNulls", ValueType.LONG, false, 1, 0.25, 0, 500),
- BenchmarkColumnSchema.makeZipf("doubleZipfWithNulls", ValueType.DOUBLE, false, 1, 0.1, 0, 1000, 2.0),
- BenchmarkColumnSchema.makeZipf("floatZipfWithNulls", ValueType.FLOAT, false, 1, 0.1, 0, 1000, 2.0)
- );
-
- List<AggregatorFactory> simpleNullsSchemaIngestAggs = new ArrayList<>();
- simpleNullsSchemaIngestAggs.add(new CountAggregatorFactory("rows"));
-
- Interval nullsSchemaDataInterval = Intervals.of("2000-01-01/P1D");
-
- BenchmarkSchemaInfo nullsSchema = new BenchmarkSchemaInfo(
- nullsSchemaColumns,
- simpleNullsSchemaIngestAggs,
- nullsSchemaDataInterval,
- false
- );
-
- SCHEMA_MAP.put("nulls-and-non-nulls", nullsSchema);
- }
-}
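
Note: this deleted file's schemas move to org.apache.druid.segment.generator.GeneratorBasicSchemas. A minimal sketch of defining an equivalent schema against the renamed classes, using the same four-argument constructor the hunks above call:

    List<GeneratorColumnSchema> columns = ImmutableList.of(
        GeneratorColumnSchema.makeSequential("dimSequential", ValueType.STRING, false, 1, null, 0, 1000)
    );
    List<AggregatorFactory> aggs = ImmutableList.of(new CountAggregatorFactory("rows"));
    GeneratorSchemaInfo schema = new GeneratorSchemaInfo(
        columns,
        aggs,
        Intervals.of("2000-01-01/P1D"),
        true // withRollup, per the deleted "basic" schema
    );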
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/datagen/SegmentGenerator.java b/benchmarks/src/test/java/org/apache/druid/benchmark/datagen/SegmentGenerator.java
index 95a42d5ffd88..f6d018d9bb82 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/datagen/SegmentGenerator.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/datagen/SegmentGenerator.java
@@ -35,6 +35,8 @@
import org.apache.druid.segment.QueryableIndexIndexableAdapter;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -100,7 +102,7 @@ public File getCacheDir()
public QueryableIndex generate(
final DataSegment dataSegment,
- final BenchmarkSchemaInfo schemaInfo,
+ final GeneratorSchemaInfo schemaInfo,
final Granularity granularity,
final int numRows
)
@@ -131,7 +133,7 @@ public QueryableIndex generate(
log.info("Writing segment with hash[%s] to directory[%s].", dataHash, outDir);
- final BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(
+ final DataGenerator dataGenerator = new DataGenerator(
schemaInfo.getColumnSchemas(),
dataSegment.getId().hashCode(), /* Use segment identifier hashCode as seed */
schemaInfo.getDataInterval(),
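
Note: with the generate() signature change above, a caller-side sketch (Granularities.NONE and rowsPerSegment are illustrative choices, not values from this patch):

    QueryableIndex index = segmentGenerator.generate(
        dataSegment,       // its id's hashCode seeds the DataGenerator, per the hunk above
        schemaInfo,        // now typed as GeneratorSchemaInfo
        Granularities.NONE,
        rowsPerSegment
    );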
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
index f7d1e91b5236..1a9ac01cf412 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IncrementalIndexReadBenchmark.java
@@ -19,9 +19,6 @@
package org.apache.druid.benchmark.indexing;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -43,6 +40,9 @@
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.data.IndexedInts;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
@@ -91,7 +91,7 @@ public class IncrementalIndexReadBenchmark
private IncrementalIndex incIndex;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
@Setup
public void setup() throws IOException
@@ -100,9 +100,9 @@ public void setup() throws IOException
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
- BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+ DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
schemaInfo.getDataInterval(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
index 19862f27c948..02c6efd89569 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexIngestionBenchmark.java
@@ -19,13 +19,13 @@
package org.apache.druid.benchmark.indexing;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
@@ -70,7 +70,7 @@ public class IndexIngestionBenchmark
private IncrementalIndex incIndex;
private ArrayList<InputRow> rows;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
@Setup
public void setup()
@@ -78,9 +78,9 @@ public void setup()
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
rows = new ArrayList<InputRow>();
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
- BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+ DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
schemaInfo.getDataInterval(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
index 6b3b902c3ad4..80e964362e44 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
@@ -21,9 +21,6 @@
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -35,6 +32,9 @@
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
@@ -96,7 +96,7 @@ public class IndexMergeBenchmark
}
private List<QueryableIndex> indexesToMerge;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
private File tmpDir;
private IndexMergerV9 indexMergerV9;
@@ -121,10 +121,10 @@ public void setup() throws IOException
indexesToMerge = new ArrayList<>();
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
for (int i = 0; i < numSegments; i++) {
- BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+ DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + i,
schemaInfo.getDataInterval(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
index d826fe8fdc73..5b1a0ca6d6f4 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
@@ -20,9 +20,6 @@
package org.apache.druid.benchmark.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -32,6 +29,9 @@
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
@@ -92,7 +92,7 @@ public class IndexPersistBenchmark
private IncrementalIndex incIndex;
private ArrayList<InputRow> rows;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
@Setup
public void setup()
@@ -102,7 +102,7 @@ public void setup()
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
rows = new ArrayList<InputRow>();
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schema);
int valuesPerTimestamp = 1;
switch (rollupOpportunity) {
@@ -115,7 +115,7 @@ public void setup()
}
- BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+ DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED,
schemaInfo.getDataInterval().getStartMillis(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index 94b560e8f4df..daa614ff4ddd 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -27,8 +27,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.client.CachingClusteredClient;
import org.apache.druid.client.DruidServer;
@@ -104,6 +102,8 @@
import org.apache.druid.query.topn.TopNResultValue;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.timeline.DataSegment;
@@ -180,7 +180,7 @@ public class CachingClusteredClientBenchmark
private final Closer closer = Closer.create();
- private final BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ private final GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
private final QuerySegmentSpec basicSchemaIntervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(basicSchema.getDataInterval())
);
@@ -204,7 +204,7 @@ public void setup()
parallelCombine = parallelism > 0;
- BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
+ GeneratorSchemaInfo schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
Map<DataSegment, QueryableIndex> queryableIndexes = Maps.newHashMapWithExpectedSize(numServers);
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index 02107f09af48..57214154cd2c 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -26,9 +26,6 @@
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
@@ -84,6 +81,9 @@
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
@@ -161,7 +161,7 @@ public class GroupByBenchmark
private QueryRunnerFactory factory;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
private GroupByQuery query;
private ExecutorService executorService;
@@ -192,7 +192,7 @@ private void setupQueries()
{
// queries for the basic schema
Map<String, GroupByQuery> basicQueries = new LinkedHashMap<>();
- BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
{ // basic.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
@@ -324,7 +324,7 @@ private void setupQueries()
// simple one column schema, for testing performance difference between querying on numeric values as Strings and
// directly as longs
Map<String, GroupByQuery> simpleQueries = new LinkedHashMap<>();
- BenchmarkSchemaInfo simpleSchema = BenchmarkSchemas.SCHEMA_MAP.get("simple");
+ GeneratorSchemaInfo simpleSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("simple");
{ // simple.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(simpleSchema.getDataInterval()));
@@ -351,7 +351,7 @@ private void setupQueries()
Map<String, GroupByQuery> simpleLongQueries = new LinkedHashMap<>();
- BenchmarkSchemaInfo simpleLongSchema = BenchmarkSchemas.SCHEMA_MAP.get("simpleLong");
+ GeneratorSchemaInfo simpleLongSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("simpleLong");
{ // simpleLong.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(simpleLongSchema.getDataInterval()));
List<AggregatorFactory> queryAggs = new ArrayList<>();
@@ -377,7 +377,7 @@ private void setupQueries()
Map<String, GroupByQuery> simpleFloatQueries = new LinkedHashMap<>();
- BenchmarkSchemaInfo simpleFloatSchema = BenchmarkSchemas.SCHEMA_MAP.get("simpleFloat");
+ GeneratorSchemaInfo simpleFloatSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("simpleFloat");
{ // simpleFloat.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(simpleFloatSchema.getDataInterval()));
List<AggregatorFactory> queryAggs = new ArrayList<>();
@@ -402,7 +402,7 @@ private void setupQueries()
// simple one column schema, for testing performance difference between querying on numeric values as Strings and
// directly as longs
Map<String, GroupByQuery> nullQueries = new LinkedHashMap<>();
- BenchmarkSchemaInfo nullSchema = BenchmarkSchemas.SCHEMA_MAP.get("nulls");
+ GeneratorSchemaInfo nullSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("nulls");
{ // simple-null
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(nullSchema.getDataInterval()));
@@ -443,10 +443,10 @@ public void setup() throws IOException
String schemaName = schemaQuery[0];
String queryName = schemaQuery[1];
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
- final BenchmarkDataGenerator dataGenerator = new BenchmarkDataGenerator(
+ final DataGenerator dataGenerator = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + 1,
schemaInfo.getDataInterval(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
index a2b0e98fb980..410b13e4661c 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/ScanBenchmark.java
@@ -22,9 +22,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Row;
@@ -67,6 +64,9 @@
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -133,7 +133,7 @@ public class ScanBenchmark
private List<QueryableIndex> qIndexes;
private QueryRunnerFactory factory;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
private Druids.ScanQueryBuilder queryBuilder;
private ScanQuery query;
private File tmpDir;
@@ -155,7 +155,7 @@ private void setupQueries()
{
// queries for the basic schema
final Map<String, Druids.ScanQueryBuilder> basicQueries = new LinkedHashMap<>();
- final BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ final GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
final List<String> queryTypes = ImmutableList.of("A", "B", "C", "D");
for (final String eachType : queryTypes) {
@@ -165,7 +165,7 @@ private void setupQueries()
SCHEMA_QUERY_MAP.put("basic", basicQueries);
}
- private static Druids.ScanQueryBuilder makeQuery(final String name, final BenchmarkSchemaInfo basicSchema)
+ private static Druids.ScanQueryBuilder makeQuery(final String name, final GeneratorSchemaInfo basicSchema)
{
switch (name) {
case "A":
@@ -182,7 +182,7 @@ private static Druids.ScanQueryBuilder makeQuery(final String name, final Benchm
}
/* Just get everything */
- private static Druids.ScanQueryBuilder basicA(final BenchmarkSchemaInfo basicSchema)
+ private static Druids.ScanQueryBuilder basicA(final GeneratorSchemaInfo basicSchema)
{
final QuerySegmentSpec intervalSpec =
new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
@@ -193,7 +193,7 @@ private static Druids.ScanQueryBuilder basicA(final BenchmarkSchemaInfo basicSch
.order(ordering);
}
- private static Druids.ScanQueryBuilder basicB(final BenchmarkSchemaInfo basicSchema)
+ private static Druids.ScanQueryBuilder basicB(final GeneratorSchemaInfo basicSchema)
{
final QuerySegmentSpec intervalSpec =
new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
@@ -214,7 +214,7 @@ private static Druids.ScanQueryBuilder basicB(final BenchmarkSchemaInfo basicSch
.order(ordering);
}
- private static Druids.ScanQueryBuilder basicC(final BenchmarkSchemaInfo basicSchema)
+ private static Druids.ScanQueryBuilder basicC(final GeneratorSchemaInfo basicSchema)
{
final QuerySegmentSpec intervalSpec =
new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
@@ -227,7 +227,7 @@ private static Druids.ScanQueryBuilder basicC(final BenchmarkSchemaInfo basicSch
.order(ordering);
}
- private static Druids.ScanQueryBuilder basicD(final BenchmarkSchemaInfo basicSchema)
+ private static Druids.ScanQueryBuilder basicD(final GeneratorSchemaInfo basicSchema)
{
final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(basicSchema.getDataInterval())
@@ -257,7 +257,7 @@ public void setup() throws IOException
String schemaName = schemaQuery[0];
String queryName = schemaQuery[1];
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
queryBuilder = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
queryBuilder.limit(limit);
query = queryBuilder.build();
@@ -265,7 +265,7 @@ public void setup() throws IOException
incIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
log.info("Generating rows for segment " + i);
- BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+ DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
System.currentTimeMillis(),
schemaInfo.getDataInterval(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
index 06320c331101..d8ae55adc412 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SearchBenchmark.java
@@ -24,9 +24,6 @@
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Row;
@@ -74,6 +71,9 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -133,7 +133,7 @@ public class SearchBenchmark
private List<QueryableIndex> qIndexes;
private QueryRunnerFactory factory;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
private Druids.SearchQueryBuilder queryBuilder;
private SearchQuery query;
private File tmpDir;
@@ -162,7 +162,7 @@ private void setupQueries()
{
// queries for the basic schema
final Map<String, Druids.SearchQueryBuilder> basicQueries = new LinkedHashMap<>();
- final BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ final GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
final List<String> queryTypes = ImmutableList.of("A", "B", "C", "D");
for (final String eachType : queryTypes) {
@@ -172,7 +172,7 @@ private void setupQueries()
SCHEMA_QUERY_MAP.put("basic", basicQueries);
}
- private static SearchQueryBuilder makeQuery(final String name, final BenchmarkSchemaInfo basicSchema)
+ private static SearchQueryBuilder makeQuery(final String name, final GeneratorSchemaInfo basicSchema)
{
switch (name) {
case "A":
@@ -188,7 +188,7 @@ private static SearchQueryBuilder makeQuery(final String name, final BenchmarkSc
}
}
- private static SearchQueryBuilder basicA(final BenchmarkSchemaInfo basicSchema)
+ private static SearchQueryBuilder basicA(final GeneratorSchemaInfo basicSchema)
{
final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
@@ -199,7 +199,7 @@ private static SearchQueryBuilder basicA(final BenchmarkSchemaInfo basicSchema)
.query("123");
}
- private static SearchQueryBuilder basicB(final BenchmarkSchemaInfo basicSchema)
+ private static SearchQueryBuilder basicB(final GeneratorSchemaInfo basicSchema)
{
final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
@@ -230,7 +230,7 @@ private static SearchQueryBuilder basicB(final BenchmarkSchemaInfo basicSchema)
.filters(new AndDimFilter(dimFilters));
}
- private static SearchQueryBuilder basicC(final BenchmarkSchemaInfo basicSchema)
+ private static SearchQueryBuilder basicC(final GeneratorSchemaInfo basicSchema)
{
final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
@@ -284,7 +284,7 @@ public ExtractionType getExtractionType()
.filters(new AndDimFilter(dimFilters));
}
- private static SearchQueryBuilder basicD(final BenchmarkSchemaInfo basicSchema)
+ private static SearchQueryBuilder basicD(final GeneratorSchemaInfo basicSchema)
{
final QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(
Collections.singletonList(basicSchema.getDataInterval())
@@ -327,7 +327,7 @@ public void setup() throws IOException
String schemaName = schemaQuery[0];
String queryName = schemaQuery[1];
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
queryBuilder = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
queryBuilder.limit(limit);
query = queryBuilder.build();
@@ -335,7 +335,7 @@ public void setup() throws IOException
incIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
log.info("Generating rows for segment " + i);
- BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+ DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
System.currentTimeMillis(),
schemaInfo.getDataInterval(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
index 6c2de09cce55..bd63a1a20e9d 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlBenchmark.java
@@ -22,8 +22,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -32,6 +30,8 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
@@ -385,7 +385,7 @@ public class SqlBenchmark
@Setup(Level.Trial)
public void setup()
{
- final BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ final GeneratorSchemaInfo schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
final DataSegment dataSegment = DataSegment.builder()
.dataSource("foo")
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
index dc75b6d47d1f..2e89c4dda48e 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlVsNativeBenchmark.java
@@ -21,8 +21,6 @@
import com.google.common.collect.ImmutableList;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.datagen.SegmentGenerator;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.java.util.common.Intervals;
@@ -38,6 +36,8 @@
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.server.security.AuthenticationResult;
@@ -96,7 +96,7 @@ public void setup()
{
this.closer = Closer.create();
- final BenchmarkSchemaInfo schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ final GeneratorSchemaInfo schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
final DataSegment dataSegment = DataSegment.builder()
.dataSource("foo")
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
index bf72eea3774e..931af6cf5f7b 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
@@ -20,9 +20,6 @@
package org.apache.druid.benchmark.query;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -68,6 +65,9 @@
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -126,7 +126,7 @@ public class TimeseriesBenchmark
private File tmpDir;
private QueryRunnerFactory factory;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
private TimeseriesQuery query;
private ExecutorService executorService;
@@ -153,7 +153,7 @@ private void setupQueries()
{
// queries for the basic schema
Map<String, TimeseriesQuery> basicQueries = new LinkedHashMap<>();
- BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
{ // basic.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
@@ -253,13 +253,13 @@ public void setup() throws IOException
String schemaName = schemaQuery[0];
String queryName = schemaQuery[1];
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
incIndexes = new ArrayList<>();
for (int i = 0; i < numSegments; i++) {
log.info("Generating rows for segment " + i);
- BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+ DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + i,
schemaInfo.getDataInterval(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
index c9ccfd7828e2..bbafeff1507f 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/TopNBenchmark.java
@@ -20,9 +20,6 @@
package org.apache.druid.benchmark.query;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.InputRow;
@@ -65,6 +62,9 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -125,7 +125,7 @@ public class TopNBenchmark
private List<QueryableIndex> qIndexes;
private QueryRunnerFactory factory;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
private TopNQueryBuilder queryBuilder;
private TopNQuery query;
private File tmpDir;
@@ -154,7 +154,7 @@ private void setupQueries()
{
// queries for the basic schema
Map<String, TopNQueryBuilder> basicQueries = new LinkedHashMap<>();
- BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
{ // basic.A
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
@@ -228,7 +228,7 @@ public void setup() throws IOException
String schemaName = schemaQuery[0];
String queryName = schemaQuery[1];
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
queryBuilder = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
queryBuilder.threshold(threshold);
query = queryBuilder.build();
@@ -237,7 +237,7 @@ public void setup() throws IOException
for (int i = 0; i < numSegments; i++) {
log.info("Generating rows for segment " + i);
- BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+ DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + i,
schemaInfo.getDataInterval(),
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
index f87773d36f1e..3272ecd2623d 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
@@ -21,9 +21,6 @@
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
-import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
import org.apache.druid.benchmark.query.QueryBenchmarkUtil;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.common.config.NullHandling;
@@ -72,6 +69,9 @@
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.generator.DataGenerator;
+import org.apache.druid.segment.generator.GeneratorBasicSchemas;
+import org.apache.druid.segment.generator.GeneratorSchemaInfo;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
@@ -143,7 +143,7 @@ public class TimeCompareBenchmark
private Query timeseriesQuery;
private QueryRunner timeseriesRunner;
- private BenchmarkSchemaInfo schemaInfo;
+ private GeneratorSchemaInfo schemaInfo;
private File tmpDir;
private Interval[] segmentIntervals;
@@ -172,7 +172,7 @@ public int columnCacheSizeBytes()
private void setupQueries()
{
// queries for the basic schema
- BenchmarkSchemaInfo basicSchema = BenchmarkSchemas.SCHEMA_MAP.get("basic");
+ GeneratorSchemaInfo basicSchema = GeneratorBasicSchemas.SCHEMA_MAP.get("basic");
QuerySegmentSpec intervalSpec =
new MultipleIntervalSegmentSpec(Collections.singletonList(basicSchema.getDataInterval()));
@@ -291,7 +291,7 @@ public void setup() throws IOException
setupQueries();
String schemaName = "basic";
- schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schemaName);
+ schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
segmentIntervals = new Interval[numSegments];
long startMillis = schemaInfo.getDataInterval().getStartMillis();
@@ -308,7 +308,7 @@ public void setup() throws IOException
for (int i = 0; i < numSegments; i++) {
log.info("Generating rows for segment " + i);
- BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+ DataGenerator gen = new DataGenerator(
schemaInfo.getColumnSchemas(),
RNG_SEED + i,
segmentIntervals[i],
diff --git a/codestyle/druid-forbidden-apis.txt b/codestyle/druid-forbidden-apis.txt
index 0ae33f5459ad..95c7d770326d 100644
--- a/codestyle/druid-forbidden-apis.txt
+++ b/codestyle/druid-forbidden-apis.txt
@@ -24,7 +24,6 @@ com.google.common.io.Files#createTempDir() @ Use org.apache.druid.java.util.comm
com.google.common.util.concurrent.MoreExecutors#sameThreadExecutor() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.MoreExecutors#newDirectExecutorService() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
com.google.common.util.concurrent.MoreExecutors#directExecutor() @ Use org.apache.druid.java.util.common.concurrent.Execs#directExecutor()
-com.google.common.util.concurrent.Futures#transform(com.google.common.util.concurrent.ListenableFuture, com.google.common.util.concurrent.AsyncFunction) @ Use org.apache.druid.java.util.common.concurrent.ListenableFutures#transformAsync
java.io.File#toURL() @ Use java.io.File#toURI() and java.net.URI#toURL() instead
java.lang.String#matches(java.lang.String) @ Use startsWith(), endsWith(), contains(), or compile and cache a Pattern explicitly
java.lang.String#replace(java.lang.CharSequence,java.lang.CharSequence) @ Use one of the appropriate methods in StringUtils instead
diff --git a/core/pom.xml b/core/pom.xml
index 627b3af4b80f..839d19b480c4 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -394,6 +394,7 @@
@{jacocoArgLine}
-Djava.library.path=${project.build.directory}/hyperic-sigar-${sigar.base.version}/sigar-bin/lib/
+ -Duser.language=en
diff --git a/core/src/main/java/org/apache/druid/collections/BlockingPool.java b/core/src/main/java/org/apache/druid/collections/BlockingPool.java
index 91c3b35b6549..c17329917cd2 100644
--- a/core/src/main/java/org/apache/druid/collections/BlockingPool.java
+++ b/core/src/main/java/org/apache/druid/collections/BlockingPool.java
@@ -19,31 +19,12 @@
package org.apache.druid.collections;
-import javax.annotation.Nullable;
import java.util.List;
public interface BlockingPool<T>
{
int maxSize();
- /**
- * Take a resource from the pool, waiting up to the
- * specified wait time if necessary for an element to become available.
- *
- * @param timeoutMs maximum time to wait for a resource, in milliseconds.
- *
- * @return a resource, or null if the timeout was reached
- */
- @Nullable
- ReferenceCountingResourceHolder<T> take(long timeoutMs);
-
- /**
- * Take a resource from the pool, waiting if necessary until an element becomes available.
- *
- * @return a resource
- */
- ReferenceCountingResourceHolder<T> take();
-
/**
* Take resources from the pool, waiting up to the
* specified wait time if necessary for elements of the given number to become available.
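
With both single-element take variants removed, takeBatch is the interface's only way to draw resources. A hedged migration sketch for a former take(timeoutMs) caller, assuming takeBatch returns an empty list when the timeout elapses:

static void useOne(BlockingPool<ByteBuffer> pool, long timeoutMs)
{
  List<ReferenceCountingResourceHolder<ByteBuffer>> holders = pool.takeBatch(1, timeoutMs);
  if (holders.isEmpty()) {
    return;  // timed out, like the old take(timeoutMs) returning null
  }
  try (ReferenceCountingResourceHolder<ByteBuffer> holder = holders.get(0)) {
    ByteBuffer buffer = holder.get();
    // ... use the buffer while the holder is open ...
  } // closing the holder returns the resource to the pool
}
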
diff --git a/core/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java b/core/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java
index b1b43fd22477..a32236db20db 100644
--- a/core/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java
+++ b/core/src/main/java/org/apache/druid/collections/DefaultBlockingPool.java
@@ -76,32 +76,6 @@ public int getPoolSize()
return objects.size();
}
- @Override
- @Nullable
- public ReferenceCountingResourceHolder<T> take(final long timeoutMs)
- {
- Preconditions.checkArgument(timeoutMs >= 0, "timeoutMs must be a non-negative value, but was [%s]", timeoutMs);
- checkInitialized();
- try {
- return wrapObject(timeoutMs > 0 ? pollObject(timeoutMs) : pollObject());
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public ReferenceCountingResourceHolder<T> take()
- {
- checkInitialized();
- try {
- return wrapObject(takeObject());
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
@Nullable
private ReferenceCountingResourceHolder<T> wrapObject(T theObject)
{
@@ -144,21 +118,6 @@ private T pollObject(long timeoutMs) throws InterruptedException
}
}
- private T takeObject() throws InterruptedException
- {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (objects.isEmpty()) {
- notEnough.await();
- }
- return objects.pop();
- }
- finally {
- lock.unlock();
- }
- }
-
@Override
public List<ReferenceCountingResourceHolder<T>> takeBatch(final int elementNum, final long timeoutMs)
{
diff --git a/core/src/main/java/org/apache/druid/collections/DummyBlockingPool.java b/core/src/main/java/org/apache/druid/collections/DummyBlockingPool.java
index 037a489e1f19..dcd6cea07aa7 100644
--- a/core/src/main/java/org/apache/druid/collections/DummyBlockingPool.java
+++ b/core/src/main/java/org/apache/druid/collections/DummyBlockingPool.java
@@ -44,18 +44,6 @@ public int maxSize()
return 0;
}
- @Override
- public ReferenceCountingResourceHolder<T> take(long timeoutMs)
- {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ReferenceCountingResourceHolder<T> take()
- {
- throw new UnsupportedOperationException();
- }
-
@Override
public List<ReferenceCountingResourceHolder<T>> takeBatch(int elementNum, long timeoutMs)
{
diff --git a/core/src/main/java/org/apache/druid/collections/StupidPool.java b/core/src/main/java/org/apache/druid/collections/StupidPool.java
index a69a8fde30a9..d1d6a9b9b7b1 100644
--- a/core/src/main/java/org/apache/druid/collections/StupidPool.java
+++ b/core/src/main/java/org/apache/druid/collections/StupidPool.java
@@ -34,6 +34,7 @@
import java.util.concurrent.atomic.AtomicReference;
/**
+ *
*/
public class StupidPool<T> implements NonBlockingPool<T>
{
@@ -61,6 +62,7 @@ public class StupidPool implements NonBlockingPool
private final String name;
private final Supplier<T> generator;
+ private final AtomicLong createdObjectsCounter = new AtomicLong(0);
private final AtomicLong leakedObjectsCounter = new AtomicLong(0);
//note that this is just the max entries in the cache, pool can still create as many buffers as needed.
@@ -114,6 +116,7 @@ public ResourceHolder take()
private ObjectResourceHolder makeObjectWithHandler()
{
T object = generator.get();
+ createdObjectsCounter.incrementAndGet();
ObjectId objectId = new ObjectId();
ObjectLeakNotifier notifier = new ObjectLeakNotifier(this);
// Using objectId as referent for Cleaner, because if the object itself (e. g. ByteBuffer) is leaked after taken
@@ -122,7 +125,7 @@ private ObjectResourceHolder makeObjectWithHandler()
}
@VisibleForTesting
- long poolSize()
+ public long poolSize()
{
return poolSize.get();
}
@@ -133,6 +136,12 @@ long leakedObjectsCount()
return leakedObjectsCounter.get();
}
+ @VisibleForTesting
+ public long objectsCreatedCount()
+ {
+ return createdObjectsCounter.get();
+ }
+
private void tryReturnToPool(T object, ObjectId objectId, Cleaners.Cleanable cleanable, ObjectLeakNotifier notifier)
{
long currentPoolSize;
@@ -160,7 +169,12 @@ private void tryReturnToPool(T object, ObjectId objectId, Cleaners.Cleanable cle
* This should be impossible, because {@link ConcurrentLinkedQueue#offer(Object)} doesn't even have `return false;` in
* its body in OpenJDK 8.
*/
- private void impossibleOffsetFailed(T object, ObjectId objectId, Cleaners.Cleanable cleanable, ObjectLeakNotifier notifier)
+ private void impossibleOffsetFailed(
+ T object,
+ ObjectId objectId,
+ Cleaners.Cleanable cleanable,
+ ObjectLeakNotifier notifier
+ )
{
poolSize.decrementAndGet();
notifier.disable();
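
The new createdObjectsCounter makes allocation behavior observable: objectsCreatedCount() reports how many objects the generator has produced, which together with poolSize() distinguishes cache hits from fresh allocations. A hedged test-style sketch; the pool construction details here are illustrative:

StupidPool<ByteBuffer> pool = new StupidPool<>("test", () -> ByteBuffer.allocate(1024));
try (ResourceHolder<ByteBuffer> holder = pool.take()) {
  // first take() has nothing cached, so the generator runs once
}
try (ResourceHolder<ByteBuffer> holder = pool.take()) {
  // the buffer returned above should be reused here
}
Assert.assertEquals(1, pool.objectsCreatedCount()); // one allocation, one cache hit
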
diff --git a/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java b/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java
index 9e9a7d77df57..b779fc29d356 100644
--- a/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java
+++ b/core/src/main/java/org/apache/druid/common/guava/CombiningSequence.java
@@ -29,8 +29,6 @@
import java.util.Comparator;
import java.util.function.BinaryOperator;
-/**
- */
public class CombiningSequence<T> implements Sequence<T>
{
public static <T> CombiningSequence<T> create(
@@ -76,9 +74,22 @@ public Yielder toYielder(OutType initValue, final YieldingAcc
new CombiningYieldingAccumulator<>(ordering, mergeFn, accumulator);
combiningAccumulator.setRetVal(initValue);
- Yielder<T> baseYielder = baseSequence.toYielder(null, combiningAccumulator);
- return makeYielder(baseYielder, combiningAccumulator, false);
+ final Yielder<T> baseYielder = baseSequence.toYielder(null, combiningAccumulator);
+
+ try {
+ return makeYielder(baseYielder, combiningAccumulator, false);
+ }
+ catch (Throwable t1) {
+ try {
+ baseYielder.close();
+ }
+ catch (Throwable t2) {
+ t1.addSuppressed(t2);
+ }
+
+ throw t1;
+ }
}
private <OutType> Yielder<OutType> makeYielder(
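
The try/catch (Throwable) added above is one instance of a close-on-failure idiom this PR applies throughout the sequence code (see MergeSequence and YieldingSequenceBase below): once a yielder is created, ownership must pass to the returned wrapper, or the yielder must be closed, and any failure during close is attached to the original exception rather than masking it. The idiom in isolation, with acquire() and wrap() as hypothetical stand-ins for toYielder(...) and makeYielder(...):

static AutoCloseable acquire() { return () -> {}; }               // stand-in for toYielder(...)
static String wrap(AutoCloseable resource) { return "wrapped"; }  // stand-in for makeYielder(...)

static String openSafely() throws Exception
{
  AutoCloseable resource = acquire();
  try {
    return wrap(resource);          // ownership passes to the wrapper on success
  }
  catch (Throwable t1) {
    try {
      resource.close();             // don't leak the resource on failure
    }
    catch (Throwable t2) {
      t1.addSuppressed(t2);         // attach, don't mask, the original failure
    }
    throw t1;
  }
}
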
diff --git a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java
index 2a241f66de7c..c03e5f6d8e84 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/prefetch/JsonIterator.java
@@ -27,11 +27,11 @@
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Iterator;
import java.util.NoSuchElementException;
/**
@@ -39,7 +39,7 @@
*
* @param <T> the type of object returned by this iterator
*/
-public class JsonIterator<T> implements Iterator<T>, Closeable
+public class JsonIterator<T> implements CloseableIterator<T>
{
private JsonParser jp;
private ObjectCodec objectCodec;
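
Implementing CloseableIterator directly (instead of Iterator plus Closeable) is a small change, but it lets JsonIterator participate in the interface's default combinators, such as the flatMap whose typo is fixed later in this diff. A hedged sketch of the composition this enables; makeJsonIterator() and process() are hypothetical, and CloseableIterators.withEmptyBaggage is the usual Druid helper for adapting a plain Iterator:

CloseableIterator<List<String>> batches = makeJsonIterator();
try (CloseableIterator<String> flat =
         batches.flatMap(batch -> CloseableIterators.withEmptyBaggage(batch.iterator()))) {
  while (flat.hasNext()) {
    process(flat.next());
  }
} // closing the flatMapped iterator also closes the underlying JsonIterator
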
diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
index 33a4e3c74d58..bf9540015d33 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java
@@ -21,6 +21,7 @@
import com.google.common.base.Strings;
+import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
@@ -472,28 +473,34 @@ public static String repeat(String s, int count)
/**
* Returns the string left-padded with the string pad to a length of len characters.
* If str is longer than len, the return value is shortened to len characters.
- * Lpad and rpad functions are migrated from flink's scala function with minor refactor
+ * This function is migrated from Flink's Scala implementation with minor refactoring.
* https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+ * - Modified to handle an empty pad string.
+ * - Padding to a non-positive length returns an empty string.
*
* @param base The base string to be padded
* @param len The length of padded string
* @param pad The pad string
*
- * @return the string left-padded with pad to a length of len
+ * @return the string left-padded with pad to a length of len; an empty string if len is non-positive, or base unchanged if pad is empty and padding would be needed
*/
- public static String lpad(String base, Integer len, String pad)
+ @Nonnull
+ public static String lpad(@Nonnull String base, int len, @Nonnull String pad)
{
- if (len < 0) {
- return null;
- } else if (len == 0) {
+ if (len <= 0) {
return "";
}
- char[] data = new char[len];
-
// The length of the padding needed
int pos = Math.max(len - base.length(), 0);
+ // short-circuit if the pad string is empty but padding is needed
+ if (pos > 0 && pad.isEmpty()) {
+ return base;
+ }
+
+ char[] data = new char[len];
+
// Copy the padding
for (int i = 0; i < pos; i += pad.length()) {
for (int j = 0; j < pad.length() && j < pos - i; j++) {
@@ -512,37 +519,48 @@ public static String lpad(String base, Integer len, String pad)
/**
* Returns the string right-padded with the string pad to a length of len characters.
* If str is longer than len, the return value is shortened to len characters.
+ * This function is migrated from Flink's Scala implementation with minor refactoring.
+ * https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/functions/ScalarFunctions.scala
+ * - Modified to handle an empty pad string.
+ * - Modified to only copy the pad string if needed (this implementation mimics lpad).
+ * - Padding to a non-positive length returns an empty string.
*
* @param base The base string to be padded
* @param len The length of padded string
* @param pad The pad string
*
- * @return the string right-padded with pad to a length of len
+ * @return the string right-padded with pad to a length of len; an empty string if len is non-positive, or base unchanged if pad is empty and padding would be needed
*/
- public static String rpad(String base, Integer len, String pad)
+ @Nonnull
+ public static String rpad(@Nonnull String base, int len, @Nonnull String pad)
{
- if (len < 0) {
- return null;
- } else if (len == 0) {
+ if (len <= 0) {
return "";
}
- char[] data = new char[len];
-
- int pos = 0;
+ // The length of the padding needed
+ int paddingLen = Math.max(len - base.length(), 0);
- // Copy the base
- for (; pos < base.length() && pos < len; pos++) {
- data[pos] = base.charAt(pos);
+ // short-circuit if the pad string is empty but padding is needed
+ if (paddingLen > 0 && pad.isEmpty()) {
+ return base;
}
+ char[] data = new char[len];
+
// Copy the padding
- for (; pos < len; pos += pad.length()) {
- for (int i = 0; i < pad.length() && i < len - pos; i++) {
- data[pos + i] = pad.charAt(i);
+ for (int i = len - paddingLen; i < len; i += pad.length()) {
+ for (int j = 0; j < pad.length() && i + j < data.length; j++) {
+ data[i + j] = pad.charAt(j);
}
}
+ // Copy the base
+ for (int i = 0; i < len && i < base.length(); i++) {
+ data[i] = base.charAt(i);
+ }
+
return new String(data);
}
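
For concreteness, a few calls and the outputs the reworked padding should produce, based on a reading of the code above rather than on test output from this PR:

StringUtils.lpad("abc", 7, "de");    // "dedeabc" — pad repeated on the left, truncated to fit
StringUtils.rpad("abc", 7, "de");    // "abcdede" — same rules, pad on the right
StringUtils.lpad("abcdef", 3, "x");  // "abc"     — a longer base is shortened to len
StringUtils.lpad("abc", 0, "x");     // ""        — non-positive len yields the empty string
StringUtils.lpad("abc", 7, "");      // "abc"     — empty pad short-circuits to the base
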
diff --git a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java
deleted file mode 100644
index 1722e4a9c663..000000000000
--- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ListenableFutures.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.java.util.common.concurrent;
-
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
-import javax.annotation.Nullable;
-import java.util.function.Function;
-
-public class ListenableFutures
-{
- /**
- * Guava 19 changes the Futures.transform signature so that the async form is different. This is here as a
- * compatability layer until such a time as druid only supports Guava 19 or later, in which case
- * Futures.transformAsync should be used
- *
- * This is NOT copied from guava.
- */
- public static ListenableFuture transformAsync(
- final ListenableFuture inFuture,
- final Function> transform
- )
- {
- final SettableFuture finalFuture = SettableFuture.create();
- Futures.addCallback(inFuture, new FutureCallback()
- {
- @Override
- public void onSuccess(@Nullable I result)
- {
- final ListenableFuture transformFuture = transform.apply(result);
- Futures.addCallback(transformFuture, new FutureCallback()
- {
- @Override
- public void onSuccess(@Nullable O result)
- {
- finalFuture.set(result);
- }
-
- @Override
- public void onFailure(Throwable t)
- {
- finalFuture.setException(t);
- }
- });
- }
-
- @Override
- public void onFailure(Throwable t)
- {
- finalFuture.setException(t);
- }
- });
- return finalFuture;
- }
-}
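
With the shim gone (and its forbidden-apis entry removed earlier in this diff), callers presumably move to Guava's own async transform. A hedged sketch; which overload exists depends on the Guava version on the classpath, and fetchRaw() and parseAsync() are hypothetical:

// Guava 19+: Futures.transformAsync(input, function, executor).
// Older Guava (what the deleted shim targeted): the two-argument transform overload.
ListenableFuture<String> rawFuture = fetchRaw();
ListenableFuture<InputRow> parsed = Futures.transform(
    rawFuture,
    (AsyncFunction<String, InputRow>) input -> parseAsync(input)
);
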
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ExplodingSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ExplodingSequence.java
new file mode 100644
index 000000000000..5468a5d86033
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/ExplodingSequence.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.guava;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Wraps an underlying sequence and allows us to force it to explode at various points.
+ */
+public class ExplodingSequence<T> extends YieldingSequenceBase<T>
+{
+ private final Sequence<T> baseSequence;
+ private final boolean getThrowsException;
+ private final boolean closeThrowsException;
+ private final AtomicLong closed = new AtomicLong();
+
+ public ExplodingSequence(Sequence<T> baseSequence, boolean getThrowsException, boolean closeThrowsException)
+ {
+ this.baseSequence = baseSequence;
+ this.getThrowsException = getThrowsException;
+ this.closeThrowsException = closeThrowsException;
+ }
+
+ @Override
+ public <OutType> Yielder<OutType> toYielder(
+ final OutType initValue,
+ final YieldingAccumulator<OutType, T> accumulator
+ )
+ {
+ return wrapYielder(baseSequence.toYielder(initValue, accumulator));
+ }
+
+ public long getCloseCount()
+ {
+ return closed.get();
+ }
+
+ private <OutType> Yielder<OutType> wrapYielder(final Yielder<OutType> baseYielder)
+ {
+ return new Yielder<OutType>()
+ {
+ @Override
+ public OutType get()
+ {
+ if (getThrowsException) {
+ throw new RuntimeException("get");
+ } else {
+ return baseYielder.get();
+ }
+ }
+
+ @Override
+ public Yielder<OutType> next(OutType initValue)
+ {
+ return wrapYielder(baseYielder.next(initValue));
+ }
+
+ @Override
+ public boolean isDone()
+ {
+ return baseYielder.isDone();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ closed.incrementAndGet();
+
+ if (closeThrowsException) {
+ throw new IOException("close");
+ } else {
+ baseYielder.close();
+ }
+ }
+ };
+ }
+}
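
A hedged sketch of how a test might use this utility: force get() to throw and verify the underlying yielder chain is still closed exactly once. Sequences.simple and toList are the usual Druid Sequence helpers, assumed here:

ExplodingSequence<Integer> sequence = new ExplodingSequence<>(
    Sequences.simple(Arrays.asList(1, 2, 3)),
    true,   // getThrowsException: get() throws RuntimeException("get")
    false   // closeThrowsException: close() behaves normally
);
try {
  sequence.toList();
  Assert.fail("expected RuntimeException");
}
catch (RuntimeException e) {
  Assert.assertEquals("get", e.getMessage());
}
Assert.assertEquals("closed once despite the failure", 1, sequence.getCloseCount());
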
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/MergeSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/MergeSequence.java
index 912775cba8ed..90f9b9a48ff3 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/MergeSequence.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/MergeSequence.java
@@ -58,38 +58,62 @@ public Yielder toYielder(OutType initValue, YieldingAccumulat
)
);
- pQueue = baseSequences.accumulate(
- pQueue,
- (queue, in) -> {
- final Yielder<T> yielder = in.toYielder(
- null,
- new YieldingAccumulator<T, T>()
- {
- @Override
- public T accumulate(T accumulated, T in)
+ try {
+ pQueue = baseSequences.accumulate(
+ pQueue,
+ (queue, in) -> {
+ final Yielder<T> yielder = in.toYielder(
+ null,
+ new YieldingAccumulator<T, T>()
{
- yield();
- return in;
+ @Override
+ public T accumulate(T accumulated, T in)
+ {
+ yield();
+ return in;
+ }
}
+ );
+
+ if (!yielder.isDone()) {
+ try {
+ queue.add(yielder);
}
- );
+ catch (Throwable t1) {
+ try {
+ yielder.close();
+ }
+ catch (Throwable t2) {
+ t1.addSuppressed(t2);
+ }
- if (!yielder.isDone()) {
- queue.add(yielder);
- } else {
- try {
- yielder.close();
- }
- catch (IOException e) {
- throw new RuntimeException(e);
+ throw t1;
+ }
+ } else {
+ try {
+ yielder.close();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
+
+ return queue;
}
+ );
- return queue;
- }
- );
+ return makeYielder(pQueue, initValue, accumulator);
+ }
+ catch (Throwable t1) {
+ try {
+ closeAll(pQueue);
+ }
+ catch (Throwable t2) {
+ t1.addSuppressed(t2);
+ }
- return makeYielder(pQueue, initValue, accumulator);
+ throw t1;
+ }
}
private <OutType> Yielder<OutType> makeYielder(
@@ -101,8 +125,22 @@ private Yielder makeYielder(
OutType retVal = initVal;
while (!accumulator.yielded() && !pQueue.isEmpty()) {
Yielder<T> yielder = pQueue.remove();
- retVal = accumulator.accumulate(retVal, yielder.get());
- yielder = yielder.next(null);
+
+ try {
+ retVal = accumulator.accumulate(retVal, yielder.get());
+ yielder = yielder.next(null);
+ }
+ catch (Throwable t1) {
+ try {
+ yielder.close();
+ }
+ catch (Throwable t2) {
+ t1.addSuppressed(t2);
+ }
+
+ throw t1;
+ }
+
if (yielder.isDone()) {
try {
yielder.close();
@@ -144,12 +182,21 @@ public boolean isDone()
@Override
public void close() throws IOException
{
- Closer closer = Closer.create();
- while (!pQueue.isEmpty()) {
- closer.register(pQueue.remove());
- }
- closer.close();
+ closeAll(pQueue);
}
};
}
+
+ private static <T> void closeAll(final PriorityQueue<Yielder<T>> pQueue) throws IOException
+ {
+ Closer closer = Closer.create();
+ while (!pQueue.isEmpty()) {
+ final Yielder<T> yielder = pQueue.poll();
+ if (yielder != null) {
+ // Note: yielder can be null if our comparator threw an exception during queue.add.
+ closer.register(yielder);
+ }
+ }
+ closer.close();
+ }
}
diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/YieldingSequenceBase.java b/core/src/main/java/org/apache/druid/java/util/common/guava/YieldingSequenceBase.java
index c5ed0111d074..9247989f5f64 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/YieldingSequenceBase.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/YieldingSequenceBase.java
@@ -19,6 +19,8 @@
package org.apache.druid.java.util.common.guava;
+import java.io.IOException;
+
/**
* A Sequence that is based entirely on the Yielder implementation.
*
@@ -29,16 +31,33 @@ public abstract class YieldingSequenceBase implements Sequence
@Override
public <OutType> OutType accumulate(OutType initValue, Accumulator<OutType, T> accumulator)
{
+ final OutType retVal;
Yielder<OutType> yielder = toYielder(initValue, YieldingAccumulators.fromAccumulator(accumulator));
try {
while (!yielder.isDone()) {
yielder = yielder.next(yielder.get());
}
- return yielder.get();
+ retVal = yielder.get();
}
- finally {
- CloseQuietly.close(yielder);
+ catch (Throwable t1) {
+ try {
+ yielder.close();
+ }
+ catch (Throwable t2) {
+ t1.addSuppressed(t2);
+ }
+
+ throw t1;
}
+
+ try {
+ yielder.close();
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ return retVal;
}
}
diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
index d5915369eada..af1baafe4198 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
@@ -66,10 +66,10 @@ default CloseableIterator flatMap(Function> funct
return new CloseableIterator<R>()
{
- CloseableIterator<R> iterator = findNextIeteratorIfNecessary();
+ CloseableIterator<R> iterator = findNextIteratorIfNecessary();
@Nullable
- private CloseableIterator<R> findNextIeteratorIfNecessary()
+ private CloseableIterator<R> findNextIteratorIfNecessary()
{
while ((iterator == null || !iterator.hasNext()) && delegate.hasNext()) {
if (iterator != null) {
@@ -105,7 +105,7 @@ public R next()
return iterator.next();
}
finally {
- findNextIeteratorIfNecessary();
+ findNextIteratorIfNecessary();
}
}
diff --git a/core/src/main/java/org/apache/druid/math/expr/Function.java b/core/src/main/java/org/apache/druid/math/expr/Function.java
index af52555793b7..a20863929875 100644
--- a/core/src/main/java/org/apache/druid/math/expr/Function.java
+++ b/core/src/main/java/org/apache/druid/math/expr/Function.java
@@ -694,6 +694,11 @@ protected ExprEval eval(double param)
class Round implements Function
{
+ //CHECKSTYLE.OFF: Regexp
+ private static final BigDecimal MAX_FINITE_VALUE = BigDecimal.valueOf(Double.MAX_VALUE);
+ private static final BigDecimal MIN_FINITE_VALUE = BigDecimal.valueOf(-1 * Double.MAX_VALUE);
+ //CHECKSTYLE.ON: Regexp
+
@Override
public String name()
{
@@ -705,7 +710,11 @@ public ExprEval apply(List args, Expr.ObjectBinding bindings)
{
ExprEval value1 = args.get(0).eval(bindings);
if (value1.type() != ExprType.LONG && value1.type() != ExprType.DOUBLE) {
- throw new IAE("The first argument to the function[%s] should be integer or double type but get the %s type", name(), value1.type());
+ throw new IAE(
+ "The first argument to the function[%s] should be integer or double type but got the type: %s",
+ name(),
+ value1.type()
+ );
}
if (args.size() == 1) {
@@ -713,7 +722,11 @@ public ExprEval apply(List args, Expr.ObjectBinding bindings)
} else {
ExprEval value2 = args.get(1).eval(bindings);
if (value2.type() != ExprType.LONG) {
- throw new IAE("The second argument to the function[%s] should be integer type but get the %s type", name(), value2.type());
+ throw new IAE(
+ "The second argument to the function[%s] should be integer type but got the type: %s",
+ name(),
+ value2.type()
+ );
}
return eval(value1, value2.asInt());
}
@@ -737,11 +750,27 @@ private ExprEval eval(ExprEval param, int scale)
if (param.type() == ExprType.LONG) {
return ExprEval.of(BigDecimal.valueOf(param.asLong()).setScale(scale, RoundingMode.HALF_UP).longValue());
} else if (param.type() == ExprType.DOUBLE) {
- return ExprEval.of(BigDecimal.valueOf(param.asDouble()).setScale(scale, RoundingMode.HALF_UP).doubleValue());
+ BigDecimal decimal = safeGetFromDouble(param.asDouble());
+ return ExprEval.of(decimal.setScale(scale, RoundingMode.HALF_UP).doubleValue());
} else {
return ExprEval.of(null);
}
}
+
+ /**
+ * Converts non-finite doubles to BigDecimal values instead of throwing a NumberFormatException.
+ */
+ private static BigDecimal safeGetFromDouble(double val)
+ {
+ if (Double.isNaN(val)) {
+ return BigDecimal.ZERO;
+ } else if (val == Double.POSITIVE_INFINITY) {
+ return MAX_FINITE_VALUE;
+ } else if (val == Double.NEGATIVE_INFINITY) {
+ return MIN_FINITE_VALUE;
+ }
+ return BigDecimal.valueOf(val);
+ }
}
class Signum extends UnivariateMathFunction
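
A standalone mirror of the new clamping logic, runnable in isolation, to make the behavior change concrete. This is a sketch of my reading of safeGetFromDouble, not code from the PR:

import java.math.BigDecimal;
import java.math.RoundingMode;

public class RoundSketch
{
  private static final BigDecimal MAX_FINITE = BigDecimal.valueOf(Double.MAX_VALUE);
  private static final BigDecimal MIN_FINITE = BigDecimal.valueOf(-1 * Double.MAX_VALUE);

  static double round(double val, int scale)
  {
    final BigDecimal decimal;
    if (Double.isNaN(val)) {
      decimal = BigDecimal.ZERO;                  // previously: NumberFormatException
    } else if (val == Double.POSITIVE_INFINITY) {
      decimal = MAX_FINITE;                       // clamp to the largest finite double
    } else if (val == Double.NEGATIVE_INFINITY) {
      decimal = MIN_FINITE;
    } else {
      decimal = BigDecimal.valueOf(val);
    }
    return decimal.setScale(scale, RoundingMode.HALF_UP).doubleValue();
  }

  public static void main(String[] args)
  {
    System.out.println(round(0.0 / 0.0, 0));      // 0.0
    System.out.println(round(1.0 / 0.0, 0));      // 1.7976931348623157E308
    System.out.println(round(3.14159, 2));        // 3.14
  }
}
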
diff --git a/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java b/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java
index 7e9eb9234b76..98c75505c05f 100644
--- a/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java
+++ b/core/src/main/java/org/apache/druid/segment/loading/DataSegmentPusher.java
@@ -20,9 +20,11 @@
package org.apache.druid.segment.loading;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
import java.io.File;
import java.io.IOException;
@@ -105,6 +107,13 @@ default List<String> getAllowedPropertyPrefixesForHadoop()
// on segment deletion if segment being deleted was the only segment
static String getDefaultStorageDir(DataSegment segment, boolean useUniquePath)
{
+ // Sanity check for shardSpec type.
+ // BucketNumberedShardSpec should never be used in segment push.
+ Preconditions.checkArgument(
+ !(segment.getShardSpec() instanceof BucketNumberedShardSpec),
+ "Illegal shardSpec type[%s]",
+ segment.getShardSpec()
+ );
return JOINER.join(
segment.getDataSource(),
StringUtils.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
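Why the check matters: the default storage directory embeds the partition-identifying parts of the segment id, so a shardSpec that reports a bucketId instead of a real partitionId would silently write segments to a wrong (and possibly colliding) path. A hedged sketch of the path shape; only the datasource and interval components are visible in this hunk, and the trailing version and partition-number components are assumptions:

import com.google.common.base.Joiner;

// Illustrative only: the visible part of getDefaultStorageDir joins the datasource and
// the interval; the version and partition number components below are assumptions.
public class StorageDirSketch
{
  private static final Joiner JOINER = Joiner.on("/").skipNulls();

  public static void main(String[] args)
  {
    String dir = JOINER.join(
        "wikipedia",                                          // segment.getDataSource()
        "2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z",  // interval start_end
        "2020-01-03T00:00:00.000Z",                           // version (assumed component)
        3                                                     // partitionNum (assumed component)
    );
    // wikipedia/2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z/2020-01-03T00:00:00.000Z/3
    System.out.println(dir);
  }
}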
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BucketNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BucketNumberedShardSpec.java
new file mode 100644
index 000000000000..d692dad113da
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/partition/BucketNumberedShardSpec.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.RangeSet;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is one of the special shardSpecs which are temporarily used during batch ingestion. In Druid, there is a
+ * concept of core partition set which is a set of segments atomically becoming queryable together in Brokers. The core
+ * partition set is represented as a range of partitionIds, i.e., [0, {@link ShardSpec#getNumCorePartitions()}).
+ *
+ * When you run a batch ingestion task with a non-linear partitioning scheme, the task populates all possible buckets
+ * upfront (see {@code CachingLocalSegmentAllocator}) and uses them to partition input rows. However,
+ * some of the buckets can be empty even after the task consumes all inputs if the data is highly skewed. Since Druid
+ * doesn't create empty segments, the partitionId should be dynamically allocated when a bucket is actually in use,
+ * so that we can always create the packed core partition set without missing partitionIds.
+ *
+ * This BucketNumberedShardSpec is used for such a use case. A task with a non-linear partitioning scheme uses it
+ * to postpone the partitionId allocation until all empty buckets are identified. See
+ * {@code ParallelIndexSupervisorTask.groupGenericPartitionLocationsPerPartition} and
+ * {@code CachingLocalSegmentAllocator} for parallel and sequential ingestion, respectively.
+ *
+ * Note that {@link org.apache.druid.timeline.SegmentId} requires the partitionId. Since the segmentId is used
+ * everywhere during ingestion, this class should implement {@link #getPartitionNum()} which returns the bucketId.
+ * This should be fine because the segmentId is only used to identify each segment until pushing them to deep storage.
+ * The bucketId should be enough to uniquely identify each segment. However, when pushing segments to deep storage,
+ * the partitionId is used to create the path to store the segment on deep storage
+ * ({@link org.apache.druid.segment.loading.DataSegmentPusher#getDefaultStorageDir}), which must be the true
+ * partitionId rather than the bucketId. As a result, this shardSpec should not be used in pushing segments.
+ *
+ * This class should be Jackson-serializable as the subtasks can send it to the parallel task in parallel ingestion.
+ *
+ * This interface doesn't really have to extend {@link ShardSpec}. The only reason it does is that ShardSpec is used
+ * in many places such as {@link org.apache.druid.timeline.DataSegment}, and modifying all of those places to accept
+ * types other than ShardSpec seems pretty invasive. Maybe we could clean up this mess someday in the future.
+ *
+ * @see BuildingShardSpec
+ */
+public interface BucketNumberedShardSpec<T extends BuildingShardSpec> extends ShardSpec
+{
+ int getBucketId();
+
+ T convert(int partitionId);
+
+ @Override
+ default <O> PartitionChunk<O> createChunk(O obj)
+ {
+ // The partitionId (or partitionNum, chunkNumber) is not determined yet. Use bucketId for now.
+ return new NumberedPartitionChunk<>(getBucketId(), 0, obj);
+ }
+
+ @Override
+ default int getPartitionNum()
+ {
+ // See the class-level Javadoc for why the bucketId is returned here.
+ return getBucketId();
+ }
+
+ @Override
+ default int getNumCorePartitions()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // The methods below are used on the query side, and so must not be called for this shardSpec.
+
+ @JsonIgnore
+ @Override
+ default List<String> getDomainDimensions()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default boolean possibleInDomain(Map<String, RangeSet<String>> domain)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default boolean isCompatible(Class<? extends ShardSpec> other)
+ {
+ throw new UnsupportedOperationException();
+ }
+}
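To make the deferred allocation concrete, the following standalone sketch (not Druid code) shows how the surviving, possibly sparse bucketIds can be packed into the consecutive partitionId range [0, numCorePartitions) once all empty buckets are known:

import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class PartitionIdAllocationSketch
{
  // Assign consecutive partitionIds to the non-empty buckets in bucketId order, so the
  // published core partition set is the gap-free range [0, numCorePartitions).
  static Map<Integer, Integer> allocatePartitionIds(List<Integer> nonEmptyBucketIds)
  {
    Map<Integer, Integer> bucketToPartition = new TreeMap<>();
    int nextPartitionId = 0;
    for (int bucketId : new TreeSet<>(nonEmptyBucketIds)) {
      bucketToPartition.put(bucketId, nextPartitionId++);
    }
    return bucketToPartition;
  }

  public static void main(String[] args)
  {
    // Suppose a 5-bucket hash partitioning where buckets 1 and 3 received no rows.
    System.out.println(allocatePartitionIds(List.of(0, 2, 4)));
    // Prints {0=0, 2=1, 4=2}: numCorePartitions becomes 3 with no missing partitionIds.
  }
}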
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpec.java
new file mode 100644
index 000000000000..fb896fc2ac84
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/partition/BuildingHashBasedNumberedShardSpec.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * See {@link BuildingShardSpec} for how this class is used.
+ *
+ * @see HashBasedNumberedShardSpec
+ */
+public class BuildingHashBasedNumberedShardSpec implements BuildingShardSpec<HashBasedNumberedShardSpec>
+{
+ public static final String TYPE = "building_hashed";
+
+ private final int partitionId;
+ private final int bucketId;
+ private final int numBuckets;
+ private final List<String> partitionDimensions;
+ private final ObjectMapper jsonMapper;
+
+ @JsonCreator
+ public BuildingHashBasedNumberedShardSpec(
+ @JsonProperty("partitionId") int partitionId,
+ @JsonProperty("bucketId") int bucketId,
+ @JsonProperty("numBuckets") int numBuckets,
+ @JsonProperty("partitionDimensions") @Nullable List partitionDimensions,
+ @JacksonInject ObjectMapper jsonMapper
+ )
+ {
+ this.partitionId = partitionId;
+ this.bucketId = bucketId;
+ this.numBuckets = numBuckets;
+ this.partitionDimensions = partitionDimensions == null
+ ? HashBasedNumberedShardSpec.DEFAULT_PARTITION_DIMENSIONS
+ : partitionDimensions;
+ this.jsonMapper = jsonMapper;
+ }
+
+ @JsonProperty("partitionId")
+ @Override
+ public int getPartitionNum()
+ {
+ return partitionId;
+ }
+
+ @Override
+ @JsonProperty
+ public int getBucketId()
+ {
+ return bucketId;
+ }
+
+ @JsonProperty
+ public int getNumBuckets()
+ {
+ return numBuckets;
+ }
+
+ @JsonProperty
+ public List<String> getPartitionDimensions()
+ {
+ return partitionDimensions;
+ }
+
+ @Override
+ public <T> PartitionChunk<T> createChunk(T obj)
+ {
+ // This method can be called in AppenderatorImpl to create a sinkTimeline.
+ // The sinkTimeline doesn't seem to be used in batch ingestion, so set 'chunks' to 0 for now.
+ // HashBasedNumberedShardSpec uses NumberedPartitionChunk, so we use it here too.
+ return new NumberedPartitionChunk<>(partitionId, 0, obj);
+ }
+
+ @Override
+ public HashBasedNumberedShardSpec convert(int numCorePartitions)
+ {
+ return new HashBasedNumberedShardSpec(
+ partitionId,
+ numCorePartitions,
+ bucketId,
+ numBuckets,
+ partitionDimensions,
+ jsonMapper
+ );
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BuildingHashBasedNumberedShardSpec that = (BuildingHashBasedNumberedShardSpec) o;
+ return partitionId == that.partitionId &&
+ bucketId == that.bucketId &&
+ numBuckets == that.numBuckets &&
+ Objects.equals(partitionDimensions, that.partitionDimensions);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(partitionId, bucketId, numBuckets, partitionDimensions);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "BuildingHashBasedNumberedShardSpec{" +
+ "partitionId=" + partitionId +
+ ", bucketId=" + bucketId +
+ ", numBuckets=" + numBuckets +
+ ", partitionDimensions=" + partitionDimensions +
+ '}';
+ }
+}
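A hedged usage sketch of the publish-time conversion: once the task knows how many segments were actually created for the time chunk, each building spec is finalized with that core partition count. The plain ObjectMapper below is for illustration only:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;

import java.util.Collections;

public class ConvertSketch
{
  public static void main(String[] args)
  {
    // partitionId 2 of a 5-bucket hash partitioning on "dim1"; bucketId 3 was non-empty.
    BuildingHashBasedNumberedShardSpec building = new BuildingHashBasedNumberedShardSpec(
        2,                                  // partitionId
        3,                                  // bucketId
        5,                                  // numBuckets
        Collections.singletonList("dim1"),
        new ObjectMapper()
    );
    // At publish time the task has seen, say, 4 non-empty buckets in this time chunk,
    // so the core partition set becomes [0, 4).
    HashBasedNumberedShardSpec published = building.convert(4);
    System.out.println(published.getPartitionNum());       // 2
    System.out.println(published.getNumCorePartitions());  // 4
  }
}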
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpec.java
new file mode 100644
index 000000000000..a179d3ca7bac
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/partition/BuildingNumberedShardSpec.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * See {@link BuildingShardSpec} for how this class is used.
+ *
+ * This shardSpec has only a partitionId, the same as {@link LinearShardSpec}. The difference between
+ * them is that this shardSpec should never be published, and so never used in other places such as the Broker timeline.
+ *
+ * @see NumberedShardSpec
+ */
+public class BuildingNumberedShardSpec implements BuildingShardSpec<NumberedShardSpec>
+{
+ public static final String TYPE = "building_numbered";
+
+ private final int partitionId;
+
+ @JsonCreator
+ public BuildingNumberedShardSpec(@JsonProperty("partitionId") int partitionId)
+ {
+ Preconditions.checkArgument(partitionId >= 0, "partitionId >= 0");
+ this.partitionId = partitionId;
+ }
+
+ @Override
+ public int getBucketId()
+ {
+ // This method is currently not called when the shardSpec type is this class.
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public NumberedShardSpec convert(int numTotalPartitions)
+ {
+ return new NumberedShardSpec(partitionId, numTotalPartitions);
+ }
+
+ @Override
+ public <T> PartitionChunk<T> createChunk(T obj)
+ {
+ // This method can be called in AppenderatorImpl to create a sinkTimeline.
+ // The sinkTimeline doesn't seem to be used in batch ingestion, so set 'chunks' to 0 for now.
+ return new NumberedPartitionChunk<>(partitionId, 0, obj);
+ }
+
+ @JsonProperty("partitionId")
+ @Override
+ public int getPartitionNum()
+ {
+ return partitionId;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BuildingNumberedShardSpec shardSpec = (BuildingNumberedShardSpec) o;
+ return partitionId == shardSpec.partitionId;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(partitionId);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "BuildingNumberedShardSpec{" +
+ "partitionId=" + partitionId +
+ '}';
+ }
+}
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BuildingShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BuildingShardSpec.java
new file mode 100644
index 000000000000..973fdf4d2a7d
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/partition/BuildingShardSpec.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.RangeSet;
+import org.apache.druid.data.input.InputRow;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is one of the special shardSpecs which are temporarily used during batch ingestion. In Druid, there is a
+ * concept of core partition set which is a set of segments atomically becoming queryable together in Brokers. The core
+ * partition set is represented as a range of partitionIds, i.e., [0, {@link ShardSpec#getNumCorePartitions()}).
+ *
+ * In streaming ingestion, the core partition set size cannot be determined since it's impossible to know how many
+ * segments will be created per time chunk upfront. However, in batch ingestion with time chunk locking, the core
+ * partition set is the set of segments created by an initial task or an overwriting task. Since the core partition
+ * set is determined only when the task publishes segments, the task postpones creating the proper
+ * {@link ShardSpec} until then.
+ *
+ * This BuildingShardSpec is used for such a use case. A non-appending batch task can use this shardSpec until it
+ * publishes segments. At publish time, it should convert the buildingShardSpec of those segments
+ * to the proper shardSpec type {@link T}. See {@code SegmentPublisherHelper#annotateShardSpec} for converting shardSpec.
+ * Note that, when the segment lock is used, the Overlord coordinates the segment allocation and this class is never
+ * used. Instead, the task sends {@link PartialShardSpec} to the Overlord to allocate a new segment. The result segment
+ * could have either a {@link ShardSpec} (for root generation segments) or an {@link OverwriteShardSpec} (for non-root
+ * generation segments).
+ *
+ * This class should be Jackson-serializable as the subtasks can send it to the parallel task in parallel ingestion.
+ *
+ * This interface doesn't really have to extend {@link ShardSpec}. The only reason it does is that ShardSpec is used
+ * in many places such as {@link org.apache.druid.timeline.DataSegment}, and modifying all of those places to accept
+ * types other than ShardSpec seems pretty invasive. Maybe we could clean up this mess someday in the future.
+ *
+ * @see BucketNumberedShardSpec
+ */
+public interface BuildingShardSpec<T extends ShardSpec> extends ShardSpec
+{
+ int getBucketId();
+
+ T convert(int numCorePartitions);
+
+ @Override
+ default int getNumCorePartitions()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@link BucketNumberedShardSpec} should be used for shard spec lookup.
+ */
+ @Override
+ default ShardSpecLookup getLookup(List<? extends ShardSpec> shardSpecs)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ // The methods below are used on the query side, and so must not be called for this shardSpec.
+
+ @Override
+ default boolean isInChunk(long timestamp, InputRow inputRow)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @JsonIgnore
+ @Override
+ default List<String> getDomainDimensions()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default boolean possibleInDomain(Map<String, RangeSet<String>> domain)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default boolean isCompatible(Class<? extends ShardSpec> other)
+ {
+ throw new UnsupportedOperationException();
+ }
+}
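As a worked illustration of the core partition set invariant this interface exists to protect: a Broker treats a time chunk as queryable only once every partitionId in [0, numCorePartitions) is served. A minimal sketch, not Druid code:

import java.util.Set;

public class CorePartitionSetSketch
{
  // The core partition set is complete when every partitionId in [0, numCorePartitions)
  // has been loaded; only then do the segments atomically become queryable together.
  static boolean isCoreSetComplete(Set<Integer> loadedPartitionIds, int numCorePartitions)
  {
    for (int id = 0; id < numCorePartitions; id++) {
      if (!loadedPartitionIds.contains(id)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    System.out.println(isCoreSetComplete(Set.of(0, 1, 2), 3)); // true
    System.out.println(isCoreSetComplete(Set.of(0, 2), 3));    // false: partitionId 1 is missing
  }
}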
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/BuildingSingleDimensionShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/BuildingSingleDimensionShardSpec.java
new file mode 100644
index 000000000000..6dd099205447
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/timeline/partition/BuildingSingleDimensionShardSpec.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * See {@link BuildingShardSpec} for how this class is used.
+ *
+ * @see SingleDimensionShardSpec
+ */
+public class BuildingSingleDimensionShardSpec implements BuildingShardSpec<SingleDimensionShardSpec>
+{
+ public static final String TYPE = "building_single_dim";
+
+ private final int bucketId;
+ private final String dimension;
+ @Nullable
+ private final String start;
+ @Nullable
+ private final String end;
+ private final int partitionId;
+
+ @JsonCreator
+ public BuildingSingleDimensionShardSpec(
+ @JsonProperty("bucketId") int bucketId,
+ @JsonProperty("dimension") String dimension,
+ @JsonProperty("start") @Nullable String start,
+ @JsonProperty("end") @Nullable String end,
+ @JsonProperty("partitionNum") int partitionNum
+ )
+ {
+ this.bucketId = bucketId;
+ this.dimension = dimension;
+ this.start = start;
+ this.end = end;
+ this.partitionId = partitionNum;
+ }
+
+ @JsonProperty("dimension")
+ public String getDimension()
+ {
+ return dimension;
+ }
+
+ @Nullable
+ @JsonProperty("start")
+ public String getStart()
+ {
+ return start;
+ }
+
+ @Nullable
+ @JsonProperty("end")
+ public String getEnd()
+ {
+ return end;
+ }
+
+ @Override
+ @JsonProperty("partitionNum")
+ public int getPartitionNum()
+ {
+ return partitionId;
+ }
+
+ @Override
+ @JsonProperty("bucketId")
+ public int getBucketId()
+ {
+ return bucketId;
+ }
+
+ @Override
+ public SingleDimensionShardSpec convert(int numCorePartitions)
+ {
+ return new SingleDimensionShardSpec(dimension, start, end, partitionId, numCorePartitions);
+ }
+
+ @Override
+ public <T> PartitionChunk<T> createChunk(T obj)
+ {
+ return new NumberedPartitionChunk<>(partitionId, 0, obj);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BuildingSingleDimensionShardSpec that = (BuildingSingleDimensionShardSpec) o;
+ return bucketId == that.bucketId &&
+ partitionId == that.partitionId &&
+ Objects.equals(dimension, that.dimension) &&
+ Objects.equals(start, that.start) &&
+ Objects.equals(end, that.end);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(bucketId, dimension, start, end, partitionId);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "BuildingSingleDimensionShardSpec{" +
+ "bucketId=" + bucketId +
+ ", dimension='" + dimension + '\'' +
+ ", start='" + start + '\'' +
+ ", end='" + end + '\'' +
+ ", partitionNum=" + partitionId +
+ '}';
+ }
+}
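The start/end values carried here are the half-open range a SingleDimensionShardSpec uses to claim rows, with null standing in for an unbounded edge. A small sketch of that containment rule (illustrative; the real check lives in SingleDimensionShardSpec):

public class RangeBucketSketch
{
  // Half-open range check: start <= value < end, where a null bound means unbounded.
  static boolean isInRange(String value, String start, String end)
  {
    boolean afterStart = start == null || value.compareTo(start) >= 0;
    boolean beforeEnd = end == null || value.compareTo(end) < 0;
    return afterStart && beforeEnd;
  }

  public static void main(String[] args)
  {
    // A bucket covering dimension values ["b", "f")
    System.out.println(isInRange("cat", "b", "f"));  // true
    System.out.println(isInRange("fox", "b", "f"));  // false: "fox" >= "f"
    System.out.println(isInRange("ant", null, "f")); // true: unbounded start
  }
}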
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpec.java
index 495a85284bb3..0e32ee04bcf1 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedPartialShardSpec.java
@@ -33,15 +33,18 @@ public class HashBasedNumberedPartialShardSpec implements PartialShardSpec
@Nullable
private final List<String> partitionDimensions;
+ private final int bucketId;
private final int numBuckets;
@JsonCreator
public HashBasedNumberedPartialShardSpec(
@JsonProperty("partitionDimensions") @Nullable List partitionDimensions,
+ @JsonProperty("bucketId") int bucketId,
@JsonProperty("numPartitions") int numBuckets
)
{
this.partitionDimensions = partitionDimensions;
+ this.bucketId = bucketId;
this.numBuckets = numBuckets;
}
@@ -52,6 +55,12 @@ public List<String> getPartitionDimensions()
return partitionDimensions;
}
+ @JsonProperty
+ public int getBucketId()
+ {
+ return bucketId;
+ }
+
@JsonProperty("numPartitions")
public int getNumBuckets()
{
@@ -61,9 +70,16 @@ public int getNumBuckets()
@Override
public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfPreviousMaxPartitionId)
{
- final HashBasedNumberedShardSpec prevSpec = (HashBasedNumberedShardSpec) specOfPreviousMaxPartitionId;
+ // The shardSpec is created by the Overlord.
+ // For batch tasks, this code can be executed only with segment locking (forceTimeChunkLock = false).
+ // In this mode, you can have 2 or more tasks concurrently ingesting into the same time chunk of
+ // the same datasource. Since there is no restriction for those tasks in segment allocation, the
+ // allocated IDs for each task can interleave. As a result, the core partition set cannot be
+ // represented as a range. We always set 0 for the core partition set size if this is an initial segment.
return new HashBasedNumberedShardSpec(
- prevSpec == null ? 0 : prevSpec.getPartitionNum() + 1,
+ specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getPartitionNum() + 1,
+ specOfPreviousMaxPartitionId == null ? 0 : specOfPreviousMaxPartitionId.getNumCorePartitions(),
+ bucketId,
numBuckets,
partitionDimensions,
objectMapper
@@ -73,7 +89,7 @@ public ShardSpec complete(ObjectMapper objectMapper, @Nullable ShardSpec specOfP
@Override
public ShardSpec complete(ObjectMapper objectMapper, int partitionId)
{
- return new HashBasedNumberedShardSpec(partitionId, numBuckets, partitionDimensions, objectMapper);
+ return new HashBasedNumberedShardSpec(partitionId, 0, bucketId, numBuckets, partitionDimensions, objectMapper);
}
@Override
@@ -92,13 +108,14 @@ public boolean equals(Object o)
return false;
}
HashBasedNumberedPartialShardSpec that = (HashBasedNumberedPartialShardSpec) o;
- return numBuckets == that.numBuckets &&
+ return bucketId == that.bucketId &&
+ numBuckets == that.numBuckets &&
Objects.equals(partitionDimensions, that.partitionDimensions);
}
@Override
public int hashCode()
{
- return Objects.hash(partitionDimensions, numBuckets);
+ return Objects.hash(partitionDimensions, bucketId, numBuckets);
}
}
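A worked example of the interleaving the new comment describes, sketched with a shared counter standing in for the Overlord's per-time-chunk allocator:

import java.util.concurrent.atomic.AtomicInteger;

public class InterleavedAllocationSketch
{
  public static void main(String[] args)
  {
    // The Overlord hands out partitionIds from a single per-time-chunk sequence.
    AtomicInteger allocator = new AtomicInteger();

    // Two concurrent tasks alternate requests, so their ids interleave:
    int taskA1 = allocator.getAndIncrement(); // 0 -> task A
    int taskB1 = allocator.getAndIncrement(); // 1 -> task B
    int taskA2 = allocator.getAndIncrement(); // 2 -> task A
    int taskB2 = allocator.getAndIncrement(); // 3 -> task B

    // Task A publishes {0, 2} and task B publishes {1, 3}: neither set is the
    // contiguous range [0, N), so the core partition set size is recorded as 0.
    System.out.println("task A: " + taskA1 + ", " + taskA2);
    System.out.println("task B: " + taskB1 + ", " + taskB2);
  }
}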
diff --git a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
index a03fddceaf3a..23cdb4ef3868 100644
--- a/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
+++ b/core/src/main/java/org/apache/druid/timeline/partition/HashBasedNumberedShardSpec.java
@@ -35,29 +35,55 @@
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Objects;
public class HashBasedNumberedShardSpec extends NumberedShardSpec
{
+ static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();
+
private static final HashFunction HASH_FUNCTION = Hashing.murmur3_32();
- private static final List<String> DEFAULT_PARTITION_DIMENSIONS = ImmutableList.of();
+ private final int bucketId;
+ /**
+ * Number of hash buckets
+ */
+ private final int numBuckets;
private final ObjectMapper jsonMapper;
@JsonIgnore
private final List<String> partitionDimensions;
@JsonCreator
public HashBasedNumberedShardSpec(
- @JsonProperty("partitionNum") int partitionNum, // partitionId
- @JsonProperty("partitions") int partitions, // # of partitions
+ @JsonProperty("partitionNum") int partitionNum, // partitionId, hash bucketId
+ @JsonProperty("partitions") int partitions, // core partition set size
+ @JsonProperty("bucketId") @Nullable Integer bucketId, // nullable for backward compatibility
+ @JsonProperty("numBuckets") @Nullable Integer numBuckets, // nullable for backward compatibility
@JsonProperty("partitionDimensions") @Nullable List partitionDimensions,
@JacksonInject ObjectMapper jsonMapper
)
{
super(partitionNum, partitions);
+ // Use partitionId as bucketId if it's missing.
+ this.bucketId = bucketId == null ? partitionNum : bucketId;
+ // If numBuckets is missing, assume that any hash bucket is not empty.
+ // Use the core partition set size as the number of buckets.
+ this.numBuckets = numBuckets == null ? partitions : numBuckets;
this.jsonMapper = jsonMapper;
this.partitionDimensions = partitionDimensions == null ? DEFAULT_PARTITION_DIMENSIONS : partitionDimensions;
}
+ @JsonProperty
+ public int getBucketId()
+ {
+ return bucketId;
+ }
+
+ @JsonProperty
+ public int getNumBuckets()
+ {
+ return numBuckets;
+ }
+
@JsonProperty("partitionDimensions")
public List<String> getPartitionDimensions()
{
@@ -73,12 +99,27 @@ public boolean isCompatible(Class<? extends ShardSpec> other)
@Override
public boolean isInChunk(long timestamp, InputRow inputRow)
{
- return (((long) hash(timestamp, inputRow)) - getPartitionNum()) % getPartitions() == 0;
+ return (((long) hash(timestamp, inputRow)) - bucketId) % numBuckets == 0;
}
+ /**
+ * Calculates the hash key for the given row. If {@code partitionDimensions} is empty, both
+ * {@code timestamp} and all dimension columns in {@code inputRow} are used (see {@link Rows#toGroupKey}).
+ * Otherwise, only the columns listed in {@code partitionDimensions} are used.
+ *
+ * @param timestamp should be bucketed with query granularity
+ * @param inputRow row from input data
+ *
+ * @return hash value
+ */
protected int hash(long timestamp, InputRow inputRow)
{
- final List