Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@
<version>${project.parent.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-sql</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion docs/content/configuration/historical.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.processing.formatString`|Realtime and historical nodes use this format string to name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers available for merging query results. The buffers are sized by `druid.processing.buffer.sizeBytes`. This property is effectively a concurrency limit for queries that require merging buffers. If you are using any queries that require merge buffers (currently, just groupBy v2) then you should have at least two of these.|`max(2, druid.processing.numThreads / 4)`|
|`druid.processing.numThreads`|The number of processing threads to have available for parallel processing of segments. Our rule of thumb is `num_cores - 1`, which means that even under heavy load there will still be one core available to do background tasks like talking with ZooKeeper and pulling down segments. If only one core is available, this property defaults to the value `1`.|Number of cores - 1 (or 1)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default. Enabling the lookup cache can significantly improve the performance of aggregators operating on dimension values, such as the JavaScript aggregator, or cardinality aggregator, but can slow things down if the cache hit rate is low (i.e. dimensions with few repeating values). Enabling it may also require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.columnCache.sizeBytes`|Maximum size in bytes for the dimension value lookup cache. Any value greater than `0` enables the cache. It is currently disabled by default, since most query features that would benefit from it already implement their own caching (with the notable exception of the JavaScript aggregator). Enabling this may require additional garbage collection tuning to avoid long GC pauses.|`0` (disabled)|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal priority in a FIFO manner|`false`|
|`druid.processing.tmpDir`|Path where temporary files created while processing a query should be stored. If specified, this configuration takes priority over the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ static void hashRow(
}

ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy> selectorPlus = selectorPluses[k];
selectorPlus.getColumnSelectorStrategy().hashRow(selectorPlus.getSelector(), hasher);
selectorPlus.getColumnSelectorStrategy().hashRow(hasher);
}
collector.add(hasher.hash().asBytes());
}
Expand All @@ -61,7 +61,7 @@ static void hashValues(
)
{
for (final ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy> selectorPlus : selectorPluses) {
selectorPlus.getColumnSelectorStrategy().hashValues(selectorPlus.getSelector(), collector);
selectorPlus.getColumnSelectorStrategy().hashValues(collector);
}
}

Expand All @@ -75,7 +75,7 @@ static void hashValues(
boolean byRow
)
{
this(name, selectorPlusList.toArray(new ColumnSelectorPlus[] {}), byRow);
this(name, selectorPlusList.toArray(new ColumnSelectorPlus[]{}), byRow);
}

CardinalityAggregator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,19 @@
import io.druid.hll.HyperLogLogCollector;
import io.druid.query.dimension.ColumnSelectorStrategy;

