From d8878545a155cb42094d0fd520be0f4436b0fb3a Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Tue, 30 Mar 2021 20:40:45 +0800 Subject: [PATCH 1/3] fix ScyllaDB: lost page results due to not fetched the entire page fix #1340 Change-Id: Ib89d9790af4236a137d9215e27f0d17f0de10afd --- .../cassandra/CassandraEntryIterator.java | 16 ++++++++-- .../store/cassandra/CassandraTable.java | 31 ++++++++++++++++--- 2 files changed, 39 insertions(+), 8 deletions(-) diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraEntryIterator.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraEntryIterator.java index 6f09008033..a00dd92990 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraEntryIterator.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraEntryIterator.java @@ -52,9 +52,18 @@ public CassandraEntryIterator(ResultSet results, Query query, this.skipOffset(); if (query.paging()) { - E.checkState(this.remaining == query.limit() || - results.isFullyFetched(), - "Unexpected fetched page size: %s", this.remaining); + assert query.offset() == 0L; + assert query.limit() >= 0L || query.noLimit() : query.limit(); + // Check the number of available rows + if (results.isFullyFetched()) { + // All results fetched (maybe not enough for the entire page) + E.checkState(this.remaining <= query.limit(), + "Unexpected fetched page size: %s", + this.remaining); + } else { + // Not fetched the entire page (ScyllaDB may go here #1340) + this.remaining = query.limit(); + } } } @@ -73,6 +82,7 @@ protected final boolean fetch() { while (this.remaining > 0 && this.rows.hasNext()) { if (this.query.paging()) { + // Limit page size(due to rows.hasNext() will fetch next page) this.remaining--; } Row row = this.rows.next(); diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java index 5b068109cc..184505d84f 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraTable.java @@ -204,18 +204,39 @@ protected List selects) { - if (query.noLimit()) { + if (query.noLimit() && !query.paging()) { return; } for (Select select : selects) { - long total = query.total(); + int total = (int) query.total(); + if (!query.noLimit()) { + E.checkArgument(total == query.total(), + "Invalid query limit %s", query.limit()); + } else { + assert total == -1 : total; + } + String page = query.page(); if (page == null) { // Set limit - select.limit((int) total); + assert total > 0 : total; + select.limit(total); } else { - select.setFetchSize((int) total); - // It's the first time if page is empty + /* + * NOTE: the `total` may be -1 when query.noLimit(), + * setFetchSize(-1) means the default fetch size will be used. + */ + assert total > 0 || total == -1 : total; + select.setFetchSize(total); + + /* + * Can't set limit here `select.limit(total)` + * due to it will cause can't get the next page-state. + * Also can't set `select.limit(total + 1)` due to it will + * cause error "Paging state mismatch" when setPagingState(). + */ + + // It's the first time if page is empty, skip setPagingState if (!page.isEmpty()) { byte[] position = PageState.fromString(page).position(); try { From ee91a8b751bc129acac3de55297df38093cfa6ff Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Sat, 17 Apr 2021 12:49:08 +0800 Subject: [PATCH 2/3] return page offset if not return local fetched data to users Change-Id: Ife31d04509c1815793b07badc101bde27ba1299c --- .../cassandra/CassandraEntryIterator.java | 102 ++++++++++++++---- .../hugegraph/backend/page/PageState.java | 1 + .../serializer/BinaryEntryIterator.java | 35 +++--- .../backend/store/BackendEntryIterator.java | 33 ++++-- .../backend/store/rocksdb/RocksDBTable.java | 2 +- .../baidu/hugegraph/core/EdgeCoreTest.java | 87 +++++++++++++-- .../baidu/hugegraph/core/VertexCoreTest.java | 53 ++++++++- 7 files changed, 256 insertions(+), 57 deletions(-) diff --git a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraEntryIterator.java b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraEntryIterator.java index a00dd92990..4c466f3826 100644 --- a/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraEntryIterator.java +++ b/hugegraph-cassandra/src/main/java/com/baidu/hugegraph/backend/store/cassandra/CassandraEntryIterator.java @@ -20,6 +20,7 @@ package com.baidu.hugegraph.backend.store.cassandra; import java.util.Iterator; +import java.util.List; import java.util.function.BiFunction; import com.baidu.hugegraph.backend.page.PageState; @@ -27,6 +28,7 @@ import com.baidu.hugegraph.backend.store.BackendEntry; import com.baidu.hugegraph.backend.store.BackendEntryIterator; import com.baidu.hugegraph.util.E; +import com.datastax.driver.core.ExecutionInfo; import com.datastax.driver.core.PagingState; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; @@ -37,7 +39,8 @@ public class CassandraEntryIterator extends BackendEntryIterator { private final Iterator rows; private final BiFunction merger; - private long remaining; + private int fetchdPageSize; + private long expected; private BackendEntry next; public CassandraEntryIterator(ResultSet results, Query query, @@ -45,25 +48,41 @@ public CassandraEntryIterator(ResultSet results, Query query, super(query); this.results = results; this.rows = results.iterator(); - this.remaining = results.getAvailableWithoutFetching(); this.merger = merger; - this.next = null; - this.skipOffset(); + this.fetchdPageSize = results.getAvailableWithoutFetching(); + this.next = null; if (query.paging()) { assert query.offset() == 0L; assert query.limit() >= 0L || query.noLimit() : query.limit(); + // Skip page offset + this.expected = PageState.fromString(query.page()).offset(); + this.skipPageOffset(query.page()); // Check the number of available rows + E.checkState(this.fetchdPageSize <= query.limit(), + "Unexpected fetched page size: %s", + this.fetchdPageSize); if (results.isFullyFetched()) { - // All results fetched (maybe not enough for the entire page) - E.checkState(this.remaining <= query.limit(), - "Unexpected fetched page size: %s", - this.remaining); + /* + * All results fetched + * NOTE: it may be enough or not enough for the entire page + */ + this.expected = this.fetchdPageSize; } else { - // Not fetched the entire page (ScyllaDB may go here #1340) - this.remaining = query.limit(); + /* + * Not fully fetched, that's fetchdPageSize == query.limit(), + * + * NOTE: but there may be fetchdPageSize < query.limit(), means + * not fetched the entire page (ScyllaDB may go here #1340), + * try to fetch next page later until got the expected count. + * Can simulate by: `select.setFetchSize(total - 1)` + */ + this.expected = query.total(); } + } else { + this.expected = query.total(); + this.skipOffset(); } } @@ -80,12 +99,18 @@ protected final boolean fetch() { this.next = null; } - while (this.remaining > 0 && this.rows.hasNext()) { + while (this.expected > 0L && this.rows.hasNext()) { + // Limit expected count, due to rows.hasNext() will fetch next page + this.expected--; + Row row = this.rows.next(); if (this.query.paging()) { - // Limit page size(due to rows.hasNext() will fetch next page) - this.remaining--; + // Update fetchdPageSize if auto fetch the next page + if (this.expected > 0L && this.availableLocal() == 0) { + if (this.rows.hasNext()) { + this.fetchdPageSize = this.availableLocal(); + } + } } - Row row = this.rows.next(); BackendEntry merged = this.merger.apply(this.current, row); if (this.current == null) { // The first time to read @@ -122,11 +147,50 @@ protected final long skip(BackendEntry entry, long skip) { @Override protected PageState pageState() { - PagingState page = this.results.getExecutionInfo().getPagingState(); - if (page == null || this.results.isExhausted()) { - return new PageState(PageState.EMPTY_BYTES, 0, (int) this.count()); + byte[] position; + int offset = 0; + int count = (int) this.count(); + assert this.fetched() == count; + int extra = this.availableLocal(); + List infos = this.results.getAllExecutionInfo(); + if (extra > 0 && infos.size() >= 2) { + /* + * Go back to the previous page if there are still available + * results fetched to local memory but not consumed, and set page + * offset with consumed amount of results. + * + * Safely, we should get the remaining size of the current page by: + * `Whitebox.getInternalState(results, "currentPage").size()` + * instead of + * `results.getAvailableWithoutFetching()` + */ + ExecutionInfo previous = infos.get(infos.size() - 2); + PagingState page = previous.getPagingState(); + position = page.toBytes(); + offset = this.fetchdPageSize - extra; + } else { + PagingState page = this.results.getExecutionInfo().getPagingState(); + if (page == null || this.expected > 0L) { + // Call isExhausted() will lead to try to fetch the next page + E.checkState(this.results.isExhausted(), + "Unexpected paging state with expected=%s, " + + "ensure consume all the fetched results before " + + "calling pageState()", this.expected); + position = PageState.EMPTY_BYTES; + } else { + /* + * Exist page position which used to fetch the next page. + * Maybe it happens to the last page (that's the position is + * at the end of results and next page is empty) + */ + position = page.toBytes(); + } } - byte[] position = page.toBytes(); - return new PageState(position, 0, (int) this.count()); + + return new PageState(position, offset, count); + } + + private int availableLocal() { + return this.results.getAvailableWithoutFetching(); } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageState.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageState.java index 7f6e3d595b..19311e2b3b 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageState.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageState.java @@ -72,6 +72,7 @@ private byte[] toBytes() { } public static PageState fromString(String page) { + E.checkNotNull("page", "page"); return fromBytes(toBytes(page)); } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinaryEntryIterator.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinaryEntryIterator.java index 57daccf56c..c9817a36a5 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinaryEntryIterator.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/serializer/BinaryEntryIterator.java @@ -46,10 +46,12 @@ public BinaryEntryIterator(BackendIterator results, Query query, this.merger = m; this.next = null; - this.skipOffset(); - if (query.paging()) { + assert query.offset() == 0L; + assert PageState.fromString(query.page()).offset() == 0; this.skipPageOffset(query.page()); + } else { + this.skipOffset(); } } @@ -98,22 +100,9 @@ protected final boolean fetch() { return this.current != null; } - public final static long sizeOfBackendEntry(BackendEntry entry) { - /* - * 3 cases: - * 1) one vertex per entry - * 2) one edge per column (one entry <==> a vertex), - * 3) one element id per column (one entry <==> an index) - */ - if (entry.type().isEdge() || entry.type().isIndex()) { - return entry.columnsSize(); - } - return 1L; - } - @Override protected final long sizeOf(BackendEntry entry) { - return sizeOfBackendEntry(entry); + return sizeOfEntry(entry); } @Override @@ -140,10 +129,16 @@ private void removeLastRecord() { ((BinaryBackendEntry) this.current).removeColumn(lastOne); } - private void skipPageOffset(String page) { - PageState pagestate = PageState.fromString(page); - if (pagestate.offset() > 0 && this.fetch()) { - this.skip(this.current, pagestate.offset()); + public final static long sizeOfEntry(BackendEntry entry) { + /* + * 3 cases: + * 1) one vertex per entry + * 2) one edge per column (one entry <==> a vertex), + * 3) one element id per column (one entry <==> an index) + */ + if (entry.type().isEdge() || entry.type().isIndex()) { + return entry.columnsSize(); } + return 1L; } } diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntryIterator.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntryIterator.java index aa08913041..32803ed374 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntryIterator.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/BackendEntryIterator.java @@ -128,28 +128,47 @@ protected final long fetched() { return this.count + ccount; } + protected final void skipPageOffset(String page) { + PageState pageState = PageState.fromString(page); + int pageOffset = pageState.offset(); + if (pageOffset > 0) { + /* + * Don't update this.count even if skipped page offset, + * because the skipped records belongs to the last page. + */ + this.skipOffset(pageOffset); + } + } + protected void skipOffset() { long offset = this.query.offset() - this.query.actualOffset(); if (offset <= 0L) { return; } + long skipped = this.skipOffset(offset); + this.count += skipped; + this.query.goOffset(skipped); + } + protected long skipOffset(long offset) { + assert offset >= 0L; + long skipped = 0L; // Skip offset - while (this.count < offset && this.fetch()) { + while (skipped < offset && this.fetch()) { assert this.current != null; final long size = this.sizeOf(this.current); - this.count += size; - if (this.count > offset) { + skipped += size; + if (skipped > offset) { // Skip part of sub-items in an entry - final long skip = size - (this.count - offset); - this.count -= this.skip(this.current, skip); - assert this.count == offset; + final long skip = size - (skipped - offset); + skipped -= this.skip(this.current, skip); + assert skipped == offset; } else { // Skip entry this.current = null; } } - this.query.goOffset(this.count); + return skipped; } protected long sizeOf(BackendEntry entry) { diff --git a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java index 45ae08a9f0..55e5f5c9b6 100644 --- a/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java +++ b/hugegraph-rocksdb/src/main/java/com/baidu/hugegraph/backend/store/rocksdb/RocksDBTable.java @@ -271,7 +271,7 @@ protected static final BackendEntryIterator newEntryIterator( } protected static final long sizeOfBackendEntry(BackendEntry entry) { - return BinaryEntryIterator.sizeOfBackendEntry(entry); + return BinaryEntryIterator.sizeOfEntry(entry); } private static class RocksDBShardSpliter extends ShardSpliter { diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java index 7c070da341..7a74f83d54 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/EdgeCoreTest.java @@ -5112,9 +5112,15 @@ public void testQueryBothEdgesOfVertexInPaging() { while (page != null) { GraphTraversal iterator = graph.traversal().V(james).bothE() .has("~page", page).limit(1); - Assert.assertEquals(1, IteratorUtils.count(iterator)); + long size = IteratorUtils.count(iterator); + if (size == 0L) { + // The last page is empty + Assert.assertEquals(6, count); + } else { + Assert.assertEquals(1, size); + } page = TraversalUtil.page(iterator); - count++; + count += size; } Assert.assertEquals(6, count); } @@ -5132,9 +5138,15 @@ public void testQueryOutEdgesOfVertexInPaging() { while (page != null) { GraphTraversal iterator = graph.traversal().V(james).outE() .has("~page", page).limit(1); - Assert.assertEquals(1, IteratorUtils.count(iterator)); + long size = IteratorUtils.count(iterator); + if (size == 0L) { + // The last page is empty + Assert.assertEquals(4, count); + } else { + Assert.assertEquals(1, size); + } page = TraversalUtil.page(iterator); - count++; + count += size; } Assert.assertEquals(4, count); } @@ -5152,9 +5164,15 @@ public void testQueryInEdgesOfVertexInPaging() { while (page != null) { GraphTraversal iterator = graph.traversal().V(james).inE() .has("~page", page).limit(1); - Assert.assertEquals(1, IteratorUtils.count(iterator)); + long size = IteratorUtils.count(iterator); + if (size == 0L) { + // The last page is empty + Assert.assertEquals(2, count); + } else { + Assert.assertEquals(1, size); + } page = TraversalUtil.page(iterator); - count++; + count += size; } Assert.assertEquals(2, count); } @@ -6869,7 +6887,48 @@ public void testQueryEdgeByPage() { } @Test - public void testQueryEdgeByPageResultsMatched() { + public void testQueryEdgeByPageResultsMatchedAll() { + Assume.assumeTrue("Not support paging", + storeFeatures().supportsQueryByPage()); + + HugeGraph graph = graph(); + init100LookEdges(); + + List all = graph.traversal().E().toList(); + + GraphTraversal iter; + + String page = PageInfo.PAGE_NONE; + int size = 21; + + Set pageAll = new HashSet<>(); + for (int i = 0; i < 100 / size; i++) { + iter = graph.traversal().E() + .has("~page", page).limit(size); + @SuppressWarnings("unchecked") + List edges = IteratorUtils.asList(iter); + Assert.assertEquals(size, edges.size()); + + pageAll.addAll(edges); + + page = TraversalUtil.page(iter); + } + + iter = graph.traversal().E() + .has("~page", page).limit(size); + @SuppressWarnings("unchecked") + List edges = IteratorUtils.asList(iter); + Assert.assertEquals(16, edges.size()); + pageAll.addAll(edges); + page = TraversalUtil.page(iter); + + Assert.assertEquals(100, pageAll.size()); + Assert.assertTrue(all.containsAll(pageAll)); + Assert.assertNull(page); + } + + @Test + public void testQueryEdgeByPageResultsMatchedAllWithFullPage() { Assume.assumeTrue("Not support paging", storeFeatures().supportsQueryByPage()); @@ -6897,6 +6956,15 @@ public void testQueryEdgeByPageResultsMatched() { } Assert.assertEquals(100, pageAll.size()); Assert.assertTrue(all.containsAll(pageAll)); + + if (page != null) { + iter = graph.traversal().E().has("~page", page); + long count = IteratorUtils.count(iter); + Assert.assertEquals(0L, count); + + page = TraversalUtil.page(iter); + CloseableIterator.closeIterator(iter); + } Assert.assertNull(page); } @@ -7103,9 +7171,10 @@ private int traverseInPage(Function> fetcher) { int count = 0; while (page != null) { GraphTraversal iterator = fetcher.apply(page); - Assert.assertEquals(1, IteratorUtils.count(iterator)); + long size = IteratorUtils.count(iterator); + Assert.assertLte(1L, size); page = TraversalUtil.page(iterator); - count++; + count += size; } return count; } diff --git a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexCoreTest.java b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexCoreTest.java index 196589f361..df73ef0c82 100644 --- a/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexCoreTest.java +++ b/hugegraph-test/src/main/java/com/baidu/hugegraph/core/VertexCoreTest.java @@ -6561,7 +6561,49 @@ public void testQueryByPage() { } @Test - public void testQueryByPageResultsMatched() { + public void testQueryByPageResultsMatchedAll() { + Assume.assumeTrue("Not support paging", + storeFeatures().supportsQueryByPage()); + + HugeGraph graph = graph(); + init100Books(); + + List all = graph.traversal().V().toList(); + + GraphTraversal iter; + + String page = PageInfo.PAGE_NONE; + int size = 22; + + Set pageAll = new HashSet<>(); + for (int i = 0; i < 100 / size; i++) { + iter = graph.traversal().V() + .has("~page", page).limit(size); + @SuppressWarnings("unchecked") + List vertices = IteratorUtils.asList(iter); + Assert.assertEquals(size, vertices.size()); + + pageAll.addAll(vertices); + + page = TraversalUtil.page(iter); + CloseableIterator.closeIterator(iter); + } + + iter = graph.traversal().V() + .has("~page", page).limit(size); + @SuppressWarnings("unchecked") + List vertices = IteratorUtils.asList(iter); + Assert.assertEquals(12, vertices.size()); + pageAll.addAll(vertices); + page = TraversalUtil.page(iter); + + Assert.assertEquals(100, pageAll.size()); + Assert.assertTrue(all.containsAll(pageAll)); + Assert.assertNull(page); + } + + @Test + public void testQueryByPageResultsMatchedAllWithFullPage() { Assume.assumeTrue("Not support paging", storeFeatures().supportsQueryByPage()); @@ -6590,6 +6632,15 @@ public void testQueryByPageResultsMatched() { } Assert.assertEquals(100, pageAll.size()); Assert.assertTrue(all.containsAll(pageAll)); + + if (page != null) { + iter = graph.traversal().V().has("~page", page); + long count = IteratorUtils.count(iter); + Assert.assertEquals(0L, count); + + page = TraversalUtil.page(iter); + CloseableIterator.closeIterator(iter); + } Assert.assertNull(page); } From 92c91dcfaedd96e2bb9b51e716b487fcf45615d7 Mon Sep 17 00:00:00 2001 From: Zhangmei Li Date: Sun, 25 Apr 2021 21:29:51 +0800 Subject: [PATCH 3/3] fix page=null check Change-Id: I8abe05cbdc95c28bc384e73d554a30300042f3bb --- .../main/java/com/baidu/hugegraph/backend/page/PageState.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageState.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageState.java index 19311e2b3b..cfd75f6bc4 100644 --- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageState.java +++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/page/PageState.java @@ -72,7 +72,7 @@ private byte[] toBytes() { } public static PageState fromString(String page) { - E.checkNotNull("page", "page"); + E.checkNotNull(page, "page"); return fromBytes(toBytes(page)); }