Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ private long goStoreOffsetBySubQuery(long offset) {
}

public <T> Set<T> skipOffsetIfNeeded(Set<T> elems) {
/*
* Skip index(index query with offset) for performance optimization.
* We assume one result is returned by each index, but if there are
* overridden index it will cause confusing offset and results.
*/
long fromIndex = this.offset() - this.actualOffset();
if (fromIndex < 0L) {
// Skipping offset is overhead, no need to skip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,22 @@ private IdHolderList queryByLabel(ConditionQuery query) {
indexQuery.eq(HugeKeys.INDEX_LABEL_ID, il.id());
indexQuery.eq(HugeKeys.FIELD_VALUES, label);
/*
* Set offset and limit to avoid redundant element ids
* NOTE: the backend itself will skip the offset
* We can avoid redundant element ids if set limit, but if there are
* label index overridden by other vertices with different label,
* query with limit like g.V().hasLabel('xx').limit(10) may lose some
* results, so can't set limit here. But in this case, the following
* query results may be still different:
* g.V().hasLabel('xx').count() // label index count
* g.V().hasLabel('xx').limit(-1).count() // actual vertices count
* It’s a similar situation for the offset, like:
* g.V().hasLabel('xx').range(26, 27)
* g.V().hasLabel('xx').range(27, 28)
* we just reset limit here, but don't reset offset due to performance
* optimization with index+offset query, see Query.skipOffsetIfNeeded().
* NOTE: if set offset the backend itself will skip the offset
*/
indexQuery.copyBasic(query);
indexQuery.limit(Query.NO_LIMIT);

IdHolder idHolder = this.doIndexQuery(il, indexQuery);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import com.baidu.hugegraph.iterator.FlatMapperIterator;
import com.baidu.hugegraph.iterator.ListIterator;
import com.baidu.hugegraph.iterator.MapperIterator;
import com.baidu.hugegraph.iterator.WrappedIterator;
import com.baidu.hugegraph.job.system.DeleteExpiredJob;
import com.baidu.hugegraph.perf.PerfUtil.Watched;
import com.baidu.hugegraph.schema.EdgeLabel;
Expand Down Expand Up @@ -747,7 +748,7 @@ public Iterator<Vertex> queryVertices(Query query) {

@SuppressWarnings("unchecked")
Iterator<Vertex> r = (Iterator<Vertex>) joinTxVertices(query, results);
return this.filterOffsetLimitRecords(r, query);
return this.skipOffsetOrStopLimit(r, query);
}

protected Iterator<HugeVertex> queryVerticesFromBackend(Query query) {
Expand Down Expand Up @@ -930,7 +931,7 @@ public Iterator<Edge> queryEdges(Query query) {
@SuppressWarnings("unchecked")
Iterator<Edge> r = (Iterator<Edge>) joinTxEdges(query, results,
this.removedVertices);
return this.filterOffsetLimitRecords(r, query);
return this.skipOffsetOrStopLimit(r, query);
}

protected Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
Expand Down Expand Up @@ -1599,15 +1600,15 @@ private <T extends HugeElement> Iterator<T> filterUnmatchedRecords(
}
// Process results that query from left index or primary-key
if (query.resultType().isVertex() == elem.type().isVertex() &&
!filterResultFromIndexQuery(query, elem)) {
!rightResultFromIndexQuery(query, elem)) {
// Only index query will come here
return false;
}
return true;
});
}

private boolean filterResultFromIndexQuery(Query query, HugeElement elem) {
private boolean rightResultFromIndexQuery(Query query, HugeElement elem) {
/*
* If query is ConditionQuery or query.originQuery() is ConditionQuery
* means it's index query
Expand Down Expand Up @@ -1637,8 +1638,24 @@ private boolean filterResultFromIndexQuery(Query query, HugeElement elem) {
return false;
}

private <T> Iterator<T> filterOffsetLimitRecords(Iterator<T> results,
Query query) {
private <T extends HugeElement> Iterator<T>
filterExpiredResultFromFromBackend(
Query query, Iterator<T> results) {
if (this.store().features().supportsTtl() || query.showExpired()) {
return results;
}
// Filter expired vertices/edges with TTL
return new FilterIterator<>(results, elem -> {
if (elem.expired()) {
DeleteExpiredJob.asyncDeleteExpiredObject(this.params(), elem);
return false;
}
return true;
});
}

private <T> Iterator<T> skipOffsetOrStopLimit(Iterator<T> results,
Query query) {
if (query.noLimitAndOffset()) {
return results;
}
Expand All @@ -1655,26 +1672,10 @@ private <T> Iterator<T> filterOffsetLimitRecords(Iterator<T> results,
query.goOffset(1L);
}
}
// Filter limit
return new FilterIterator<>(results, elem -> {
// Stop if reach limit
return new LimitIterator<>(results, elem -> {
long count = query.goOffset(1L);
return !query.reachLimit(count - 1L);
});
}

private <T extends HugeElement> Iterator<T>
filterExpiredResultFromFromBackend(
Query query, Iterator<T> results) {
if (this.store().features().supportsTtl() || query.showExpired()) {
return results;
}
// Filter expired vertices/edges with TTL
return new FilterIterator<>(results, elem -> {
if (elem.expired()) {
DeleteExpiredJob.asyncDeleteExpiredObject(this.params(), elem);
return false;
}
return true;
return query.reachLimit(count - 1L);
});
}

Expand Down Expand Up @@ -1986,4 +1987,47 @@ private <T> void traverseByLabel(SchemaLabel label,
} while (page != null);
}
}

// TODO: move to common module
public static class LimitIterator<T> extends WrappedIterator<T> {

private final Iterator<T> originIterator;
private final Function<T, Boolean> filterCallback;

public LimitIterator(Iterator<T> origin, Function<T, Boolean> filter) {
this.originIterator = origin;
this.filterCallback = filter;
}

@Override
protected Iterator<T> originIterator() {
return this.originIterator;
}

@Override
protected final boolean fetch() {
while (this.originIterator.hasNext()) {
T next = this.originIterator.next();
// Do filter
boolean reachLimit = this.filterCallback.apply(next);
if (reachLimit) {
this.closeOriginIterator();
return false;
}
if (next != null) {
assert this.current == none();
this.current = next;
return true;
}
}
return false;
}

protected final void closeOriginIterator() {
if (this.originIterator == null) {
return;
}
close(this.originIterator);
}
}
}