Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/content/querying/timeseriesquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ An example timeseries query object is shown below:
"queryType": "timeseries",
"dataSource": "sample_datasource",
"granularity": "day",
"descending": "true",
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this flag looks to be on the BaseQuery object, which means this code should work for all query types no?

"filter": {
"type": "and",
"fields": [
Expand Down Expand Up @@ -49,6 +50,7 @@ There are 7 main parts to a timeseries query:
|--------|-----------|---------|
|queryType|This String should always be "timeseries"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes|
|descending|Whether to return the results in descending order. Defaults to `false` (ascending).|no|
|intervals|A JSON Object representing ISO-8601 Intervals. This defines the time ranges to run the query over.|yes|
|granularity|Defines the granularity to bucket query results. See [Granularities](../querying/granularities.html)|yes|
|filter|See [Filters](../querying/filters.html)|no|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public AsyncQueryRunner(QueryRunner<T> baseRunner, ExecutorService executor, Que
@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final int priority = query.getContextPriority(0);
final int priority = BaseQuery.getContextPriority(query, 0);
final ListenableFuture<Sequence<T>> future = executor.submit(new AbstractPrioritizedCallable<Sequence<T>>(priority)
{
@Override
Expand Down
140 changes: 78 additions & 62 deletions processing/src/main/java/io/druid/query/BaseQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.metamx.common.ISE;
import com.metamx.common.guava.Sequence;
import io.druid.query.spec.QuerySegmentSpec;
Expand All @@ -34,17 +35,74 @@

/**
*/
public abstract class BaseQuery<T> implements Query<T>
public abstract class BaseQuery<T extends Comparable<T>> implements Query<T>
{
public static <T> int getContextPriority(Query<T> query, int defaultValue)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all those method are marked as @deprecated in the Query interface, should we mark them deprecated here too?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of those methods in the Query interface have been changed to static methods, which can be regarded as completing the deprecation, since the change is not backward compatible anymore.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean. We deprecated them because we planned avoid using string parsing going forward, but that may warrant a separate discussion. I'm fine leaving as is.

{
return parseInt(query, "priority", defaultValue);
}

/**
 * Reads the "bySegment" flag from the query context. Used by {@code BySegmentQueryRunner}
 * to decide whether results should be reported per segment.
 *
 * @param query        the query whose context is inspected
 * @param defaultValue returned when the context has no "bySegment" entry
 * @return the context value coerced to boolean, or {@code defaultValue} when absent
 */
public static <T> boolean getContextBySegment(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "bySegment", defaultValue);
}

/**
 * Reads the "populateCache" flag from the query context.
 *
 * @param query        the query whose context is inspected
 * @param defaultValue returned when the context has no "populateCache" entry
 * @return the context value coerced to boolean, or {@code defaultValue} when absent
 */
public static <T> boolean getContextPopulateCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "populateCache", defaultValue);
}

/**
 * Reads the "useCache" flag from the query context.
 *
 * @param query        the query whose context is inspected
 * @param defaultValue returned when the context has no "useCache" entry
 * @return the context value coerced to boolean, or {@code defaultValue} when absent
 */
public static <T> boolean getContextUseCache(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "useCache", defaultValue);
}

/**
 * Reads the "finalize" flag from the query context.
 *
 * @param query        the query whose context is inspected
 * @param defaultValue returned when the context has no "finalize" entry
 * @return the context value coerced to boolean, or {@code defaultValue} when absent
 */
public static <T> boolean getContextFinalize(Query<T> query, boolean defaultValue)
{
return parseBoolean(query, "finalize", defaultValue);
}

/**
 * Coerces a query-context value to an int.
 *
 * <p>Accepts a {@code String} (parsed via {@link Integer#parseInt}) or any
 * {@link Number}. Context maps deserialized from JSON may carry numeric values
 * as {@code Integer} or {@code Long} depending on the deserializer, so matching
 * only {@code Integer} (as before) would reject otherwise-valid numeric input.
 *
 * @param query        the query whose context is inspected
 * @param key          context key to look up
 * @param defaultValue returned when the context has no entry for {@code key}
 * @return the coerced int value
 * @throws ISE if the value is neither a String nor a Number
 */
private static <T> int parseInt(Query<T> query, String key, int defaultValue)
{
  final Object val = query.getContextValue(key);
  if (val == null) {
    return defaultValue;
  }
  if (val instanceof String) {
    return Integer.parseInt((String) val);
  }
  if (val instanceof Number) {
    // covers Integer, Long, etc.; values are expected to be within int range
    return ((Number) val).intValue();
  }
  throw new ISE("Unknown type [%s]", val.getClass());
}

/**
 * Coerces a query-context value to a boolean. Accepts a {@code Boolean} directly
 * or a {@code String} parsed via {@link Boolean#parseBoolean}.
 *
 * @param query        the query whose context is inspected
 * @param key          context key to look up
 * @param defaultValue returned when the context has no entry for {@code key}
 * @return the coerced boolean value
 * @throws ISE if the value is neither a String nor a Boolean
 */
private static <T> boolean parseBoolean(Query<T> query, String key, boolean defaultValue)
{
  final Object raw = query.getContextValue(key);
  if (raw == null) {
    return defaultValue;
  } else if (raw instanceof Boolean) {
    return (boolean) raw;
  } else if (raw instanceof String) {
    return Boolean.parseBoolean((String) raw);
  } else {
    throw new ISE("Unknown type [%s]. Cannot parse!", raw.getClass());
  }
}

public static final String QUERYID = "queryId";
private final DataSource dataSource;
private final boolean descending;
private final Map<String, Object> context;
private final QuerySegmentSpec querySegmentSpec;
private volatile Duration duration;

public BaseQuery(
DataSource dataSource,
QuerySegmentSpec querySegmentSpec,
boolean descending,
Map<String, Object> context
)
{
Expand All @@ -54,6 +112,7 @@ public BaseQuery(
this.dataSource = dataSource;
this.context = context;
this.querySegmentSpec = querySegmentSpec;
this.descending = descending;
}

@JsonProperty
Expand All @@ -63,6 +122,13 @@ public DataSource getDataSource()
return dataSource;
}

/**
 * Whether this query was requested with descending result ordering; see
 * {@link #getResultOrdering()}, which reverses the natural ordering when this is true.
 * Serialized into the JSON query as the "descending" property.
 */
@JsonProperty
@Override
public boolean isDescending()
{
return descending;
}

@JsonProperty("intervals")
public QuerySegmentSpec getQuerySegmentSpec()
{
Expand Down Expand Up @@ -122,67 +188,6 @@ public <ContextType> ContextType getContextValue(String key, ContextType default
return retVal == null ? defaultValue : retVal;
}

@Override
public int getContextPriority(int defaultValue)
{
if (context == null) {
return defaultValue;
}
Object val = context.get("priority");
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Integer.parseInt((String) val);
} else if (val instanceof Integer) {
return (int) val;
} else {
throw new ISE("Unknown type [%s]", val.getClass());
}
}

@Override
public boolean getContextBySegment(boolean defaultValue)
{
return parseBoolean("bySegment", defaultValue);
}

@Override
public boolean getContextPopulateCache(boolean defaultValue)
{
return parseBoolean("populateCache", defaultValue);
}

@Override
public boolean getContextUseCache(boolean defaultValue)
{
return parseBoolean("useCache", defaultValue);
}

@Override
public boolean getContextFinalize(boolean defaultValue)
{
return parseBoolean("finalize", defaultValue);
}

private boolean parseBoolean(String key, boolean defaultValue)
{
if (context == null) {
return defaultValue;
}
Object val = context.get(key);
if (val == null) {
return defaultValue;
}
if (val instanceof String) {
return Boolean.parseBoolean((String) val);
} else if (val instanceof Boolean) {
return (boolean) val;
} else {
throw new ISE("Unknown type [%s]. Cannot parse!", val.getClass());
}
}

protected Map<String, Object> computeOverridenContext(Map<String, Object> overrides)
{
Map<String, Object> overridden = Maps.newTreeMap();
Expand All @@ -195,6 +200,13 @@ protected Map<String, Object> computeOverridenContext(Map<String, Object> overri
return overridden;
}

/**
 * Returns the ordering applied to this query's results: the natural ordering of the
 * result type, reversed when the query is descending.
 */
@Override
public Ordering<T> getResultOrdering()
{
  if (descending) {
    return Ordering.<T>natural().reverse();
  }
  return Ordering.natural();
}

@Override
public String getId()
{
Expand All @@ -219,6 +231,9 @@ public boolean equals(Object o)

BaseQuery baseQuery = (BaseQuery) o;

if (descending != baseQuery.descending) {
return false;
}
if (context != null ? !context.equals(baseQuery.context) : baseQuery.context != null) {
return false;
}
Expand All @@ -241,6 +256,7 @@ public boolean equals(Object o)
public int hashCode()
{
int result = dataSource != null ? dataSource.hashCode() : 0;
result = 31 * result + (descending ? 1 : 0);
result = 31 * result + (context != null ? context.hashCode() : 0);
result = 31 * result + (querySegmentSpec != null ? querySegmentSpec.hashCode() : 0);
result = 31 * result + (duration != null ? duration.hashCode() : 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public BySegmentQueryRunner(
@SuppressWarnings("unchecked")
public Sequence<T> run(final Query<T> query, Map<String, Object> responseContext)
{
if (query.getContextBySegment(false)) {
if (BaseQuery.getContextBySegment(query, false)) {
final Sequence<T> baseSequence = base.run(query, responseContext);
final List<T> results = Sequences.toList(baseSequence, Lists.<T>newArrayList());
return Sequences.simple(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public BySegmentSkippingQueryRunner(
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
if (query.getContextBySegment(false)) {
if (BaseQuery.getContextBySegment(query, false)) {
return baseRunner.run(query, responseContext);
}

Expand Down
35 changes: 27 additions & 8 deletions processing/src/main/java/io/druid/query/CacheStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,39 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;

/**
*/
public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
{
public byte[] computeCacheKey(QueryType query);
/**
* Computes the cache key for the given query
*
* @param query the query to compute a cache key for
* @return the cache key
*/
byte[] computeCacheKey(QueryType query);

public TypeReference<CacheType> getCacheObjectClazz();
/**
* Returns the class type of what is used in the cache
*
* @return Returns the class type of what is used in the cache
*/
TypeReference<CacheType> getCacheObjectClazz();

// Resultant function must be THREAD SAFE
public Function<T, CacheType> prepareForCache();
/**
* Returns a function that converts from the QueryType's result type to something cacheable.
*
* The resulting function must be thread-safe.
*
* @return a thread-safe function that converts the QueryType's result type into something cacheable
*/
Function<T, CacheType> prepareForCache();

public Function<CacheType, T> pullFromCache();

public Sequence<T> mergeSequences(Sequence<Sequence<T>> seqOfSequences);
/**
* A function that does the inverse of the operation that the function prepareForCache returns
*
* @return A function that does the inverse of the operation that the function prepareForCache returns
*/
Function<CacheType, T> pullFromCache();
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,38 +64,35 @@ public class ChainedExecutionQueryRunner<T> implements QueryRunner<T>

private final Iterable<QueryRunner<T>> queryables;
private final ListeningExecutorService exec;
private final Ordering<T> ordering;
private final QueryWatcher queryWatcher;

public ChainedExecutionQueryRunner(
ExecutorService exec,
Ordering<T> ordering,
QueryWatcher queryWatcher,
QueryRunner<T>... queryables
)
{
this(exec, ordering, queryWatcher, Arrays.asList(queryables));
this(exec, queryWatcher, Arrays.asList(queryables));
}

public ChainedExecutionQueryRunner(
ExecutorService exec,
Ordering<T> ordering,
QueryWatcher queryWatcher,
Iterable<QueryRunner<T>> queryables
)
{
// listeningDecorator will leave PrioritizedExecutorService unchanged,
// since it already implements ListeningExecutorService
this.exec = MoreExecutors.listeningDecorator(exec);
this.ordering = ordering;
this.queryables = Iterables.unmodifiableIterable(queryables);
this.queryWatcher = queryWatcher;
}

@Override
public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
{
final int priority = query.getContextPriority(0);
final int priority = BaseQuery.getContextPriority(query, 0);
final Ordering ordering = query.getResultOrdering();

return new BaseSequence<T, Iterator<T>>(
new BaseSequence.IteratorMaker<T, Iterator<T>>()
Expand Down
16 changes: 16 additions & 0 deletions processing/src/main/java/io/druid/query/Druids.java
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,8 @@ public static class TimeseriesQueryBuilder
private List<PostAggregator> postAggregatorSpecs;
private Map<String, Object> context;

private boolean descending;

private TimeseriesQueryBuilder()
{
dataSource = null;
Expand All @@ -348,6 +350,7 @@ public TimeseriesQuery build()
return new TimeseriesQuery(
dataSource,
querySegmentSpec,
descending,
dimFilter,
granularity,
aggregatorSpecs,
Expand All @@ -362,6 +365,7 @@ public TimeseriesQueryBuilder copy(TimeseriesQuery query)
.dataSource(query.getDataSource())
.intervals(query.getIntervals())
.filters(query.getDimensionsFilter())
.descending(query.isDescending())
.granularity(query.getGranularity())
.aggregators(query.getAggregatorSpecs())
.postAggregators(query.getPostAggregatorSpecs())
Expand All @@ -374,6 +378,7 @@ public TimeseriesQueryBuilder copy(TimeseriesQueryBuilder builder)
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.filters(builder.dimFilter)
.descending(builder.descending)
.granularity(builder.granularity)
.aggregators(builder.aggregatorSpecs)
.postAggregators(builder.postAggregatorSpecs)
Expand All @@ -395,6 +400,11 @@ public DimFilter getDimFilter()
return dimFilter;
}

/**
 * Whether the query being built will request descending result ordering.
 * Defaults to {@code false}; set via {@link #descending(boolean)}.
 */
public boolean isDescending()
{
return descending;
}

public QueryGranularity getGranularity()
{
return granularity;
Expand Down Expand Up @@ -467,6 +477,12 @@ public TimeseriesQueryBuilder filters(DimFilter f)
return this;
}

/**
 * Sets whether the built query requests descending result ordering.
 *
 * @param descending true for descending results
 * @return this builder, for chaining
 */
public TimeseriesQueryBuilder descending(boolean descending)
{
  this.descending = descending;
  return this;
}

public TimeseriesQueryBuilder granularity(String g)
{
granularity = QueryGranularity.fromString(g);
Expand Down
Loading