Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
package com.baidu.hugegraph.backend.store.cassandra;

import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

import com.baidu.hugegraph.backend.page.PageState;
import com.baidu.hugegraph.backend.query.Query;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.BackendEntryIterator;
import com.baidu.hugegraph.util.E;
import com.datastax.driver.core.ExecutionInfo;
import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
Expand All @@ -37,24 +39,50 @@ public class CassandraEntryIterator extends BackendEntryIterator {
private final Iterator<Row> rows;
private final BiFunction<BackendEntry, Row, BackendEntry> merger;

private long remaining;
private int fetchdPageSize;
private long expected;
private BackendEntry next;

public CassandraEntryIterator(ResultSet results, Query query,
BiFunction<BackendEntry, Row, BackendEntry> merger) {
super(query);
this.results = results;
this.rows = results.iterator();
this.remaining = results.getAvailableWithoutFetching();
this.merger = merger;
this.next = null;

this.skipOffset();
this.fetchdPageSize = results.getAvailableWithoutFetching();
this.next = null;

if (query.paging()) {
E.checkState(this.remaining == query.limit() ||
results.isFullyFetched(),
"Unexpected fetched page size: %s", this.remaining);
assert query.offset() == 0L;
assert query.limit() >= 0L || query.noLimit() : query.limit();
// Skip page offset
this.expected = PageState.fromString(query.page()).offset();
this.skipPageOffset(query.page());
// Check the number of available rows
E.checkState(this.fetchdPageSize <= query.limit(),
"Unexpected fetched page size: %s",
this.fetchdPageSize);
if (results.isFullyFetched()) {
/*
* All results fetched
* NOTE: it may be enough or not enough for the entire page
*/
this.expected = this.fetchdPageSize;
} else {
/*
* Not fully fetched, that's fetchdPageSize == query.limit(),
*
* NOTE: but there may be fetchdPageSize < query.limit(), means
* not fetched the entire page (ScyllaDB may go here #1340),
* try to fetch next page later until got the expected count.
* Can simulate by: `select.setFetchSize(total - 1)`
*/
this.expected = query.total();
}
} else {
this.expected = query.total();
this.skipOffset();
}
}

Expand All @@ -71,11 +99,18 @@ protected final boolean fetch() {
this.next = null;
}

while (this.remaining > 0 && this.rows.hasNext()) {
while (this.expected > 0L && this.rows.hasNext()) {
// Limit expected count, due to rows.hasNext() will fetch next page
this.expected--;
Row row = this.rows.next();
if (this.query.paging()) {
this.remaining--;
// Update fetchdPageSize if auto fetch the next page
if (this.expected > 0L && this.availableLocal() == 0) {
if (this.rows.hasNext()) {
this.fetchdPageSize = this.availableLocal();
}
}
}
Row row = this.rows.next();
BackendEntry merged = this.merger.apply(this.current, row);
if (this.current == null) {
// The first time to read
Expand Down Expand Up @@ -112,11 +147,50 @@ protected final long skip(BackendEntry entry, long skip) {

@Override
protected PageState pageState() {
PagingState page = this.results.getExecutionInfo().getPagingState();
if (page == null || this.results.isExhausted()) {
return new PageState(PageState.EMPTY_BYTES, 0, (int) this.count());
byte[] position;
int offset = 0;
int count = (int) this.count();
assert this.fetched() == count;
int extra = this.availableLocal();
List<ExecutionInfo> infos = this.results.getAllExecutionInfo();
if (extra > 0 && infos.size() >= 2) {
/*
* Go back to the previous page if there are still available
* results fetched to local memory but not consumed, and set page
* offset with consumed amount of results.
*
* Safely, we should get the remaining size of the current page by:
* `Whitebox.getInternalState(results, "currentPage").size()`
* instead of
* `results.getAvailableWithoutFetching()`
*/
ExecutionInfo previous = infos.get(infos.size() - 2);
PagingState page = previous.getPagingState();
position = page.toBytes();
offset = this.fetchdPageSize - extra;
} else {
PagingState page = this.results.getExecutionInfo().getPagingState();
if (page == null || this.expected > 0L) {
// Call isExhausted() will lead to try to fetch the next page
E.checkState(this.results.isExhausted(),
"Unexpected paging state with expected=%s, " +
"ensure consume all the fetched results before " +
"calling pageState()", this.expected);
position = PageState.EMPTY_BYTES;
} else {
/*
* Exist page position which used to fetch the next page.
* Maybe it happens to the last page (that's the position is
* at the end of results and next page is empty)
*/
position = page.toBytes();
}
}
byte[] position = page.toBytes();
return new PageState(position, 0, (int) this.count());

return new PageState(position, offset, count);
}

/**
 * Returns the number of rows already fetched into local memory that can
 * be consumed without triggering a fetch of the next page from the server.
 */
private int availableLocal() {
    return this.results.getAvailableWithoutFetching();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,39 @@ protected List<Select> query2Select(String table, Query query) {
}

protected void setPageState(Query query, List<Select> selects) {
if (query.noLimit()) {
if (query.noLimit() && !query.paging()) {
return;
}
for (Select select : selects) {
long total = query.total();
int total = (int) query.total();
if (!query.noLimit()) {
E.checkArgument(total == query.total(),
"Invalid query limit %s", query.limit());
} else {
assert total == -1 : total;
}

String page = query.page();
if (page == null) {
// Set limit
select.limit((int) total);
assert total > 0 : total;
select.limit(total);
} else {
select.setFetchSize((int) total);
// It's the first time if page is empty
/*
* NOTE: the `total` may be -1 when query.noLimit(),
* setFetchSize(-1) means the default fetch size will be used.
*/
assert total > 0 || total == -1 : total;
select.setFetchSize(total);

/*
* Can't set limit here `select.limit(total)`
* due to it will cause can't get the next page-state.
* Also can't set `select.limit(total + 1)` due to it will
* cause error "Paging state mismatch" when setPagingState().
*/

// It's the first time if page is empty, skip setPagingState
if (!page.isEmpty()) {
byte[] position = PageState.fromString(page).position();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ private byte[] toBytes() {
}

/**
 * Parses a PageState from its string form.
 *
 * @param page the serialized page state, must not be null
 * @return the decoded PageState
 * @throws IllegalArgumentException if page is null
 */
public static PageState fromString(String page) {
    // Fail fast with a clear message instead of an NPE inside toBytes()
    E.checkNotNull(page, "page");
    return fromBytes(toBytes(page));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ public BinaryEntryIterator(BackendIterator<Elem> results, Query query,
this.merger = m;
this.next = null;

this.skipOffset();

if (query.paging()) {
assert query.offset() == 0L;
assert PageState.fromString(query.page()).offset() == 0;
this.skipPageOffset(query.page());
} else {
this.skipOffset();
}
}

Expand Down Expand Up @@ -98,22 +100,9 @@ protected final boolean fetch() {
return this.current != null;
}

public final static long sizeOfBackendEntry(BackendEntry entry) {
/*
* 3 cases:
* 1) one vertex per entry
* 2) one edge per column (one entry <==> a vertex),
* 3) one element id per column (one entry <==> an index)
*/
if (entry.type().isEdge() || entry.type().isIndex()) {
return entry.columnsSize();
}
return 1L;
}

@Override
protected final long sizeOf(BackendEntry entry) {
return sizeOfBackendEntry(entry);
return sizeOfEntry(entry);
}

@Override
Expand All @@ -140,10 +129,16 @@ private void removeLastRecord() {
((BinaryBackendEntry) this.current).removeColumn(lastOne);
}

private void skipPageOffset(String page) {
PageState pagestate = PageState.fromString(page);
if (pagestate.offset() > 0 && this.fetch()) {
this.skip(this.current, pagestate.offset());
/**
 * Counts the logical size of a backend entry.
 *
 * Three cases:
 * 1) one vertex per entry, so a vertex entry counts as 1;
 * 2) one edge per column (one entry corresponds to a vertex),
 *    so an edge entry counts one per column;
 * 3) one element id per column (one entry corresponds to an index),
 *    so an index entry also counts one per column.
 */
public final static long sizeOfEntry(BackendEntry entry) {
    boolean countPerColumn = entry.type().isEdge() ||
                             entry.type().isIndex();
    return countPerColumn ? entry.columnsSize() : 1L;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,28 +128,47 @@ protected final long fetched() {
return this.count + ccount;
}

/**
 * Skips the in-page offset recorded in the given page token.
 *
 * NOTE: this.count is intentionally left untouched when skipping,
 * because the skipped records belong to the previous page.
 *
 * @param page the serialized page state carrying the offset to skip
 */
protected final void skipPageOffset(String page) {
    int offset = PageState.fromString(page).offset();
    if (offset <= 0) {
        return;
    }
    this.skipOffset(offset);
}

/**
 * Skips the remaining query offset (the part of query.offset() that has
 * not been consumed yet), then accounts the skipped amount both in this
 * iterator's counter and in the query's actual-offset bookkeeping.
 */
protected void skipOffset() {
    long remaining = this.query.offset() - this.query.actualOffset();
    if (remaining > 0L) {
        long actualSkipped = this.skipOffset(remaining);
        this.count += actualSkipped;
        this.query.goOffset(actualSkipped);
    }
}

protected long skipOffset(long offset) {
assert offset >= 0L;
long skipped = 0L;
// Skip offset
while (this.count < offset && this.fetch()) {
while (skipped < offset && this.fetch()) {
assert this.current != null;
final long size = this.sizeOf(this.current);
this.count += size;
if (this.count > offset) {
skipped += size;
if (skipped > offset) {
// Skip part of sub-items in an entry
final long skip = size - (this.count - offset);
this.count -= this.skip(this.current, skip);
assert this.count == offset;
final long skip = size - (skipped - offset);
skipped -= this.skip(this.current, skip);
assert skipped == offset;
} else {
// Skip entry
this.current = null;
}
}
this.query.goOffset(this.count);
return skipped;
}

protected long sizeOf(BackendEntry entry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ protected static final BackendEntryIterator newEntryIterator(
}

protected static final long sizeOfBackendEntry(BackendEntry entry) {
return BinaryEntryIterator.sizeOfBackendEntry(entry);
return BinaryEntryIterator.sizeOfEntry(entry);
}

private static class RocksDBShardSpliter extends ShardSpliter<Session> {
Expand Down
Loading