@@ -32,6 +32,7 @@
import org.apache.druid.data.input.InputRow;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -362,9 +363,9 @@ public int getBufferGrouperInitialBuckets()
}

@Override
-public long getMaxOnDiskStorage()
+public HumanReadableBytes getMaxOnDiskStorage()
{
-return 1_000_000_000L;
+return HumanReadableBytes.valueOf(1_000_000_000L);
}
};
config.setSingleThreaded(false);
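The test config now expresses the limit through `HumanReadableBytes` instead of a raw `long`. For reference, a minimal sketch of the two conversions this PR relies on; the "20M" = 20,000,000 bytes mapping is the one asserted later in `MaterializedViewQueryTest`:

```java
import org.apache.druid.java.util.common.HumanReadableBytes;

public class HumanReadableBytesSketch
{
  public static void main(String[] args)
  {
    // Wrap a raw byte count, as the updated test config does.
    HumanReadableBytes fromLong = HumanReadableBytes.valueOf(1_000_000_000L);
    System.out.println(fromLong.getBytes()); // 1000000000

    // Parse a human-readable string; "20M" is 20,000,000 bytes (decimal units),
    // matching the assertion in MaterializedViewQueryTest further down this diff.
    HumanReadableBytes fromString = new HumanReadableBytes("20M");
    System.out.println(fromString.getBytes()); // 20000000
  }
}
```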
@@ -34,6 +34,7 @@
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
@@ -477,9 +478,9 @@ public int getBufferGrouperInitialBuckets()
}

@Override
-public long getMaxOnDiskStorage()
+public HumanReadableBytes getMaxOnDiskStorage()
{
-return 1_000_000_000L;
+return HumanReadableBytes.valueOf(1_000_000_000L);
}
};
config.setSingleThreaded(false);
3 changes: 2 additions & 1 deletion docs/configuration/index.md
@@ -2079,14 +2079,15 @@ Supported runtime properties:
|`druid.query.groupBy.maxSelectorDictionarySize`|Maximum amount of heap space (approximately) to use for per-segment string dictionaries. See [groupBy memory tuning and resource limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxMergingDictionarySize`|Maximum amount of heap space (approximately) to use for per-query string dictionaries. When the dictionary exceeds this size, a spill to disk will be triggered. See [groupBy memory tuning and resource limits](../querying/groupbyquery.md#memory-tuning-and-resource-limits) for details.|100000000|
|`druid.query.groupBy.maxOnDiskStorage`|Maximum amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Queries that exceed this limit will fail. Set to zero to disable disk spilling.|0 (disabled)|
+|`druid.query.groupBy.defaultOnDiskStorage`|Default amount of disk space to use, per-query, for spilling result sets to disk when either the merging buffer or the dictionary fills up. Set to zero to disable disk spilling for queries that don't override `maxOnDiskStorage` in their context.|`druid.query.groupBy.maxOnDiskStorage`|

Supported query contexts:

|Key|Description|
|---|-----------|
|`maxSelectorDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
|`maxMergingDictionarySize`|Can be used to lower the value of `druid.query.groupBy.maxMergingDictionarySize` for this query.|
-|`maxOnDiskStorage`|Can be used to lower the value of `druid.query.groupBy.maxOnDiskStorage` for this query.|
+|`maxOnDiskStorage`|Can be used to set `maxOnDiskStorage` to a value between 0 and `druid.query.groupBy.maxOnDiskStorage` for this query. If this query context override exceeds `druid.query.groupBy.maxOnDiskStorage`, the query uses `druid.query.groupBy.maxOnDiskStorage` instead. Omitting this key from the query context causes the query to use `druid.query.groupBy.defaultOnDiskStorage` for `maxOnDiskStorage`.|
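A minimal sketch of how these three settings combine per query, mirroring the override logic added to `GroupByQueryConfig.withOverrides` later in this diff; the variable names and sample values are illustrative only:

```java
public class OnDiskStorageResolutionSketch
{
  public static void main(String[] args)
  {
    long serverMax = 1_000_000_000L;    // druid.query.groupBy.maxOnDiskStorage
    long serverDefault = 500_000_000L;  // druid.query.groupBy.defaultOnDiskStorage
    Long contextOverride = null;        // "maxOnDiskStorage" from the query context, if present

    // Absent context key -> the server-side default applies; either way the
    // server-side max is an upper bound that the context can only lower.
    long requested = contextOverride != null ? contextOverride : serverDefault;
    long effective = Math.min(requested, serverMax);

    System.out.println(effective); // 500000000
  }
}
```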


### Advanced configurations
@@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
@@ -170,6 +171,12 @@ public boolean getContextBoolean(String key, boolean defaultValue)
return query.getContextBoolean(key, defaultValue);
}

@Override
public HumanReadableBytes getContextHumanReadableBytes(String key, HumanReadableBytes defaultValue)
{
return query.getContextHumanReadableBytes(key, defaultValue);
}

@Override
public boolean isDescending()
{
@@ -22,8 +22,10 @@
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerTestHelper;
@@ -89,4 +91,37 @@ public void testQuerySerialization() throws IOException
Assert.assertEquals(QueryRunnerTestHelper.ALL_GRAN, query.getGranularity());
Assert.assertEquals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC.getIntervals(), query.getIntervals());
}

@Test
public void testGetContextHumanReadableBytes()
{
TopNQuery topNQuery = new TopNQueryBuilder()
.dataSource(QueryRunnerTestHelper.DATA_SOURCE)
.granularity(QueryRunnerTestHelper.ALL_GRAN)
.dimension(QueryRunnerTestHelper.MARKET_DIMENSION)
.metric(QueryRunnerTestHelper.INDEX_METRIC)
.threshold(4)
.intervals(QueryRunnerTestHelper.FULL_ON_INTERVAL_SPEC)
.aggregators(
Lists.newArrayList(
Iterables.concat(
QueryRunnerTestHelper.COMMON_DOUBLE_AGGREGATORS,
Lists.newArrayList(
new DoubleMaxAggregatorFactory("maxIndex", "index"),
new DoubleMinAggregatorFactory("minIndex", "index")
)
)
)
)
.context(
ImmutableMap.of(
"maxOnDiskStorage", "20M"
)
)
.postAggregators(QueryRunnerTestHelper.ADD_ROWS_INDEX_CONSTANT)
.build();
MaterializedViewQuery query = new MaterializedViewQuery(topNQuery, optimizer);
Assert.assertEquals(20_000_000, query.getContextHumanReadableBytes("maxOnDiskStorage", HumanReadableBytes.ZERO).getBytes());

}
}
@@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
@@ -199,6 +200,12 @@ public boolean getContextBoolean(String key, boolean defaultValue)
return context.getAsBoolean(key, defaultValue);
}

@Override
public HumanReadableBytes getContextHumanReadableBytes(String key, HumanReadableBytes defaultValue)
{
return context.getAsHumanReadableBytes(key, defaultValue);
}

/**
* @deprecated use {@link #computeOverriddenContext(Map, Map) computeOverriddenContext(getContext(), overrides))}
* instead. This method may be removed in the next minor or major version of Druid.
19 changes: 19 additions & 0 deletions processing/src/main/java/org/apache/druid/query/Query.java
@@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Ordering;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.datasourcemetadata.DataSourceMetadataQuery;
import org.apache.druid.query.filter.DimFilter;
@@ -129,6 +130,24 @@ default QueryContext getQueryContext()

boolean getContextBoolean(String key, boolean defaultValue);

/**
* Returns {@link HumanReadableBytes} for a specified context key. If the context is null or the key doesn't exist,
* a caller-specified default value is returned. A default implementation is provided since Query is an extension
* point. Extensions can choose to rely on this default to retain compatibility with core Druid.
*
* @param key The context key value being looked up
* @param defaultValue The default to return if the key value doesn't exist or the context is null.
* @return the context value as {@link HumanReadableBytes}, or {@code defaultValue} if the key is absent
*/
default HumanReadableBytes getContextHumanReadableBytes(String key, HumanReadableBytes defaultValue)
{
if (null != getQueryContext()) {
return getQueryContext().getAsHumanReadableBytes(key, defaultValue);
} else {
return defaultValue;
}
}

boolean isDescending();

/**
@@ -19,6 +19,8 @@

package org.apache.druid.query;

import org.apache.druid.java.util.common.HumanReadableBytes;

import javax.annotation.Nullable;

import java.util.Collections;
@@ -176,6 +178,11 @@ public long getAsLong(final String parameter, final long defaultValue)
return QueryContexts.getAsLong(parameter, get(parameter), defaultValue);
}

public HumanReadableBytes getAsHumanReadableBytes(final String parameter, final HumanReadableBytes defaultValue)
{
return QueryContexts.getAsHumanReadableBytes(parameter, get(parameter), defaultValue);
}

public Map<String, Object> getMergedParams()
{
if (mergedParams == null) {
18 changes: 18 additions & 0 deletions processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Numbers;
@@ -568,6 +569,23 @@ public static long getAsLong(
}
}

public static HumanReadableBytes getAsHumanReadableBytes(
final String parameter,
final Object value,
final HumanReadableBytes defaultValue
)
{
if (null == value) {
return defaultValue;
} else if (value instanceof Number) {
return HumanReadableBytes.valueOf(Numbers.parseLong(value));
} else if (value instanceof String) {
return new HumanReadableBytes((String) value);
} else {
throw new IAE("Expected parameter [%s] to be in human readable format", parameter);
}
}
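A hedged usage sketch of the new helper: string values are parsed as human-readable sizes, numeric values are treated as raw byte counts, and a missing value falls back to the supplied default (the "20M" result matches the test assertion earlier in this diff):

```java
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.query.QueryContexts;

public class GetAsHumanReadableBytesExample
{
  public static void main(String[] args)
  {
    // String values are parsed as human-readable sizes, e.g. "20M" -> 20,000,000 bytes.
    HumanReadableBytes fromString =
        QueryContexts.getAsHumanReadableBytes("maxOnDiskStorage", "20M", HumanReadableBytes.ZERO);

    // Numeric values are taken as a raw byte count.
    HumanReadableBytes fromNumber =
        QueryContexts.getAsHumanReadableBytes("maxOnDiskStorage", 1_000_000_000L, HumanReadableBytes.ZERO);

    // A missing (null) value falls back to the caller-supplied default.
    HumanReadableBytes fromDefault =
        QueryContexts.getAsHumanReadableBytes("maxOnDiskStorage", null, HumanReadableBytes.valueOf(0));

    System.out.println(fromString.getBytes());  // 20000000
    System.out.println(fromNumber.getBytes());  // 1000000000
    System.out.println(fromDefault.getBytes()); // 0
  }
}
```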

public static Map<String, Object> override(
final Map<String, Object> context,
final Map<String, Object> overrides
@@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
@@ -31,6 +32,8 @@
*/
public class GroupByQueryConfig
{
private static final Logger logger = new Logger(GroupByQueryConfig.class);

public static final long AUTOMATIC = 0;

public static final String CTX_KEY_STRATEGY = "groupByStrategy";
@@ -100,7 +103,10 @@ public class GroupByQueryConfig

@JsonProperty
// Max on-disk temporary storage, per-query; when exceeded, the query fails
-private long maxOnDiskStorage = 0L;
+private HumanReadableBytes maxOnDiskStorage = HumanReadableBytes.valueOf(0);

@JsonProperty
private HumanReadableBytes defaultOnDiskStorage = HumanReadableBytes.valueOf(-1);

@JsonProperty
private boolean forcePushDownLimit = false;
@@ -258,11 +264,24 @@ public long getActualMaxMergingDictionarySize(final DruidProcessingConfig proces
);
}

-public long getMaxOnDiskStorage()
+public HumanReadableBytes getMaxOnDiskStorage()
{
return maxOnDiskStorage;
}

/**
* Mirrors maxOnDiskStorage if defaultOnDiskStorage is not explicitly overridden by the cluster operator.
*
* This mirroring is done to maintain continuity in behavior between Druid versions. If an operator wants to use
* defaultOnDiskStorage, they have to explicitly override it.
*
* @return The working value for defaultOnDiskStorage
*/
public HumanReadableBytes getDefaultOnDiskStorage()
{
return defaultOnDiskStorage.getBytes() < 0L ? getMaxOnDiskStorage() : defaultOnDiskStorage;
}
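To make the mirroring concrete, a small sketch in the same style as the test configs earlier in this diff (assuming `GroupByQueryConfig` lives in the usual `org.apache.druid.query.groupby` package): when `defaultOnDiskStorage` is never set, `getDefaultOnDiskStorage()` simply returns whatever `getMaxOnDiskStorage()` reports.

```java
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.query.groupby.GroupByQueryConfig;

public class DefaultOnDiskStorageMirrorSketch
{
  public static void main(String[] args)
  {
    // Same pattern as the test configs above: override the getter on an anonymous subclass.
    GroupByQueryConfig config = new GroupByQueryConfig()
    {
      @Override
      public HumanReadableBytes getMaxOnDiskStorage()
      {
        return HumanReadableBytes.valueOf(1_000_000_000L);
      }
    };

    // defaultOnDiskStorage was left at its sentinel (-1), so it mirrors maxOnDiskStorage.
    System.out.println(config.getDefaultOnDiskStorage().getBytes()); // 1000000000
  }
}
```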

public boolean isForcePushDownLimit()
{
return forcePushDownLimit;
@@ -338,9 +357,14 @@ public GroupByQueryConfig withOverrides(final GroupByQuery query)
CTX_KEY_BUFFER_GROUPER_INITIAL_BUCKETS,
getBufferGrouperInitialBuckets()
);
newConfig.maxOnDiskStorage = Math.min(
((Number) query.getContextValue(CTX_KEY_MAX_ON_DISK_STORAGE, getMaxOnDiskStorage())).longValue(),
getMaxOnDiskStorage()
// If the client override does not provide a "maxOnDiskStorage" context key, the server-side "defaultOnDiskStorage"
// value is used when calculating the newConfig value of maxOnDiskStorage. This allows the operator to
// choose a default value lower than the max allowed when the context key is missing from the client query.
newConfig.maxOnDiskStorage = HumanReadableBytes.valueOf(
Math.min(
query.getContextHumanReadableBytes(CTX_KEY_MAX_ON_DISK_STORAGE, getDefaultOnDiskStorage()).getBytes(),
getMaxOnDiskStorage().getBytes()
)
);
newConfig.maxSelectorDictionarySize = maxSelectorDictionarySize; // No overrides
newConfig.maxMergingDictionarySize = maxMergingDictionarySize; // No overrides
@@ -368,6 +392,8 @@ public GroupByQueryConfig withOverrides(final GroupByQuery query)
CTX_KEY_ENABLE_MULTI_VALUE_UNNESTING,
isMultiValueUnnestingEnabled()
);

logger.debug("Override config for GroupBy query %s - %s", query.getId(), newConfig.toString());
return newConfig;
}

@@ -383,7 +409,8 @@ public String toString()
", bufferGrouperMaxLoadFactor=" + bufferGrouperMaxLoadFactor +
", bufferGrouperInitialBuckets=" + bufferGrouperInitialBuckets +
", maxMergingDictionarySize=" + maxMergingDictionarySize +
", maxOnDiskStorage=" + maxOnDiskStorage +
", maxOnDiskStorage=" + maxOnDiskStorage.getBytes() +
", defaultOnDiskStorage=" + getDefaultOnDiskStorage().getBytes() + // use the getter because of special behavior for mirroring maxOnDiskStorage if defaultOnDiskStorage not explicitly set.
", forcePushDownLimit=" + forcePushDownLimit +
", forceHashAggregation=" + forceHashAggregation +
", intermediateCombineDegree=" + intermediateCombineDegree +
@@ -175,7 +175,7 @@ public CloseableGrouperIterator<RowBasedKey, ResultRow> make()
try {
final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
temporaryStorageDirectory,
-querySpecificConfig.getMaxOnDiskStorage()
+querySpecificConfig.getMaxOnDiskStorage().getBytes()
);
final ReferenceCountingResourceHolder<LimitedTemporaryStorage> temporaryStorageHolder =
ReferenceCountingResourceHolder.fromCloseable(temporaryStorage);
@@ -106,7 +106,7 @@ public static ResultSupplier process(

final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
temporaryStorageDirectory,
-querySpecificConfig.getMaxOnDiskStorage()
+querySpecificConfig.getMaxOnDiskStorage().getBytes()
);

closeOnExit.register(temporaryStorage);
@@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
@@ -134,6 +135,12 @@ public boolean getContextBoolean(String key, boolean defaultValue)
throw new RuntimeException(REMOVED_ERROR_MESSAGE);
}

@Override
public HumanReadableBytes getContextHumanReadableBytes(String key, HumanReadableBytes defaultValue)
{
throw new RuntimeException(REMOVED_ERROR_MESSAGE);
}

@Override
public boolean isDescending()
{