Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion docs/content/querying/select-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,15 @@ This can be used with the next query's pagingSpec:
"pagingSpec":{"pagingIdentifiers": {"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9" : 5}, "threshold":5}

}
```

Note that in the second query, an offset is specified and that it is 1 greater than the largest offset found in the initial results. To return the next "page", this offset must be incremented by 1 (should be decremented by 1 for descending query), with each new query, but with option `fromNext` enabled, this operation is not needed. When an empty results set is received, the very last page has been returned.

`fromNext` options is in pagingSpec:

Note that in the second query, an offset is specified and that it is 1 greater than the largest offset found in the initial results. To return the next "page", this offset must be incremented by 1 (should be decremented by 1 for descending query), with each new query. When an empty results set is received, the very last page has been returned.
```json
{
...
"pagingSpec":{"pagingIdentifiers": {}, "threshold":5, "fromNext": true}
}
```
79 changes: 62 additions & 17 deletions processing/src/main/java/io/druid/query/select/PagingSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,60 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.common.StringUtils;

import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

/**
*/
public class PagingSpec
{
private final LinkedHashMap<String, Integer> pagingIdentifiers;
public static PagingSpec newSpec(int threshold)
{
return new PagingSpec(null, threshold);
}

public static Map<String, Integer> merge(Iterable<Map<String, Integer>> cursors)
{
Map<String, Integer> next = Maps.newHashMap();
for (Map<String, Integer> cursor : cursors) {
for (Map.Entry<String, Integer> entry : cursor.entrySet()) {
next.put(entry.getKey(), entry.getValue());
}
}
return next;
}

public static Map<String, Integer> next(Map<String, Integer> cursor, boolean descending)
{
for (Map.Entry<String, Integer> entry : cursor.entrySet()) {
entry.setValue(descending ? entry.getValue() - 1 : entry.getValue() + 1);
}
return cursor;
}

private final Map<String, Integer> pagingIdentifiers;
private final int threshold;
private final boolean fromNext;

@JsonCreator
public PagingSpec(
@JsonProperty("pagingIdentifiers") LinkedHashMap<String, Integer> pagingIdentifiers,
@JsonProperty("threshold") int threshold
@JsonProperty("pagingIdentifiers") Map<String, Integer> pagingIdentifiers,
@JsonProperty("threshold") int threshold,
@JsonProperty("fromNext") boolean fromNext
)
{
this.pagingIdentifiers = pagingIdentifiers == null ? new LinkedHashMap<String, Integer>() : pagingIdentifiers;
this.pagingIdentifiers = pagingIdentifiers == null ? Maps.<String, Integer>newHashMap() : pagingIdentifiers;
this.threshold = threshold;
this.fromNext = fromNext;
}

public PagingSpec(Map<String, Integer> pagingIdentifiers, int threshold)
{
this(pagingIdentifiers, threshold, false);
}

@JsonProperty
Expand All @@ -57,6 +89,12 @@ public int getThreshold()
return threshold;
}

@JsonProperty
public boolean isFromNext()
{
return fromNext;
}

public byte[] getCacheKey()
{
final byte[][] pagingKeys = new byte[pagingIdentifiers.size()][];
Expand All @@ -75,7 +113,7 @@ public byte[] getCacheKey()

final byte[] thresholdBytes = ByteBuffer.allocate(Ints.BYTES).putInt(threshold).array();

final ByteBuffer queryCacheKey = ByteBuffer.allocate(pagingKeysSize + pagingValuesSize + thresholdBytes.length);
final ByteBuffer queryCacheKey = ByteBuffer.allocate(pagingKeysSize + pagingValuesSize + thresholdBytes.length + 1);

for (byte[] pagingKey : pagingKeys) {
queryCacheKey.put(pagingKey);
Expand All @@ -86,22 +124,37 @@ public byte[] getCacheKey()
}

queryCacheKey.put(thresholdBytes);
queryCacheKey.put(isFromNext() ? (byte) 0x01 : 0x00);

return queryCacheKey.array();
}

public PagingOffset getOffset(String identifier, boolean descending)
{
Integer offset = pagingIdentifiers.get(identifier);
if (offset == null) {
offset = PagingOffset.toOffset(0, descending);
} else if (fromNext) {
offset = descending ? offset - 1 : offset + 1;
}
return PagingOffset.of(offset, threshold);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof PagingSpec)) {
if (o == null || getClass() != o.getClass()) {
return false;
}

PagingSpec that = (PagingSpec) o;

if (fromNext != that.fromNext) {
return false;
}
if (threshold != that.threshold) {
return false;
}
Expand All @@ -117,6 +170,7 @@ public int hashCode()
{
int result = pagingIdentifiers.hashCode();
result = 31 * result + threshold;
result = 31 * result + (fromNext ? 1 : 0);
return result;
}

Expand All @@ -126,16 +180,7 @@ public String toString()
return "PagingSpec{" +
"pagingIdentifiers=" + pagingIdentifiers +
", threshold=" + threshold +
", fromNext=" + fromNext +
'}';
}

public PagingOffset getOffset(String identifier, boolean descending)
{
Integer offset = pagingIdentifiers.get(identifier);
if (offset == null) {
offset = PagingOffset.toOffset(0, descending);
}
return PagingOffset.of(offset, threshold);
}

}
Loading