Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/content/querying/timeboundaryquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Time boundary queries return the earliest and latest data points of a data set.
"queryType" : "timeBoundary",
"dataSource": "sample_datasource",
"bound" : < "maxTime" | "minTime" > # optional, defaults to returning both timestamps if not set
"filter" : { "type": "and", "fields": [<filter>, <filter>, ...] } # optional
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this is an "and" filter rather than just <filter>?

Copy link
Copy Markdown
Contributor Author

@rajk-tetration rajk-tetration Jul 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find an example of just using "filter": <filter> anywhere else in the docs, for example in TimeSeries an actual filter is given. I can change it, just wanted to be consistent.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, fair enough, I guess this style predated your edit so feel free to leave it that way.

}
```

Expand All @@ -19,6 +20,7 @@ There are 3 main parts to a time boundary query:
|queryType|This String should always be "timeBoundary"; this is the first thing Druid looks at to figure out how to interpret the query|yes|
|dataSource|A String or Object defining the data source to query, very similar to a table in a relational database. See [DataSource](../querying/datasource.html) for more information.|yes|
|bound | Optional, set to `maxTime` or `minTime` to return only the latest or earliest timestamp. Defaults to returning both if not set| no |
|filter|See [Filters](../querying/filters.html)|no|
|context|See [Context](../querying/query-context.html)|no|

The format of the result is:
Expand Down
22 changes: 22 additions & 0 deletions processing/src/main/java/io/druid/query/Druids.java
Original file line number Diff line number Diff line change
Expand Up @@ -775,13 +775,15 @@ public static class TimeBoundaryQueryBuilder
private DataSource dataSource;
private QuerySegmentSpec querySegmentSpec;
private String bound;
private DimFilter dimFilter;
private Map<String, Object> context;

public TimeBoundaryQueryBuilder()
{
dataSource = null;
querySegmentSpec = null;
bound = null;
dimFilter = null;
context = null;
}

Expand All @@ -791,6 +793,7 @@ public TimeBoundaryQuery build()
dataSource,
querySegmentSpec,
bound,
dimFilter,
context
);
}
Expand All @@ -801,6 +804,7 @@ public TimeBoundaryQueryBuilder copy(TimeBoundaryQueryBuilder builder)
.dataSource(builder.dataSource)
.intervals(builder.querySegmentSpec)
.bound(builder.bound)
.filters(builder.dimFilter)
.context(builder.context);
}

Expand Down Expand Up @@ -840,6 +844,24 @@ public TimeBoundaryQueryBuilder bound(String b)
return this;
}

public TimeBoundaryQueryBuilder filters(String dimensionName, String value)
{
dimFilter = new SelectorDimFilter(dimensionName, value, null);
return this;
}

public TimeBoundaryQueryBuilder filters(String dimensionName, String value, String... values)
{
dimFilter = new InDimFilter(dimensionName, Lists.asList(value, values), null);
return this;
}

public TimeBoundaryQueryBuilder filters(DimFilter f)
{
dimFilter = f;
return this;
}

public TimeBoundaryQueryBuilder context(Map<String, Object> c)
{
context = c;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.druid.query.Result;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.query.filter.DimFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;

Expand All @@ -52,13 +53,15 @@ public class TimeBoundaryQuery extends BaseQuery<Result<TimeBoundaryResultValue>

private static final byte CACHE_TYPE_ID = 0x0;

private final DimFilter dimFilter;
private final String bound;

@JsonCreator
public TimeBoundaryQuery(
@JsonProperty("dataSource") DataSource dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("bound") String bound,
@JsonProperty("filter") DimFilter dimFilter,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This requires updating the timeboundary query documentation in docs/content/querying

@JsonProperty("context") Map<String, Object> context
)
{
Expand All @@ -70,13 +73,13 @@ public TimeBoundaryQuery(
context
);

this.dimFilter = dimFilter;
this.bound = bound == null ? "" : bound;
}

@Override
public boolean hasFilters()
{
return false;
public boolean hasFilters() {
return dimFilter != null;
}

@Override
Expand All @@ -85,6 +88,12 @@ public String getType()
return Query.TIME_BOUNDARY;
}

@JsonProperty("filter")
public DimFilter getDimensionsFilter()
{
return dimFilter;
}

@JsonProperty
public String getBound()
{
Expand All @@ -98,6 +107,7 @@ public TimeBoundaryQuery withOverriddenContext(Map<String, Object> contextOverri
getDataSource(),
getQuerySegmentSpec(),
bound,
dimFilter,
computeOverridenContext(contextOverrides)
);
}
Expand All @@ -109,6 +119,7 @@ public TimeBoundaryQuery withQuerySegmentSpec(QuerySegmentSpec spec)
getDataSource(),
spec,
bound,
dimFilter,
getContext()
);
}
Expand All @@ -120,16 +131,21 @@ public Query<Result<TimeBoundaryResultValue>> withDataSource(DataSource dataSour
dataSource,
getQuerySegmentSpec(),
bound,
dimFilter,
getContext()
);
}

public byte[] getCacheKey()
{
final byte[] filterBytes = dimFilter == null ? new byte[]{} : dimFilter.getCacheKey();
final byte[] boundBytes = StringUtils.toUtf8(bound);
return ByteBuffer.allocate(1 + boundBytes.length)
final byte delimiter = (byte) 0xff;
return ByteBuffer.allocate(2 + boundBytes.length + filterBytes.length)
.put(CACHE_TYPE_ID)
.put(boundBytes)
.put(delimiter)
.put(filterBytes)
.array();
}

Expand Down Expand Up @@ -211,6 +227,7 @@ public String toString()
", querySegmentSpec=" + getQuerySegmentSpec() +
", duration=" + getDuration() +
", bound=" + bound +
", dimFilter=" + dimFilter +
'}';
}

Expand All @@ -233,6 +250,10 @@ public boolean equals(Object o)
return false;
}

if (dimFilter != null ? !dimFilter.equals(that.dimFilter) : that.dimFilter != null) {
return false;
}

return true;
}

Expand All @@ -241,6 +262,7 @@ public int hashCode()
{
int result = super.hashCode();
result = 31 * result + bound.hashCode();
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class TimeBoundaryQueryQueryToolChest
@Override
public <T extends LogicalSegment> List<T> filterSegments(TimeBoundaryQuery query, List<T> segments)
{
if (segments.size() <= 1) {
if (segments.size() <= 1 || query.hasFilters()) {
return segments;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,35 @@
package io.druid.query.timeboundary;

import com.google.inject.Inject;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.common.ISE;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.query.ChainedExecutionQueryRunner;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryWatcher;
import io.druid.query.Result;
import io.druid.granularity.AllGranularity;
import io.druid.segment.Segment;
import io.druid.segment.StorageAdapter;
import io.druid.segment.filter.Filters;
import io.druid.query.QueryRunnerHelper;
import io.druid.segment.Cursor;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.column.Column;
import org.joda.time.DateTime;

import java.util.Iterator;
import java.util.Map;
import java.util.List;
import java.util.concurrent.ExecutorService;


/**
*/
public class TimeBoundaryQueryRunnerFactory
Expand Down Expand Up @@ -75,10 +86,45 @@ public QueryToolChest<Result<TimeBoundaryResultValue>, TimeBoundaryQuery> getToo
private static class TimeBoundaryQueryRunner implements QueryRunner<Result<TimeBoundaryResultValue>>
{
private final StorageAdapter adapter;
private final Function<Cursor, Result<DateTime>> skipToFirstMatching;

public TimeBoundaryQueryRunner(Segment segment)
{
this.adapter = segment.asStorageAdapter();
this.skipToFirstMatching = new Function<Cursor, Result<DateTime>>()
{
@Override
public Result<DateTime> apply(Cursor cursor)
{
if (cursor.isDone()) {
return null;
}
final LongColumnSelector timestampColumnSelector = cursor.makeLongColumnSelector(Column.TIME_COLUMN_NAME);
final DateTime timestamp = new DateTime(timestampColumnSelector.get());
return new Result<>(adapter.getInterval().getStart(), timestamp);
}
};
}

private DateTime getTimeBoundary(StorageAdapter adapter, TimeBoundaryQuery legacyQuery, boolean descending)
{
final Sequence<Result<DateTime>> resultSequence = QueryRunnerHelper.makeCursorBasedQuery(
adapter,
legacyQuery.getQuerySegmentSpec().getIntervals(),
Filters.toFilter(legacyQuery.getDimensionsFilter()),
descending,
new AllGranularity(),
this.skipToFirstMatching
);
final List<Result<DateTime>> resultList = Sequences.toList(
Sequences.limit(resultSequence, 1),
Lists.<Result<DateTime>>newArrayList()
);
if (resultList.size() > 0) {
return resultList.get(0).getValue();
}

return null;
}

@Override
Expand All @@ -104,14 +150,24 @@ public Iterator<Result<TimeBoundaryResultValue>> make()
"Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped."
);
}

final DateTime minTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)
? null
: adapter.getMinTime();
final DateTime maxTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)
? null
: adapter.getMaxTime();

final DateTime minTime;
final DateTime maxTime;

if (legacyQuery.getDimensionsFilter() != null) {
minTime = getTimeBoundary(adapter, legacyQuery, false);
if (minTime == null) {
maxTime = null;
} else {
maxTime = getTimeBoundary(adapter, legacyQuery, true);
}
} else {
minTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MAX_TIME)
? null
: adapter.getMinTime();
maxTime = legacyQuery.getBound().equalsIgnoreCase(TimeBoundaryQuery.MIN_TIME)
? null
: adapter.getMaxTime();
}

return legacyQuery.buildResult(
adapter.getInterval().getStart(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.Druids;
import io.druid.query.CacheStrategy;
import io.druid.query.Result;
import io.druid.query.TableDataSource;
Expand All @@ -45,23 +46,30 @@ public class TimeBoundaryQueryQueryToolChestTest
new TableDataSource("test"),
null,
null,
null,
null
);

private static final TimeBoundaryQuery MAXTIME_BOUNDARY_QUERY = new TimeBoundaryQuery(
new TableDataSource("test"),
null,
TimeBoundaryQuery.MAX_TIME,
null,
null
);

private static final TimeBoundaryQuery MINTIME_BOUNDARY_QUERY = new TimeBoundaryQuery(
new TableDataSource("test"),
null,
TimeBoundaryQuery.MIN_TIME,
null,
null
);

private static final TimeBoundaryQuery FILTERED_BOUNDARY_QUERY = Druids.newTimeBoundaryQueryBuilder()
.dataSource("testing")
.filters("foo", "bar")
.build();

private static LogicalSegment createLogicalSegment(final Interval interval)
{
Expand Down Expand Up @@ -165,6 +173,24 @@ public void testMinTimeFilterSegments() throws Exception
}
}

@Test
public void testFilteredFilterSegments() throws Exception
{
List<LogicalSegment> segments = new TimeBoundaryQueryQueryToolChest().filterSegments(
FILTERED_BOUNDARY_QUERY,
Arrays.asList(
createLogicalSegment(new Interval("2013-01-01/P1D")),
createLogicalSegment(new Interval("2013-01-01T01/PT1H")),
createLogicalSegment(new Interval("2013-01-01T02/PT1H")),
createLogicalSegment(new Interval("2013-01-02/P1D")),
createLogicalSegment(new Interval("2013-01-03T01/PT1H")),
createLogicalSegment(new Interval("2013-01-03T02/PT1H")),
createLogicalSegment(new Interval("2013-01-03/P1D"))
)
);

Assert.assertEquals(7, segments.size());
}
@Test
public void testCacheStrategy() throws Exception
{
Expand All @@ -180,6 +206,7 @@ public void testCacheStrategy() throws Exception
)
),
null,
null,
null
)
);
Expand Down
Loading