public interface CardinalityAggregatorColumnSelectorStrategy<ValueSelectorType> extends ColumnSelectorStrategy
public interface CardinalityAggregatorColumnSelectorStrategy extends ColumnSelectorStrategy
{
/***
* Retrieve the current row from dimSelector and add the row values to the hasher.
*
* @param dimSelector Dimension value selector
* @param hasher Hasher used for cardinality aggregator calculations
*/
void hashRow(ValueSelectorType dimSelector, Hasher hasher);

void hashRow(Hasher hasher);

/**
* Retrieve the current row from dimSelector and add the row values to HyperLogLogCollector.
*
* @param dimSelector Dimension value selector
* @param collector HLL collector used for cardinality aggregator calculations
*/
void hashValues(ValueSelectorType dimSelector, HyperLogLogCollector collector);
void hashValues(HyperLogLogCollector collector);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.druid.java.util.common.IAE;
import io.druid.query.dimension.ColumnSelectorStrategyFactory;
import io.druid.segment.ColumnValueSelector;
import io.druid.segment.DimensionSelector;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ValueType;

Expand All @@ -30,19 +31,21 @@ public class CardinalityAggregatorColumnSelectorStrategyFactory
{
@Override
public CardinalityAggregatorColumnSelectorStrategy makeColumnSelectorStrategy(
ColumnCapabilities capabilities, ColumnValueSelector selector
ColumnCapabilities capabilities,
ColumnValueSelector selector,
int numRows
)
{
ValueType type = capabilities.getType();
switch (type) {
case STRING:
return new StringCardinalityAggregatorColumnSelectorStrategy();
return new StringCardinalityAggregatorColumnSelectorStrategy((DimensionSelector) selector, numRows);
case LONG:
return new LongCardinalityAggregatorColumnSelectorStrategy();
return new LongCardinalityAggregatorColumnSelectorStrategy(selector);
case FLOAT:
return new FloatCardinalityAggregatorColumnSelectorStrategy();
return new FloatCardinalityAggregatorColumnSelectorStrategy(selector);
case DOUBLE:
return new DoubleCardinalityAggregatorColumnSelectorStrategy();
return new DoubleCardinalityAggregatorColumnSelectorStrategy(selector);
default:
throw new IAE("Cannot create query type helper from invalid type [%s]", type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,24 @@
import io.druid.segment.BaseDoubleColumnValueSelector;


public class DoubleCardinalityAggregatorColumnSelectorStrategy
implements CardinalityAggregatorColumnSelectorStrategy<BaseDoubleColumnValueSelector>
public class DoubleCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy
{
private final BaseDoubleColumnValueSelector selector;

public DoubleCardinalityAggregatorColumnSelectorStrategy(final BaseDoubleColumnValueSelector selector)
{
this.selector = selector;
}

@Override
public void hashRow(BaseDoubleColumnValueSelector dimSelector, Hasher hasher)
public void hashRow(Hasher hasher)
{
hasher.putDouble(dimSelector.getDouble());
hasher.putDouble(selector.getDouble());
}

@Override
public void hashValues(BaseDoubleColumnValueSelector dimSelector, HyperLogLogCollector collector)
public void hashValues(HyperLogLogCollector collector)
{
collector.add(CardinalityAggregator.hashFn.hashLong(Double.doubleToLongBits(dimSelector.getDouble())).asBytes());
collector.add(CardinalityAggregator.hashFn.hashLong(Double.doubleToLongBits(selector.getDouble())).asBytes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,23 @@
import io.druid.query.aggregation.cardinality.CardinalityAggregator;
import io.druid.segment.BaseFloatColumnValueSelector;

public class FloatCardinalityAggregatorColumnSelectorStrategy
implements CardinalityAggregatorColumnSelectorStrategy<BaseFloatColumnValueSelector>
public class FloatCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy
{
private final BaseFloatColumnValueSelector selector;

public FloatCardinalityAggregatorColumnSelectorStrategy(final BaseFloatColumnValueSelector selector)
{
this.selector = selector;
}

@Override
public void hashRow(BaseFloatColumnValueSelector selector, Hasher hasher)
public void hashRow(Hasher hasher)
{
hasher.putFloat(selector.getFloat());
}

@Override
public void hashValues(BaseFloatColumnValueSelector selector, HyperLogLogCollector collector)
public void hashValues(HyperLogLogCollector collector)
{
collector.add(CardinalityAggregator.hashFn.hashInt(Float.floatToIntBits(selector.getFloat())).asBytes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,24 @@
import io.druid.query.aggregation.cardinality.CardinalityAggregator;
import io.druid.segment.BaseLongColumnValueSelector;

public class LongCardinalityAggregatorColumnSelectorStrategy
implements CardinalityAggregatorColumnSelectorStrategy<BaseLongColumnValueSelector>
public class LongCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy
{
private BaseLongColumnValueSelector selector;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Field could be final


public LongCardinalityAggregatorColumnSelectorStrategy(final BaseLongColumnValueSelector selector)
{
this.selector = selector;
}

@Override
public void hashRow(BaseLongColumnValueSelector dimSelector, Hasher hasher)
public void hashRow(Hasher hasher)
{
hasher.putLong(dimSelector.getLong());
hasher.putLong(selector.getLong());
}

@Override
public void hashValues(BaseLongColumnValueSelector dimSelector, HyperLogLogCollector collector)
public void hashValues(HyperLogLogCollector collector)
{
collector.add(CardinalityAggregator.hashFn.hashLong(dimSelector.getLong()).asBytes());
collector.add(CardinalityAggregator.hashFn.hashLong(selector.getLong()).asBytes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,42 @@
import io.druid.hll.HyperLogLogCollector;
import io.druid.query.aggregation.cardinality.CardinalityAggregator;
import io.druid.segment.DimensionSelector;
import io.druid.segment.DimensionSelectorUtils;
import io.druid.segment.data.IndexedInts;

import java.util.Arrays;
import java.util.function.IntFunction;

public class StringCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy<DimensionSelector>
public class StringCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy
{
public static final String CARDINALITY_AGG_NULL_STRING = "\u0000";
public static final char CARDINALITY_AGG_SEPARATOR = '\u0001';
private static final String CARDINALITY_AGG_NULL_STRING = "\u0000";
private static final char CARDINALITY_AGG_SEPARATOR = '\u0001';

// Number of entries to cache. Each one is a 128 bit hash, so with overhead, 12500 entries occupies about 250KB
private static final int CACHE_SIZE = 12500;

private final DimensionSelector selector;
private final IntFunction<byte[]> hashFunction;

public StringCardinalityAggregatorColumnSelectorStrategy(final DimensionSelector selector, final int numRows)
{
this.selector = selector;
this.hashFunction = DimensionSelectorUtils.cacheIfPossible(selector, this::hashOneValue, numRows, CACHE_SIZE);
}

@Override
public void hashRow(DimensionSelector dimSelector, Hasher hasher)
public void hashRow(final Hasher hasher)
{
final IndexedInts row = dimSelector.getRow();
final IndexedInts row = selector.getRow();
final int size = row.size();
// nothing to add to hasher if size == 0, only handle size == 1 and size != 0 cases.
if (size == 1) {
final String value = dimSelector.lookupName(row.get(0));
final String value = selector.lookupName(row.get(0));
hasher.putUnencodedChars(nullToSpecial(value));
} else if (size != 0) {
final String[] values = new String[size];
for (int i = 0; i < size; ++i) {
final String value = dimSelector.lookupName(row.get(i));
final String value = selector.lookupName(row.get(i));
values[i] = nullToSpecial(value);
}
// Values need to be sorted to ensure consistent multi-value ordering across different segments
Expand All @@ -59,17 +73,21 @@ public void hashRow(DimensionSelector dimSelector, Hasher hasher)
}

@Override
public void hashValues(DimensionSelector dimSelector, HyperLogLogCollector collector)
public void hashValues(final HyperLogLogCollector collector)
{
IndexedInts row = dimSelector.getRow();
IndexedInts row = selector.getRow();
for (int i = 0; i < row.size(); i++) {
int index = row.get(i);
final String value = dimSelector.lookupName(index);
collector.add(CardinalityAggregator.hashFn.hashUnencodedChars(nullToSpecial(value)).asBytes());
collector.add(hashFunction.apply(row.get(i)));
}
}

private String nullToSpecial(String value)
private byte[] hashOneValue(final int id)
{
final String value = selector.lookupName(id);
return CardinalityAggregator.hashFn.hashUnencodedChars(nullToSpecial(value)).asBytes();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better the field called HASH_FN

}

private static String nullToSpecial(String value)
{
return value == null ? CARDINALITY_AGG_NULL_STRING : value;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query.dimension;

import java.util.function.IntFunction;

/**
* Array cache for an IntFunction, intended for use with DimensionSelectors.
*
* @see io.druid.segment.DimensionSelectorUtils#cacheIfPossible
*/
public class ArrayCacheIntFunction<T> implements IntFunction<T>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a note that this class is unsafe for concurrent use.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just call it "ArrayDimensionCache" (and "LruDimensionCache") with a method like "getOrCompute()", and then return method reference in DimensionSelectorUtils.cacheIfPossible()

{
private final IntFunction<T> function;
private final Object[] cache;

public ArrayCacheIntFunction(final IntFunction<T> function, final int cacheSize)
{
this.function = function;
this.cache = new Object[cacheSize];
}

@Override
public T apply(final int id)
{
// Will not cache the result if "function" returns null. I'm hoping that this is the right choice, and enabling
// null caching isn't worth the overhead of using some additional data structures to differentiate between a null
// result and an uncached result.

if (cache[id] == null) {
final T value = function.apply(id);
cache[id] = value;
return value;
} else {
//noinspection unchecked
return (T) cache[id];
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,9 @@

public interface ColumnSelectorStrategyFactory<ColumnSelectorStrategyClass extends ColumnSelectorStrategy>
{
  /**
   * Create a strategy bound to the given selector.
   *
   * @param capabilities capabilities of the column, in particular its {@code ValueType}
   * @param selector     selector the returned strategy will read values from
   * @param numRows      number of rows in the segment being processed; presumably used to decide
   *                     whether per-dictionary-id caching is worthwhile — TODO confirm semantics
   *                     against DimensionSelectorUtils.cacheIfPossible
   */
  ColumnSelectorStrategyClass makeColumnSelectorStrategy(
      ColumnCapabilities capabilities,
      ColumnValueSelector selector,
      int numRows
  );
}
Loading