From eb7d5a2f12d081438eecded23774290db548f6dc Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Fri, 27 Jun 2025 17:26:24 +0200 Subject: [PATCH 01/13] feat: stream() and list() objects in collection CursorSpliterator powers 2 patterns for iterating over objects: - list() returns an Iterable that can be used in a for-loop - stream() presents the internal Iterator via a familiar Stream API --- .../weaviate/integration/PaginationITest.java | 63 +++++++++++++++++++ .../v1/api/collections/CollectionHandle.java | 20 ++++++ .../v1/api/collections/CursorSpliterator.java | 62 ++++++++++++++++++ 3 files changed, 145 insertions(+) create mode 100644 src/it/java/io/weaviate/integration/PaginationITest.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/collections/CursorSpliterator.java diff --git a/src/it/java/io/weaviate/integration/PaginationITest.java b/src/it/java/io/weaviate/integration/PaginationITest.java new file mode 100644 index 000000000..f57955aec --- /dev/null +++ b/src/it/java/io/weaviate/integration/PaginationITest.java @@ -0,0 +1,63 @@ +package io.weaviate.integration; + +import static org.junit.Assume.assumeTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import io.weaviate.ConcurrentTest; +import io.weaviate.client6.v1.api.WeaviateClient; +import io.weaviate.client6.v1.api.collections.WeaviateMetadata; +import io.weaviate.client6.v1.api.collections.WeaviateObject; +import io.weaviate.containers.Container; + +public class PaginationITest extends ConcurrentTest { + private static WeaviateClient client = Container.WEAVIATE.getClient(); + + @Test + public void test_stream() throws IOException { + // Arrange + var nsThings = ns("Things"); + var count = 10; + + client.collections.create(nsThings); + var things = client.collections.use(nsThings); + + var inserted = new ArrayList(); + for (var i = 0; i < count; i++) { + var object = 
things.data.insert(Collections.emptyMap()); + inserted.add(object.metadata().uuid()); + } + assumeTrue("all objects were inserted", inserted.size() == count); + + // Act: stream + var gotStream = things.stream() + .map(WeaviateObject::metadata).map(WeaviateMetadata::uuid).toList(); + + // Assert + Assertions.assertThat(gotStream) + .as("stream fetched all objects") + .hasSize(inserted.size()) + .containsAll(inserted); + + // Act: list + var gotList = new ArrayList(); + for (var object : things.list()) { + gotList.add(object.metadata().uuid()); + } + + // Assert + Assertions.assertThat(gotList) + .as("list fetched all objects") + .hasSize(inserted.size()) + .containsAll(inserted); + + Assertions.assertThat(gotStream) + .as("stream and list return consistent order") + .containsExactlyElementsOf(gotList); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandle.java b/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandle.java index 2dd4529bd..7cc01e714 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandle.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandle.java @@ -1,8 +1,14 @@ package io.weaviate.client6.v1.api.collections; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + import io.weaviate.client6.v1.api.collections.aggregate.WeaviateAggregateClient; import io.weaviate.client6.v1.api.collections.config.WeaviateConfigClient; import io.weaviate.client6.v1.api.collections.data.WeaviateDataClient; +import io.weaviate.client6.v1.api.collections.query.QueryMetadata; import io.weaviate.client6.v1.api.collections.query.WeaviateQueryClient; import io.weaviate.client6.v1.internal.grpc.GrpcTransport; import io.weaviate.client6.v1.internal.orm.CollectionDescriptor; @@ -24,4 +30,18 @@ public CollectionHandle( this.data = new WeaviateDataClient<>(collectionDescriptor, restTransport, 
this.query); this.aggregate = new WeaviateAggregateClient(collectionDescriptor, grpcTransport); } + + public Stream> stream() { + return StreamSupport.stream(spliterator(2), false); + } + + public Iterable> list() { + return () -> Spliterators.iterator(spliterator(2)); + } + + private Spliterator> spliterator(int batchSize) { + return new CursorSpliterator<>(batchSize, + (after, limit) -> this.query.fetchObjects( + query -> query.after(after).limit(limit)).objects()); + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/CursorSpliterator.java b/src/main/java/io/weaviate/client6/v1/api/collections/CursorSpliterator.java new file mode 100644 index 000000000..d220ee9e6 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/collections/CursorSpliterator.java @@ -0,0 +1,62 @@ +package io.weaviate.client6.v1.api.collections; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterator; +import java.util.function.BiFunction; +import java.util.function.Consumer; + +import io.weaviate.client6.v1.api.collections.query.QueryMetadata; + +class CursorSpliterator implements Spliterator> { + private final int batchSize; + private final BiFunction>> fetch; + + // Spliterators do not promise thread-safety, so there's no mechanism + // to protect access to its internal state. + private String cursor; + private Iterator> currentPage = Collections.emptyIterator(); + + public CursorSpliterator(int batchSize, + BiFunction>> fetch) { + this.batchSize = batchSize; + this.fetch = fetch; + } + + @Override + public boolean tryAdvance(Consumer> action) { + // Happy path: there are remaining objects in the current page. + if (currentPage.hasNext()) { + action.accept(currentPage.next()); + return true; + } + + // It's OK for the cursor to be null, because it's String (object). 
+ var nextPage = fetch.apply(cursor, batchSize); + if (nextPage.isEmpty()) { + return false; + } + cursor = nextPage.get(nextPage.size() - 1).metadata().uuid(); + currentPage = nextPage.iterator(); + return tryAdvance(action); + } + + @Override + public Spliterator> trySplit() { + // Do not support splitting just now; + return null; + } + + @Override + public long estimateSize() { + // CursorSpliterator does not have SIZED characteristic, so this is our + // best-effort estimate. The number of objects in the db is unbounded. + return Long.MAX_VALUE; + } + + @Override + public int characteristics() { + return ORDERED | NONNULL; + } +} From b2badd1d488f53208d7e5d2f5128a8a70d6f6655 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 30 Jun 2025 11:10:18 +0200 Subject: [PATCH 02/13] refactor: move .stream() and .iterator() behind paginate namespace This lets us keep all configurations 'on the left' of the operator and all operations on the right --- .../weaviate/integration/PaginationITest.java | 47 ++++++++--- .../v1/api/collections/CollectionHandle.java | 22 ++---- .../{ => pagination}/CursorSpliterator.java | 14 ++-- .../api/collections/pagination/Paginator.java | 77 +++++++++++++++++++ 4 files changed, 130 insertions(+), 30 deletions(-) rename src/main/java/io/weaviate/client6/v1/api/collections/{ => pagination}/CursorSpliterator.java (80%) create mode 100644 src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java diff --git a/src/it/java/io/weaviate/integration/PaginationITest.java b/src/it/java/io/weaviate/integration/PaginationITest.java index f57955aec..4b920e749 100644 --- a/src/it/java/io/weaviate/integration/PaginationITest.java +++ b/src/it/java/io/weaviate/integration/PaginationITest.java @@ -19,10 +19,10 @@ public class PaginationITest extends ConcurrentTest { private static WeaviateClient client = Container.WEAVIATE.getClient(); @Test - public void test_stream() throws IOException { + public void testIterateAll() throws 
IOException { // Arrange var nsThings = ns("Things"); - var count = 10; + var count = 150; client.collections.create(nsThings); var things = client.collections.use(nsThings); @@ -34,8 +34,10 @@ public void test_stream() throws IOException { } assumeTrue("all objects were inserted", inserted.size() == count); + var allThings = things.paginate(); + // Act: stream - var gotStream = things.stream() + var gotStream = allThings.stream() .map(WeaviateObject::metadata).map(WeaviateMetadata::uuid).toList(); // Assert @@ -44,20 +46,47 @@ public void test_stream() throws IOException { .hasSize(inserted.size()) .containsAll(inserted); - // Act: list - var gotList = new ArrayList(); - for (var object : things.list()) { - gotList.add(object.metadata().uuid()); + // Act: for-loop + var gotLoop = new ArrayList(); + for (var thing : allThings) { + gotLoop.add(thing.metadata().uuid()); } // Assert - Assertions.assertThat(gotList) + Assertions.assertThat(gotLoop) .as("list fetched all objects") .hasSize(inserted.size()) .containsAll(inserted); Assertions.assertThat(gotStream) .as("stream and list return consistent order") - .containsExactlyElementsOf(gotList); + .containsExactlyElementsOf(gotLoop); + } + + @Test + public void testResumePagination() throws IOException { + // Arrange + var nsThings = ns("Things"); + var count = 10; + + client.collections.create(nsThings); + + var things = client.collections.use(nsThings); + var inserted = new ArrayList(); + for (var i = 0; i < count; i++) { + var object = things.data.insert(Collections.emptyMap()); + inserted.add(object.metadata().uuid()); + } + + // Iterate over first 5 objects + String lastId = things.paginate(p -> p.pageSize(5)).stream() + .limit(5).map(thing -> thing.metadata().uuid()) + .reduce((prev, next) -> next).get(); + + // Act + var remaining = things.paginate(p -> p.resumeFrom(lastId)).stream().count(); + + // Assert + Assertions.assertThat(remaining).isEqualTo(5); } } diff --git 
a/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandle.java b/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandle.java index 7cc01e714..802c324f1 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandle.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandle.java @@ -1,15 +1,13 @@ package io.weaviate.client6.v1.api.collections; -import java.util.Spliterator; -import java.util.Spliterators; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; +import java.util.function.Function; import io.weaviate.client6.v1.api.collections.aggregate.WeaviateAggregateClient; import io.weaviate.client6.v1.api.collections.config.WeaviateConfigClient; import io.weaviate.client6.v1.api.collections.data.WeaviateDataClient; -import io.weaviate.client6.v1.api.collections.query.QueryMetadata; +import io.weaviate.client6.v1.api.collections.pagination.Paginator; import io.weaviate.client6.v1.api.collections.query.WeaviateQueryClient; +import io.weaviate.client6.v1.internal.ObjectBuilder; import io.weaviate.client6.v1.internal.grpc.GrpcTransport; import io.weaviate.client6.v1.internal.orm.CollectionDescriptor; import io.weaviate.client6.v1.internal.rest.RestTransport; @@ -31,17 +29,11 @@ public CollectionHandle( this.aggregate = new WeaviateAggregateClient(collectionDescriptor, grpcTransport); } - public Stream> stream() { - return StreamSupport.stream(spliterator(2), false); + public Paginator paginate() { + return Paginator.of(this.query); } - public Iterable> list() { - return () -> Spliterators.iterator(spliterator(2)); - } - - private Spliterator> spliterator(int batchSize) { - return new CursorSpliterator<>(batchSize, - (after, limit) -> this.query.fetchObjects( - query -> query.after(after).limit(limit)).objects()); + public Paginator paginate(Function, ObjectBuilder>> fn) { + return Paginator.of(this.query, fn); } } diff --git 
a/src/main/java/io/weaviate/client6/v1/api/collections/CursorSpliterator.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/CursorSpliterator.java similarity index 80% rename from src/main/java/io/weaviate/client6/v1/api/collections/CursorSpliterator.java rename to src/main/java/io/weaviate/client6/v1/api/collections/pagination/CursorSpliterator.java index d220ee9e6..442c62726 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/CursorSpliterator.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/CursorSpliterator.java @@ -1,4 +1,4 @@ -package io.weaviate.client6.v1.api.collections; +package io.weaviate.client6.v1.api.collections.pagination; import java.util.Collections; import java.util.Iterator; @@ -7,10 +7,11 @@ import java.util.function.BiFunction; import java.util.function.Consumer; +import io.weaviate.client6.v1.api.collections.WeaviateObject; import io.weaviate.client6.v1.api.collections.query.QueryMetadata; -class CursorSpliterator implements Spliterator> { - private final int batchSize; +public class CursorSpliterator implements Spliterator> { + private final int pageSize; private final BiFunction>> fetch; // Spliterators do not promise thread-safety, so there's no mechanism @@ -18,9 +19,10 @@ class CursorSpliterator implements Spliterator> currentPage = Collections.emptyIterator(); - public CursorSpliterator(int batchSize, + public CursorSpliterator(String cursor, int pageSize, BiFunction>> fetch) { - this.batchSize = batchSize; + this.cursor = cursor; + this.pageSize = pageSize; this.fetch = fetch; } @@ -33,7 +35,7 @@ public boolean tryAdvance(Consumer implements Iterable> { + private static final int DEFAULT_PAGE_SIZE = 100; + + private final WeaviateQueryClient query; + private final int pageSize; + private final String cursor; + + @Override + public Iterator> iterator() { + return Spliterators.iterator(spliterator()); + } + + public Stream> stream() { + return StreamSupport.stream(spliterator(), 
false); + } + + public Spliterator> spliterator() { + return new CursorSpliterator(cursor, pageSize, + (after, limit) -> query.fetchObjects(q -> q.after(after).limit(limit)).objects()); + } + + public static Paginator of(WeaviateQueryClient query) { + return of(query, ObjectBuilder.identity()); + } + + public static Paginator of(WeaviateQueryClient query, + Function, ObjectBuilder>> fn) { + return fn.apply(new Builder<>(query)).build(); + } + + Paginator(Builder builder) { + this.query = builder.query; + this.cursor = builder.cursor; + this.pageSize = builder.pageSize; + } + + public static class Builder implements ObjectBuilder> { + private final WeaviateQueryClient query; + + int pageSize = DEFAULT_PAGE_SIZE; + String cursor; + + public Builder(WeaviateQueryClient query) { + this.query = query; + } + + public Builder pageSize(int pageSize) { + this.pageSize = pageSize; + return this; + } + + public Builder resumeFrom(String uuid) { + this.cursor = uuid; + return this; + } + + @Override + public Paginator build() { + return new Paginator<>(this); + } + } + +} From f7197e3fa3a4629599e4dd1aae5ff7594c046f4b Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 30 Jun 2025 12:38:54 +0200 Subject: [PATCH 03/13] feat: set query options in Paginator ObjectBuilder.partial is a new util for composing ObjectBuilder-style functions. 
--- .../weaviate/integration/PaginationITest.java | 28 +++++++++++ .../api/collections/pagination/Paginator.java | 49 +++++++++++++++++-- .../collections/query/BaseQueryOptions.java | 18 +++++-- .../client6/v1/internal/ObjectBuilder.java | 5 ++ 4 files changed, 94 insertions(+), 6 deletions(-) diff --git a/src/it/java/io/weaviate/integration/PaginationITest.java b/src/it/java/io/weaviate/integration/PaginationITest.java index 4b920e749..359159f16 100644 --- a/src/it/java/io/weaviate/integration/PaginationITest.java +++ b/src/it/java/io/weaviate/integration/PaginationITest.java @@ -11,6 +11,7 @@ import io.weaviate.ConcurrentTest; import io.weaviate.client6.v1.api.WeaviateClient; +import io.weaviate.client6.v1.api.collections.Property; import io.weaviate.client6.v1.api.collections.WeaviateMetadata; import io.weaviate.client6.v1.api.collections.WeaviateObject; import io.weaviate.containers.Container; @@ -89,4 +90,31 @@ public void testResumePagination() throws IOException { // Assert Assertions.assertThat(remaining).isEqualTo(5); } + + @Test + public void testWithQueryOptions() throws IOException { + // Arrange + var nsThings = ns("Things"); + var count = 10; + + client.collections.create(nsThings, + c -> c.properties( + Property.text("fetch_me"), + Property.integer("dont_fetch"))); + + var things = client.collections.use(nsThings); + var inserted = new ArrayList(); + for (var i = 0; i < count; i++) { + var object = things.data.insert(Collections.emptyMap()); + inserted.add(object.metadata().uuid()); + } + + // Act / Assert + var withSomeProperties = things.paginate(p -> p.returnProperties("fetch_me")); + for (var thing : withSomeProperties) { + Assertions.assertThat(thing.properties()) + .as("uuid=" + thing.metadata().uuid()) + .doesNotContainKey("dont_fetch"); + } + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java index f39118b42..77ed1ec12 
100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java @@ -1,6 +1,7 @@ package io.weaviate.client6.v1.api.collections.pagination; import java.util.Iterator; +import java.util.List; import java.util.Spliterator; import java.util.Spliterators; import java.util.function.Function; @@ -8,7 +9,10 @@ import java.util.stream.StreamSupport; import io.weaviate.client6.v1.api.collections.WeaviateObject; +import io.weaviate.client6.v1.api.collections.query.FetchObjects; +import io.weaviate.client6.v1.api.collections.query.Metadata; import io.weaviate.client6.v1.api.collections.query.QueryMetadata; +import io.weaviate.client6.v1.api.collections.query.QueryReference; import io.weaviate.client6.v1.api.collections.query.WeaviateQueryClient; import io.weaviate.client6.v1.internal.ObjectBuilder; @@ -16,6 +20,7 @@ public class Paginator implements Iterable query; + private final Function> queryOptions; private final int pageSize; private final String cursor; @@ -30,7 +35,10 @@ public Stream> stream() { public Spliterator> spliterator() { return new CursorSpliterator(cursor, pageSize, - (after, limit) -> query.fetchObjects(q -> q.after(after).limit(limit)).objects()); + (after, limit) -> { + var fn = ObjectBuilder.partial(queryOptions, q -> q.after(after).limit(limit)); + return query.fetchObjects(fn).objects(); + }); } public static Paginator of(WeaviateQueryClient query) { @@ -44,6 +52,7 @@ public static Paginator of(WeaviateQueryClient query, Paginator(Builder builder) { this.query = builder.query; + this.queryOptions = builder.queryOptions; this.cursor = builder.cursor; this.pageSize = builder.pageSize; } @@ -51,13 +60,16 @@ public static Paginator of(WeaviateQueryClient query, public static class Builder implements ObjectBuilder> { private final WeaviateQueryClient query; - int pageSize = DEFAULT_PAGE_SIZE; - String cursor; + private Function> queryOptions = 
ObjectBuilder.identity(); + private int pageSize = DEFAULT_PAGE_SIZE; + private String cursor; public Builder(WeaviateQueryClient query) { this.query = query; } + // Pagination options ----------------------------------------------------- + public Builder pageSize(int pageSize) { this.pageSize = pageSize; return this; @@ -68,6 +80,37 @@ public Builder resumeFrom(String uuid) { return this; } + // Query options ---------------------------------------------------------- + + public final Builder returnProperties(String... properties) { + return applyQueryOption(q -> q.returnProperties(properties)); + } + + public final Builder returnProperties(List properties) { + return applyQueryOption(q -> q.returnProperties(properties)); + } + + public final Builder returnReferences(QueryReference... references) { + return applyQueryOption(q -> q.returnReferences(references)); + } + + public final Builder returnReferences(List references) { + return applyQueryOption(q -> q.returnReferences(references)); + } + + public final Builder returnMetadata(Metadata... metadata) { + return applyQueryOption(q -> q.returnMetadata(metadata)); + } + + public final Builder returnMetadata(List metadata) { + return applyQueryOption(q -> q.returnMetadata(metadata)); + } + + private final Builder applyQueryOption(Function options) { + this.queryOptions = ObjectBuilder.partial(this.queryOptions, options); + return this; + } + @Override public Paginator build() { return new Paginator<>(this); diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/query/BaseQueryOptions.java b/src/main/java/io/weaviate/client6/v1/api/collections/query/BaseQueryOptions.java index bcf4bd59d..c67bf64c1 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/query/BaseQueryOptions.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/query/BaseQueryOptions.java @@ -86,17 +86,29 @@ public final SELF where(Where where) { } public final SELF returnProperties(String... 
properties) { - this.returnProperties = Arrays.asList(properties); + return returnProperties(Arrays.asList(properties)); + } + + public final SELF returnProperties(List properties) { + this.returnProperties.addAll(properties); return (SELF) this; } public final SELF returnReferences(QueryReference... references) { - this.returnReferences = Arrays.asList(references); + return returnReferences(Arrays.asList(references)); + } + + public final SELF returnReferences(List references) { + this.returnReferences.addAll(references); return (SELF) this; } public final SELF returnMetadata(Metadata... metadata) { - this.returnMetadata = Arrays.asList(metadata); + return returnMetadata(Arrays.asList(metadata)); // wrap in a List: passing the array re-selects this varargs overload and recurses forever + } + + public final SELF returnMetadata(List metadata) { + this.returnMetadata.addAll(metadata); return (SELF) this; } diff --git a/src/main/java/io/weaviate/client6/v1/internal/ObjectBuilder.java b/src/main/java/io/weaviate/client6/v1/internal/ObjectBuilder.java index 9d6ffe567..550b4266d 100644 --- a/src/main/java/io/weaviate/client6/v1/internal/ObjectBuilder.java +++ b/src/main/java/io/weaviate/client6/v1/internal/ObjectBuilder.java @@ -8,4 +8,9 @@ public interface ObjectBuilder { static , T> Function> identity() { return builder -> builder; } + + static , T> Function> partial(Function> fn, + Function partialFn) { + return partialFn.andThen(fn); + } } From b7d58f46ddad0a7b1d14b97744e5bf9230a5d55d Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 30 Jun 2025 12:44:09 +0200 Subject: [PATCH 04/13] fix: remove unsupported Where overloads E.g.: Boolean properties cannot be compared using gt(-e) and lt(-e) operators. Also LIKE operator only makes sense to text properties and should accept a single pattern. 
--- .../v1/api/collections/query/Where.java | 72 ------------------- 1 file changed, 72 deletions(-) diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/query/Where.java b/src/main/java/io/weaviate/client6/v1/api/collections/query/Where.java index 0542db68e..09c460acb 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/query/Where.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/query/Where.java @@ -217,14 +217,6 @@ public Where lt(String... values) { return new Where(Operator.LESS_THAN, left, new TextArrayOperand(values)); } - public Where lt(Boolean value) { - return new Where(Operator.LESS_THAN, left, new BooleanOperand(value)); - } - - public Where lt(Boolean... values) { - return new Where(Operator.LESS_THAN, left, new BooleanArrayOperand(values)); - } - public Where lt(Integer value) { return new Where(Operator.LESS_THAN, left, new IntegerOperand(value)); } @@ -263,14 +255,6 @@ public Where lte(String... values) { return new Where(Operator.LESS_THAN_EQUAL, left, new TextArrayOperand(values)); } - public Where lte(Boolean value) { - return new Where(Operator.LESS_THAN_EQUAL, left, new BooleanOperand(value)); - } - - public Where lte(Boolean... values) { - return new Where(Operator.LESS_THAN_EQUAL, left, new BooleanArrayOperand(values)); - } - public Where lte(Integer value) { return new Where(Operator.LESS_THAN_EQUAL, left, new IntegerOperand(value)); } @@ -309,14 +293,6 @@ public Where gt(String... values) { return new Where(Operator.GREATER_THAN, left, new TextArrayOperand(values)); } - public Where gt(Boolean value) { - return new Where(Operator.GREATER_THAN, left, new BooleanOperand(value)); - } - - public Where gt(Boolean... values) { - return new Where(Operator.GREATER_THAN, left, new BooleanArrayOperand(values)); - } - public Where gt(Integer value) { return new Where(Operator.GREATER_THAN, left, new IntegerOperand(value)); } @@ -355,14 +331,6 @@ public Where gte(String... 
values) { return new Where(Operator.GREATER_THAN_EQUAL, left, new TextArrayOperand(values)); } - public Where gte(Boolean value) { - return new Where(Operator.GREATER_THAN_EQUAL, left, new BooleanOperand(value)); - } - - public Where gte(Boolean... values) { - return new Where(Operator.GREATER_THAN_EQUAL, left, new BooleanArrayOperand(values)); - } - public Where gte(Integer value) { return new Where(Operator.GREATER_THAN_EQUAL, left, new IntegerOperand(value)); } @@ -397,46 +365,6 @@ public Where like(String value) { return new Where(Operator.LIKE, left, new TextOperand(value)); } - public Where like(String... values) { - return new Where(Operator.LIKE, left, new TextArrayOperand(values)); - } - - public Where like(Boolean value) { - return new Where(Operator.LIKE, left, new BooleanOperand(value)); - } - - public Where like(Boolean... values) { - return new Where(Operator.LIKE, left, new BooleanArrayOperand(values)); - } - - public Where like(Integer value) { - return new Where(Operator.LIKE, left, new IntegerOperand(value)); - } - - public Where like(Integer... values) { - return new Where(Operator.LIKE, left, new IntegerArrayOperand(values)); - } - - public Where like(Number value) { - return new Where(Operator.LIKE, left, new NumberOperand(value.doubleValue())); - } - - public Where like(Number... values) { - return new Where(Operator.LIKE, left, new NumberArrayOperand(values)); - } - - public Where like(Date value) { - return new Where(Operator.LIKE, left, new DateOperand(value)); - } - - public Where like(Date... 
values) { - return new Where(Operator.LIKE, left, new DateArrayOperand(values)); - } - - public Where like(Object value) { - return new Where(Operator.LIKE, left, fromObject(value)); - } - // ContainsAny // ------------------------------------------------------------------------ public Where containsAny(String value) { From 5a7d93e13ba8d728be5e4a6bedec508072fdb540 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 30 Jun 2025 12:57:43 +0200 Subject: [PATCH 05/13] feat: add DISTINCT characteristic to CursorSpliterator --- .../weaviate/integration/PaginationITest.java | 25 +++++++++++++++++++ .../collections/CollectionHandleAsync.java | 3 +++ .../pagination/CursorSpliterator.java | 2 +- 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/it/java/io/weaviate/integration/PaginationITest.java b/src/it/java/io/weaviate/integration/PaginationITest.java index 359159f16..676365c5a 100644 --- a/src/it/java/io/weaviate/integration/PaginationITest.java +++ b/src/it/java/io/weaviate/integration/PaginationITest.java @@ -5,6 +5,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -117,4 +119,27 @@ public void testWithQueryOptions() throws IOException { .doesNotContainKey("dont_fetch"); } } + + @Test + public void testStreamAsync() throws IOException, InterruptedException, ExecutionException { + // Arrange + var nsThings = ns("Things"); + var count = 10; + + client.collections.create(nsThings); + + try (final var async = client.async()) { + var things = async.collections.use(nsThings); + + var futures = new CompletableFuture[count]; + var inserted = new ArrayList(); + for (var i = 0; i < count; i++) { + futures[i] = things.data.insert(Collections.emptyMap()) + .thenAccept(object -> inserted.add(object.metadata().uuid())); + } + CompletableFuture.allOf(futures).get(); 
+ + var asyncPaginator = things.paginate(); + } + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandleAsync.java b/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandleAsync.java index ec41a5a21..5723f3a53 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandleAsync.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandleAsync.java @@ -24,4 +24,7 @@ public CollectionHandleAsync( this.data = new WeaviateDataClientAsync<>(collectionDescriptor, restTransport, this.query); this.aggregate = new WeaviateAggregateClientAsync(collectionDescriptor, grpcTransport); } + + public void paginate() { + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/CursorSpliterator.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/CursorSpliterator.java index 442c62726..5c3c0fdf7 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/CursorSpliterator.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/CursorSpliterator.java @@ -59,6 +59,6 @@ public long estimateSize() { @Override public int characteristics() { - return ORDERED | NONNULL; + return ORDERED | DISTINCT | NONNULL; } } From 2c8c0396a57618d2a4ced5bf75020096adcab91c Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 30 Jun 2025 17:07:10 +0200 Subject: [PATCH 06/13] feat: first iteration on AsyncPaginator Does not support Stream or Spliterator / Iterator APIs, because those are inherently synchronous. 
--- .../weaviate/integration/PaginationITest.java | 19 ++- .../collections/CollectionHandleAsync.java | 20 ++- .../pagination/AsyncPaginator.java | 140 ++++++++++++++++++ .../pagination/AsyncResultSet.java | 57 +++++++ .../api/collections/pagination/Paginator.java | 3 +- 5 files changed, 230 insertions(+), 9 deletions(-) create mode 100644 src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java diff --git a/src/it/java/io/weaviate/integration/PaginationITest.java b/src/it/java/io/weaviate/integration/PaginationITest.java index 676365c5a..d48258ac6 100644 --- a/src/it/java/io/weaviate/integration/PaginationITest.java +++ b/src/it/java/io/weaviate/integration/PaginationITest.java @@ -7,8 +7,10 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; import org.assertj.core.api.Assertions; +import org.junit.Assume; import org.junit.Test; import io.weaviate.ConcurrentTest; @@ -121,7 +123,7 @@ public void testWithQueryOptions() throws IOException { } @Test - public void testStreamAsync() throws IOException, InterruptedException, ExecutionException { + public void testAsyncPaginator() throws IOException, InterruptedException, ExecutionException { // Arrange var nsThings = ns("Things"); var count = 10; @@ -139,7 +141,20 @@ public void testStreamAsync() throws IOException, InterruptedException, Executio } CompletableFuture.allOf(futures).get(); - var asyncPaginator = things.paginate(); + // Act + var objectCount = new AtomicInteger(); + var countAll = things.paginate(p -> p.pageSize(5)) + .forEach(__ -> objectCount.getAndIncrement()); + + // Assert + if (!countAll.isDone()) { + Assume.assumeTrue("iteration not completed", objectCount.get() < count); + } + + countAll.get(); // Wait for it to complete. 
+ Assertions.assertThat(objectCount.get()) + .as("object count after iteration completed") + .isEqualTo(count); } } } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandleAsync.java b/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandleAsync.java index 5723f3a53..b08bb2471 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandleAsync.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/CollectionHandleAsync.java @@ -1,23 +1,27 @@ package io.weaviate.client6.v1.api.collections; +import java.util.function.Function; + import io.weaviate.client6.v1.api.collections.aggregate.WeaviateAggregateClientAsync; import io.weaviate.client6.v1.api.collections.config.WeaviateConfigClientAsync; import io.weaviate.client6.v1.api.collections.data.WeaviateDataClientAsync; +import io.weaviate.client6.v1.api.collections.pagination.AsyncPaginator; import io.weaviate.client6.v1.api.collections.query.WeaviateQueryClientAsync; +import io.weaviate.client6.v1.internal.ObjectBuilder; import io.weaviate.client6.v1.internal.grpc.GrpcTransport; import io.weaviate.client6.v1.internal.orm.CollectionDescriptor; import io.weaviate.client6.v1.internal.rest.RestTransport; -public class CollectionHandleAsync { +public class CollectionHandleAsync { public final WeaviateConfigClientAsync config; - public final WeaviateDataClientAsync data; - public final WeaviateQueryClientAsync query; + public final WeaviateDataClientAsync data; + public final WeaviateQueryClientAsync query; public final WeaviateAggregateClientAsync aggregate; public CollectionHandleAsync( RestTransport restTransport, GrpcTransport grpcTransport, - CollectionDescriptor collectionDescriptor) { + CollectionDescriptor collectionDescriptor) { this.config = new WeaviateConfigClientAsync(collectionDescriptor, restTransport, grpcTransport); this.query = new WeaviateQueryClientAsync<>(collectionDescriptor, grpcTransport); @@ -25,6 +29,12 @@ public 
CollectionHandleAsync( this.aggregate = new WeaviateAggregateClientAsync(collectionDescriptor, grpcTransport); } - public void paginate() { + public AsyncPaginator paginate() { + return AsyncPaginator.of(this.query); + } + + public AsyncPaginator paginate( + Function, ObjectBuilder>> fn) { + return AsyncPaginator.of(this.query, fn); } } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java new file mode 100644 index 000000000..02b5d83c8 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java @@ -0,0 +1,140 @@ +package io.weaviate.client6.v1.api.collections.pagination; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; + +import io.weaviate.client6.v1.api.collections.WeaviateObject; +import io.weaviate.client6.v1.api.collections.query.FetchObjects; +import io.weaviate.client6.v1.api.collections.query.Metadata; +import io.weaviate.client6.v1.api.collections.query.QueryMetadata; +import io.weaviate.client6.v1.api.collections.query.QueryReference; +import io.weaviate.client6.v1.api.collections.query.QueryResponse; +import io.weaviate.client6.v1.api.collections.query.WeaviateQueryClientAsync; +import io.weaviate.client6.v1.internal.ObjectBuilder; + +public class AsyncPaginator { + private final WeaviateQueryClientAsync query; + private final Function> queryOptions; + private final int pageSize; + private final String cursor; + + private CompletableFuture> resultSet; + + public AsyncPaginator(Builder builder) { + this.query = builder.query; + this.queryOptions = builder.queryOptions; + this.pageSize = builder.pageSize; + this.cursor = builder.cursor; + + var rs = new AsyncResultSet( + cursor, + pageSize, + (after, limit) -> { + var fn = ObjectBuilder.partial(queryOptions, q -> 
q.after(after).limit(limit)); + return this.query.fetchObjects(fn).thenApply(QueryResponse::objects); + }); + + this.resultSet = CompletableFuture.completedFuture(rs); + } + + public CompletableFuture forEach(Consumer> action) { + return resultSet + .thenCompose(AsyncResultSet::fetchNextPage) + .thenCompose(processPageAndAdvance(action)); + } + + public Function, CompletableFuture> processPageAndAdvance( + Consumer> action) { + return rs -> { + // Empty result set means there were no more objects to fetch. + if (rs.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + + // Apply provided callback for each method -- consume current page. + for (var object : rs) { + action.accept(object); + } + + // Advance iteration. + return rs.fetchNextPage().thenCompose(processPageAndAdvance(action)); + }; + } + + public static AsyncPaginator of(WeaviateQueryClientAsync query) { + return of(query, ObjectBuilder.identity()); + } + + public static AsyncPaginator of(WeaviateQueryClientAsync query, + Function, ObjectBuilder>> fn) { + return fn.apply(new Builder<>(query)).build(); + } + + public static class Builder implements ObjectBuilder> { + private final WeaviateQueryClientAsync query; + + private Function> queryOptions = ObjectBuilder.identity(); + private int pageSize = Paginator.DEFAULT_PAGE_SIZE; + private String cursor; + private boolean prefetch = false; + + public Builder(WeaviateQueryClientAsync query) { + this.query = query; + } + + // Pagination options ----------------------------------------------------- + + public Builder pageSize(int pageSize) { + this.pageSize = pageSize; + return this; + } + + public Builder resumeFrom(String uuid) { + this.cursor = uuid; + return this; + } + + public Builder prefetch(boolean enable) { + this.prefetch = enable; + return this; + } + + // Query options ---------------------------------------------------------- + + public final Builder returnProperties(String... 
properties) { + return applyQueryOption(q -> q.returnProperties(properties)); + } + + public final Builder returnProperties(List properties) { + return applyQueryOption(q -> q.returnProperties(properties)); + } + + public final Builder returnReferences(QueryReference... references) { + return applyQueryOption(q -> q.returnReferences(references)); + } + + public final Builder returnReferences(List references) { + return applyQueryOption(q -> q.returnReferences(references)); + } + + public final Builder returnMetadata(Metadata... metadata) { + return applyQueryOption(q -> q.returnMetadata(metadata)); + } + + public final Builder returnMetadata(List metadata) { + return applyQueryOption(q -> q.returnMetadata(metadata)); + } + + private final Builder applyQueryOption(Function options) { + this.queryOptions = ObjectBuilder.partial(this.queryOptions, options); + return this; + } + + @Override + public AsyncPaginator build() { + return new AsyncPaginator<>(this); + } + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java new file mode 100644 index 000000000..9df4966f4 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java @@ -0,0 +1,57 @@ +package io.weaviate.client6.v1.api.collections.pagination; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; + +import io.weaviate.client6.v1.api.collections.WeaviateObject; +import io.weaviate.client6.v1.api.collections.query.QueryMetadata; + +public class AsyncResultSet implements Iterable> { + + private final int pageSize; + private final BiFunction>>> fetch; + + private String cursor; + private List> currentPage = new ArrayList<>(); + + AsyncResultSet(String cursor, int pageSize, + BiFunction>>> fetch) { + this.cursor = cursor; + 
this.pageSize = pageSize; + this.fetch = fetch; + } + + AsyncResultSet(String cursor, int pageSize, + BiFunction>>> fetch, + List> currentPage) { + this(cursor, pageSize, fetch); + this.currentPage = currentPage; + } + + public Iterable> currentPage() { + return currentPage; + } + + public boolean isEmpty() { + return this.currentPage.isEmpty(); + } + + public CompletableFuture> fetchNextPage() { + return fetch.apply(cursor, pageSize) + .thenApply(nextPage -> { + if (nextPage.isEmpty()) { + return new AsyncResultSet<>(null, pageSize, fetch, nextPage); + } + var last = nextPage.get(nextPage.size() - 1); + return new AsyncResultSet<>(last.metadata().uuid(), pageSize, fetch, nextPage); + }); + } + + @Override + public Iterator> iterator() { + return currentPage.iterator(); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java index 77ed1ec12..3a5ab884c 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java @@ -17,7 +17,7 @@ import io.weaviate.client6.v1.internal.ObjectBuilder; public class Paginator implements Iterable> { - private static final int DEFAULT_PAGE_SIZE = 100; + static final int DEFAULT_PAGE_SIZE = 100; private final WeaviateQueryClient query; private final Function> queryOptions; @@ -116,5 +116,4 @@ public Paginator build() { return new Paginator<>(this); } } - } From 774469eae94d0fe3d666e9a0868bcc57ba5f4569 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 30 Jun 2025 17:10:57 +0200 Subject: [PATCH 07/13] feat: support 'prefetch' in async pagination --- src/it/java/io/weaviate/integration/PaginationITest.java | 2 +- .../client6/v1/api/collections/pagination/AsyncPaginator.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/it/java/io/weaviate/integration/PaginationITest.java 
b/src/it/java/io/weaviate/integration/PaginationITest.java index d48258ac6..961ad4df1 100644 --- a/src/it/java/io/weaviate/integration/PaginationITest.java +++ b/src/it/java/io/weaviate/integration/PaginationITest.java @@ -143,7 +143,7 @@ public void testAsyncPaginator() throws IOException, InterruptedException, Execu // Act var objectCount = new AtomicInteger(); - var countAll = things.paginate(p -> p.pageSize(5)) + var countAll = things.paginate(p -> p.pageSize(5).prefetch(true)) .forEach(__ -> objectCount.getAndIncrement()); // Assert diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java index 02b5d83c8..14f173116 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java @@ -36,12 +36,12 @@ public AsyncPaginator(Builder builder) { return this.query.fetchObjects(fn).thenApply(QueryResponse::objects); }); - this.resultSet = CompletableFuture.completedFuture(rs); + this.resultSet = builder.prefetch ? rs.fetchNextPage() : CompletableFuture.completedFuture(rs); } public CompletableFuture forEach(Consumer> action) { return resultSet - .thenCompose(AsyncResultSet::fetchNextPage) + .thenCompose(rs -> rs.isEmpty() ? 
rs.fetchNextPage() : CompletableFuture.completedFuture(rs)) .thenCompose(processPageAndAdvance(action)); } From 61668e290df38b710e9a9b4cf01c2ca24bcb67af Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 30 Jun 2025 17:18:56 +0200 Subject: [PATCH 08/13] feat: provide forPage to allow processing objects in batches in user-land --- .../pagination/AsyncPaginator.java | 24 +++++++++++++++---- .../pagination/AsyncResultSet.java | 2 +- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java index 14f173116..adbc72141 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java @@ -40,13 +40,29 @@ public AsyncPaginator(Builder builder) { } public CompletableFuture forEach(Consumer> action) { + return resultSet + .thenCompose(rs -> rs.isEmpty() ? rs.fetchNextPage() : CompletableFuture.completedFuture(rs)) + .thenCompose(processEachAndAdvance(action)); + } + + public CompletableFuture forPage(Consumer>> action) { return resultSet .thenCompose(rs -> rs.isEmpty() ? rs.fetchNextPage() : CompletableFuture.completedFuture(rs)) .thenCompose(processPageAndAdvance(action)); } - public Function, CompletableFuture> processPageAndAdvance( + public Function, CompletableFuture> processEachAndAdvance( Consumer> action) { + return processAndAdvanceFunc(rs -> rs.forEach(action)); + } + + public Function, CompletableFuture> processPageAndAdvance( + Consumer>> action) { + return processAndAdvanceFunc(rs -> action.accept(rs.currentPage())); + } + + public Function, CompletableFuture> processAndAdvanceFunc( + Consumer> action) { return rs -> { // Empty result set means there were no more objects to fetch. 
if (rs.isEmpty()) { @@ -54,12 +70,10 @@ public Function, CompletableFuture> processPag } // Apply provided callback for each method -- consume current page. - for (var object : rs) { - action.accept(object); - } + action.accept(rs); // Advance iteration. - return rs.fetchNextPage().thenCompose(processPageAndAdvance(action)); + return rs.fetchNextPage().thenCompose(processAndAdvanceFunc(action)); }; } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java index 9df4966f4..dbd3c9512 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java @@ -31,7 +31,7 @@ public class AsyncResultSet implements Iterable> currentPage() { + public List> currentPage() { return currentPage; } From 0cc1c6e76badea3255c80d2dccd4f1164cb30213 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 30 Jun 2025 18:05:23 +0200 Subject: [PATCH 09/13] fix: break infinite recursion --- .../client6/v1/api/collections/query/BaseQueryOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/query/BaseQueryOptions.java b/src/main/java/io/weaviate/client6/v1/api/collections/query/BaseQueryOptions.java index c67bf64c1..3a2815864 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/query/BaseQueryOptions.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/query/BaseQueryOptions.java @@ -104,7 +104,7 @@ public final SELF returnReferences(List references) { } public final SELF returnMetadata(Metadata... 
metadata) { - return returnMetadata(metadata); + return returnMetadata(Arrays.asList(metadata)); } public final SELF returnMetadata(List metadata) { From 4e2a7d65a2a0f34037dfbe6e4272d8d1d235c9f8 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Fri, 4 Jul 2025 14:24:13 +0200 Subject: [PATCH 10/13] refactor: proofing AsyncResultSet - this.cursor should be a final field since AsyncResult will only ever work with a single page - change currentPage access to package-private to avoid confusion See: https://github.com/weaviate/java-client/pull/399#discussion_r2179553725 --- .../v1/api/collections/pagination/AsyncResultSet.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java index dbd3c9512..e0a9f40f8 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java @@ -1,6 +1,7 @@ package io.weaviate.client6.v1.api.collections.pagination; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -12,9 +13,9 @@ public class AsyncResultSet implements Iterable> { private final int pageSize; + private final String cursor; private final BiFunction>>> fetch; - private String cursor; private List> currentPage = new ArrayList<>(); AsyncResultSet(String cursor, int pageSize, @@ -28,10 +29,10 @@ public class AsyncResultSet implements Iterable>>> fetch, List> currentPage) { this(cursor, pageSize, fetch); - this.currentPage = currentPage; + this.currentPage = Collections.unmodifiableList(currentPage); } - public List> currentPage() { + List> currentPage() { return currentPage; } From 6890919a40c6fea01631a010d451841142a60511 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: 
Fri, 4 Jul 2025 14:27:27 +0200 Subject: [PATCH 11/13] chore: make AsyncResult set a final class --- .../client6/v1/api/collections/pagination/AsyncResultSet.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java index e0a9f40f8..65e3a3ec7 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java @@ -10,12 +10,12 @@ import io.weaviate.client6.v1.api.collections.WeaviateObject; import io.weaviate.client6.v1.api.collections.query.QueryMetadata; -public class AsyncResultSet implements Iterable> { +public final class AsyncResultSet implements Iterable> { private final int pageSize; - private final String cursor; private final BiFunction>>> fetch; + private final String cursor; private List> currentPage = new ArrayList<>(); AsyncResultSet(String cursor, int pageSize, From 9e7511a5cc5c16ab3ab5c92063a3c70f12e4b54e Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Fri, 4 Jul 2025 18:29:08 +0200 Subject: [PATCH 12/13] refactor: rename AsyncResultSet -> AsyncPage for cleaner API --- .../pagination/AsyncPaginator.java | 14 ++--- .../pagination/AsyncResultSet.java | 58 ------------------- .../pagination/CursorSpliterator.java | 12 ++-- .../api/collections/pagination/Paginator.java | 14 ++--- 4 files changed, 20 insertions(+), 78 deletions(-) delete mode 100644 src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java index adbc72141..308a0327e 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java +++ 
b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPaginator.java @@ -20,7 +20,7 @@ public class AsyncPaginator { private final int pageSize; private final String cursor; - private CompletableFuture> resultSet; + private CompletableFuture> resultSet; public AsyncPaginator(Builder builder) { this.query = builder.query; @@ -28,7 +28,7 @@ public AsyncPaginator(Builder builder) { this.pageSize = builder.pageSize; this.cursor = builder.cursor; - var rs = new AsyncResultSet( + var rs = new AsyncPage( cursor, pageSize, (after, limit) -> { @@ -51,18 +51,18 @@ public CompletableFuture forPage(Consumer, CompletableFuture> processEachAndAdvance( + public Function, CompletableFuture> processEachAndAdvance( Consumer> action) { return processAndAdvanceFunc(rs -> rs.forEach(action)); } - public Function, CompletableFuture> processPageAndAdvance( + public Function, CompletableFuture> processPageAndAdvance( Consumer>> action) { - return processAndAdvanceFunc(rs -> action.accept(rs.currentPage())); + return processAndAdvanceFunc(rs -> action.accept(rs.items())); } - public Function, CompletableFuture> processAndAdvanceFunc( - Consumer> action) { + public Function, CompletableFuture> processAndAdvanceFunc( + Consumer> action) { return rs -> { // Empty result set means there were no more objects to fetch. 
if (rs.isEmpty()) { diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java deleted file mode 100644 index 65e3a3ec7..000000000 --- a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncResultSet.java +++ /dev/null @@ -1,58 +0,0 @@ -package io.weaviate.client6.v1.api.collections.pagination; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; - -import io.weaviate.client6.v1.api.collections.WeaviateObject; -import io.weaviate.client6.v1.api.collections.query.QueryMetadata; - -public final class AsyncResultSet implements Iterable> { - - private final int pageSize; - private final BiFunction>>> fetch; - - private final String cursor; - private List> currentPage = new ArrayList<>(); - - AsyncResultSet(String cursor, int pageSize, - BiFunction>>> fetch) { - this.cursor = cursor; - this.pageSize = pageSize; - this.fetch = fetch; - } - - AsyncResultSet(String cursor, int pageSize, - BiFunction>>> fetch, - List> currentPage) { - this(cursor, pageSize, fetch); - this.currentPage = Collections.unmodifiableList(currentPage); - } - - List> currentPage() { - return currentPage; - } - - public boolean isEmpty() { - return this.currentPage.isEmpty(); - } - - public CompletableFuture> fetchNextPage() { - return fetch.apply(cursor, pageSize) - .thenApply(nextPage -> { - if (nextPage.isEmpty()) { - return new AsyncResultSet<>(null, pageSize, fetch, nextPage); - } - var last = nextPage.get(nextPage.size() - 1); - return new AsyncResultSet<>(last.metadata().uuid(), pageSize, fetch, nextPage); - }); - } - - @Override - public Iterator> iterator() { - return currentPage.iterator(); - } -} diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/CursorSpliterator.java 
b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/CursorSpliterator.java index 5c3c0fdf7..6d474c16a 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/CursorSpliterator.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/CursorSpliterator.java @@ -10,24 +10,24 @@ import io.weaviate.client6.v1.api.collections.WeaviateObject; import io.weaviate.client6.v1.api.collections.query.QueryMetadata; -public class CursorSpliterator implements Spliterator> { +public class CursorSpliterator implements Spliterator> { private final int pageSize; - private final BiFunction>> fetch; + private final BiFunction>> fetch; // Spliterators do not promise thread-safety, so there's no mechanism // to protect access to its internal state. private String cursor; - private Iterator> currentPage = Collections.emptyIterator(); + private Iterator> currentPage = Collections.emptyIterator(); public CursorSpliterator(String cursor, int pageSize, - BiFunction>> fetch) { + BiFunction>> fetch) { this.cursor = cursor; this.pageSize = pageSize; this.fetch = fetch; } @Override - public boolean tryAdvance(Consumer> action) { + public boolean tryAdvance(Consumer> action) { // Happy path: there are remaining objects in the current page. 
if (currentPage.hasNext()) { action.accept(currentPage.next()); @@ -45,7 +45,7 @@ public boolean tryAdvance(Consumer> trySplit() { + public Spliterator> trySplit() { // Do not support splitting just now; return null; } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java index 3a5ab884c..213448e34 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/Paginator.java @@ -16,25 +16,25 @@ import io.weaviate.client6.v1.api.collections.query.WeaviateQueryClient; import io.weaviate.client6.v1.internal.ObjectBuilder; -public class Paginator implements Iterable> { +public class Paginator implements Iterable> { static final int DEFAULT_PAGE_SIZE = 100; - private final WeaviateQueryClient query; + private final WeaviateQueryClient query; private final Function> queryOptions; private final int pageSize; private final String cursor; @Override - public Iterator> iterator() { + public Iterator> iterator() { return Spliterators.iterator(spliterator()); } - public Stream> stream() { + public Stream> stream() { return StreamSupport.stream(spliterator(), false); } - public Spliterator> spliterator() { - return new CursorSpliterator(cursor, pageSize, + public Spliterator> spliterator() { + return new CursorSpliterator(cursor, pageSize, (after, limit) -> { var fn = ObjectBuilder.partial(queryOptions, q -> q.after(after).limit(limit)); return query.fetchObjects(fn).objects(); @@ -50,7 +50,7 @@ public static Paginator of(WeaviateQueryClient query, return fn.apply(new Builder<>(query)).build(); } - Paginator(Builder builder) { + Paginator(Builder builder) { this.query = builder.query; this.queryOptions = builder.queryOptions; this.cursor = builder.cursor; From 4f33411e63a068dc4c897483a891ccc8ef7be3a6 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Fri, 4 Jul 2025 
18:31:35 +0200 Subject: [PATCH 13/13] fix: add missing file after renaming --- .../api/collections/pagination/AsyncPage.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPage.java diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPage.java b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPage.java new file mode 100644 index 000000000..54f45f945 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/collections/pagination/AsyncPage.java @@ -0,0 +1,58 @@ +package io.weaviate.client6.v1.api.collections.pagination; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; + +import io.weaviate.client6.v1.api.collections.WeaviateObject; +import io.weaviate.client6.v1.api.collections.query.QueryMetadata; + +public final class AsyncPage implements Iterable> { + + private final int pageSize; + private final BiFunction>>> fetch; + + private final String cursor; + private List> currentPage = new ArrayList<>(); + + AsyncPage(String cursor, int pageSize, + BiFunction>>> fetch) { + this.cursor = cursor; + this.pageSize = pageSize; + this.fetch = fetch; + } + + AsyncPage(String cursor, int pageSize, + BiFunction>>> fetch, + List> currentPage) { + this(cursor, pageSize, fetch); + this.currentPage = Collections.unmodifiableList(currentPage); + } + + List> items() { + return currentPage; + } + + public boolean isEmpty() { + return this.currentPage.isEmpty(); + } + + public CompletableFuture> fetchNextPage() { + return fetch.apply(cursor, pageSize) + .thenApply(nextPage -> { + if (nextPage.isEmpty()) { + return new AsyncPage<>(null, pageSize, fetch, nextPage); + } + var last = nextPage.get(nextPage.size() - 1); + return new AsyncPage<>(last.metadata().uuid(), pageSize, fetch, 
nextPage); + }); + } + + @Override + public Iterator> iterator() { + return currentPage.iterator(); + } +}