Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

package org.apache.druid.benchmark.datagen;

import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.column.ValueType;

import java.util.List;
Expand Down Expand Up @@ -144,6 +150,22 @@ public BenchmarkColumnValueGenerator makeGenerator(long seed)
return new BenchmarkColumnValueGenerator(this, seed);
}

public DimensionSchema getDimensionSchema()
{
  // Translate this column's ValueType into the matching ingestion-time DimensionSchema.
  if (type == ValueType.LONG) {
    return new LongDimensionSchema(name);
  } else if (type == ValueType.FLOAT) {
    return new FloatDimensionSchema(name);
  } else if (type == ValueType.DOUBLE) {
    return new DoubleDimensionSchema(name);
  } else if (type == ValueType.STRING) {
    return new StringDimensionSchema(name);
  } else {
    // Complex/unsupported types cannot be expressed as a dimension schema.
    throw new IAE("unable to make dimension schema for %s", type);
  }
}

public String getName()
{
return name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ private Object convertType(Object input, ValueType type)
ret = Long.parseLong(input.toString());
}
break;
case DOUBLE:
if (input instanceof Number) {
ret = ((Number) input).doubleValue();
} else {
ret = Double.parseDouble(input.toString());
}
break;
case FLOAT:
if (input instanceof Number) {
ret = ((Number) input).floatValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

package org.apache.druid.benchmark.datagen;

import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.joda.time.Interval;

import java.util.List;
import java.util.stream.Collectors;

public class BenchmarkSchemaInfo
{
Expand All @@ -49,6 +52,16 @@ public List<BenchmarkColumnSchema> getColumnSchemas()
return columnSchemas;
}

public DimensionsSpec getDimensionsSpec()
{
  // A column is treated as a dimension unless it is flagged as a metric.
  List<DimensionSchema> dimensionSchemas =
      columnSchemas.stream()
                   .filter(columnSchema -> !columnSchema.isMetric())
                   .map(BenchmarkColumnSchema::getDimensionSchema)
                   .collect(Collectors.toList());

  return new DimensionsSpec(dimensionSchemas);
}

public List<AggregatorFactory> getAggs()
{
return aggs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,32 @@ public class BenchmarkSchemas
);
SCHEMA_MAP.put("rollo", rolloSchema);
}

static { // simple schema with null valued rows, no aggs on numeric columns
  List<BenchmarkColumnSchema> simpleNullsSchemaColumns = ImmutableList.of(
      // dims
      BenchmarkColumnSchema.makeZipf("stringZipf", ValueType.STRING, false, 1, 0.5, 1, 101, 1.5),
      BenchmarkColumnSchema.makeDiscreteUniform("stringUniform", ValueType.STRING, false, 1, 0.25, 1, 100000),
      BenchmarkColumnSchema.makeSequential("stringSequentialHalfNull", ValueType.STRING, false, 1, 0.5, 0, 1000),

      // metrics
      BenchmarkColumnSchema.makeSequential("longSequential", ValueType.LONG, false, 1, 0.5, 0, 10000),
      BenchmarkColumnSchema.makeDiscreteUniform("longUniform", ValueType.LONG, false, 1, 0.25, 0, 500),
      BenchmarkColumnSchema.makeZipf("doubleZipf", ValueType.DOUBLE, false, 1, 0.25, 0, 1000, 2.0)
  );

  // Only a row-count aggregator; the numeric columns above are deliberately left un-aggregated.
  List<AggregatorFactory> simpleNullsSchemaIngestAggs = new ArrayList<>();
  simpleNullsSchemaIngestAggs.add(new CountAggregatorFactory("rows"));

  Interval simpleNullsSchemaDataInterval = Intervals.of("2000-01-01/P1D");

  // Named simpleNullsSchema (was copy-pasted "basicSchema") for consistency with this block's other locals.
  BenchmarkSchemaInfo simpleNullsSchema = new BenchmarkSchemaInfo(
      simpleNullsSchemaColumns,
      simpleNullsSchemaIngestAggs,
      simpleNullsSchemaDataInterval,
      false
  );

  SCHEMA_MAP.put("simple-nulls", simpleNullsSchema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,10 @@

package org.apache.druid.benchmark.datagen;

import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.DoubleDimensionSchema;
import org.apache.druid.data.input.impl.FloatDimensionSchema;
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
Expand Down Expand Up @@ -141,30 +134,8 @@ public QueryableIndex generate(
numRows
);

final List<DimensionSchema> dimensions = new ArrayList<>();
for (BenchmarkColumnSchema columnSchema : schemaInfo.getColumnSchemas()) {
if (schemaInfo.getAggs().stream().noneMatch(agg -> agg.getName().equals(columnSchema.getName()))) {
switch (columnSchema.getType()) {
case STRING:
dimensions.add(new StringDimensionSchema(columnSchema.getName()));
break;
case LONG:
dimensions.add(new LongDimensionSchema(columnSchema.getName()));
break;
case DOUBLE:
dimensions.add(new DoubleDimensionSchema(columnSchema.getName()));
break;
case FLOAT:
dimensions.add(new FloatDimensionSchema(columnSchema.getName()));
break;
default:
throw new ISE("Unhandleable type[%s]", columnSchema.getType());
}
}
}

final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withDimensionsSpec(new DimensionsSpec(dimensions, ImmutableList.of(), ImmutableList.of()))
.withDimensionsSpec(schemaInfo.getDimensionsSpec())
.withMetrics(schemaInfo.getAggsArray())
.withRollup(schemaInfo.isWithRollup())
.withQueryGranularity(granularity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.benchmark.query;

import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Supplier;
Expand All @@ -41,6 +42,7 @@
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.offheap.OffheapBufferGenerator;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.FinalizeResultsQueryRunner;
Expand All @@ -58,6 +60,7 @@
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
Expand Down Expand Up @@ -162,7 +165,11 @@ public class GroupByBenchmark
static {
JSON_MAPPER = new DefaultObjectMapper();
INDEX_IO = new IndexIO(
JSON_MAPPER,
JSON_MAPPER.setInjectableValues(
new InjectableValues.Std()
.addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE)
.addValue(ObjectMapper.class.getName(), JSON_MAPPER)
),
new ColumnConfig()
{
@Override
Expand Down Expand Up @@ -387,6 +394,34 @@ private void setupQueries()
simpleFloatQueries.put("A", queryA);
}
SCHEMA_QUERY_MAP.put("simpleFloat", simpleFloatQueries);

// schema containing rows with null values, for testing behavior of group by queries and
// aggregations over columns that contain nulls
Map<String, GroupByQuery> nullQueries = new LinkedHashMap<>();
BenchmarkSchemaInfo nullSchema = BenchmarkSchemas.SCHEMA_MAP.get("simple-nulls");

{ // simple-null
QuerySegmentSpec intervalSpec = new MultipleIntervalSegmentSpec(Collections.singletonList(nullSchema.getDataInterval()));
List<AggregatorFactory> queryAggs = new ArrayList<>();
queryAggs.add(new DoubleSumAggregatorFactory(
"doubleSum",
"doubleZipf"
));
GroupByQuery queryA = GroupByQuery
.builder()
.setDataSource("blah")
.setQuerySegmentSpec(intervalSpec)
.setDimensions(new DefaultDimensionSpec("stringZipf", "stringZipf", ValueType.STRING))
.setAggregatorSpecs(
queryAggs
)
.setGranularity(Granularity.fromString(queryGranularity))
.setContext(ImmutableMap.of("vectorize", vectorize))
.build();

nullQueries.put("A", queryA);
}
SCHEMA_QUERY_MAP.put("simple-nulls", nullQueries);
}

@Setup(Level.Trial)
Expand Down Expand Up @@ -545,6 +580,7 @@ private IncrementalIndex makeIncIndex(boolean withRollup)
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withDimensionsSpec(schemaInfo.getDimensionsSpec())
.withMetrics(schemaInfo.getAggsArray())
.withRollup(withRollup)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,22 @@ public class NullHandling
* It does not take effect in all unit tests since we don't use Guice Injection.
*/
@Inject
private static NullValueHandlingConfig INSTANCE = new NullValueHandlingConfig(
Boolean.valueOf(System.getProperty(NULL_HANDLING_CONFIG_STRING, "true"))
);
private static NullValueHandlingConfig INSTANCE;

private static volatile Boolean isReplaceWithDefault = null;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure having a volatile static is a good idea for something that will be used by multiple threads while processing every row. Your benchmark checked single-threaded performance but this could degrade under multithreading.

Could you please either look into this, or do a hybrid of your last two approaches (require explicit initialization in tests, of a non-volatile static, and if-guard it in methods that use it).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussion, maybe the explicit initialization is better in order to rid magic from our world, closing this PR and re-opening #8876


/**
 * whether nulls should be replaced with default value.
 *
 * NOTE(review): the decision is computed lazily on first call and cached in
 * {@code isReplaceWithDefault}; if {@code INSTANCE} is injected only after the first call, the
 * cached value will not be refreshed — confirm the injection lifecycle guarantees ordering.
 * NOTE(review): {@code isReplaceWithDefault} is read on a very hot path; verify the cost of the
 * volatile read under multi-threaded load (raised in code review).
 */
public static boolean replaceWithDefault()
{
// INSTANCE should only be null in unit tests, otherwise the null handling module will inject this value
if (isReplaceWithDefault == null) {
isReplaceWithDefault = INSTANCE != null
? INSTANCE.isUseDefaultValuesForNull()
// Fall back to the system property (default "true", i.e. replace nulls) when not injected.
: Boolean.parseBoolean(System.getProperty(NULL_HANDLING_CONFIG_STRING, "true"));
}
return isReplaceWithDefault;
}

public static boolean sqlCompatible()
Expand Down