Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 160 additions & 0 deletions src/it/java/io/weaviate/integration/PaginationITest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package io.weaviate.integration;

import static org.junit.Assume.assumeTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;

import io.weaviate.ConcurrentTest;
import io.weaviate.client6.v1.api.WeaviateClient;
import io.weaviate.client6.v1.api.collections.Property;
import io.weaviate.client6.v1.api.collections.WeaviateMetadata;
import io.weaviate.client6.v1.api.collections.WeaviateObject;
import io.weaviate.containers.Container;

public class PaginationITest extends ConcurrentTest {
private static WeaviateClient client = Container.WEAVIATE.getClient();

@Test
public void testIterateAll() throws IOException {
// Arrange
var nsThings = ns("Things");
var count = 150;

client.collections.create(nsThings);
var things = client.collections.use(nsThings);

var inserted = new ArrayList<String>();
for (var i = 0; i < count; i++) {
var object = things.data.insert(Collections.emptyMap());
inserted.add(object.metadata().uuid());
}
assumeTrue("all objects were inserted", inserted.size() == count);

var allThings = things.paginate();

// Act: stream
var gotStream = allThings.stream()
.map(WeaviateObject::metadata).map(WeaviateMetadata::uuid).toList();

// Assert
Assertions.assertThat(gotStream)
.as("stream fetched all objects")
.hasSize(inserted.size())
.containsAll(inserted);

// Act: for-loop
var gotLoop = new ArrayList<String>();
for (var thing : allThings) {
gotLoop.add(thing.metadata().uuid());
}

// Assert
Assertions.assertThat(gotLoop)
.as("list fetched all objects")
.hasSize(inserted.size())
.containsAll(inserted);

Assertions.assertThat(gotStream)
.as("stream and list return consistent order")
.containsExactlyElementsOf(gotLoop);
}

@Test
public void testResumePagination() throws IOException {
// Arrange
var nsThings = ns("Things");
var count = 10;

client.collections.create(nsThings);

var things = client.collections.use(nsThings);
var inserted = new ArrayList<String>();
for (var i = 0; i < count; i++) {
var object = things.data.insert(Collections.emptyMap());
inserted.add(object.metadata().uuid());
}

// Iterate over first 5 objects
String lastId = things.paginate(p -> p.pageSize(5)).stream()
.limit(5).map(thing -> thing.metadata().uuid())
.reduce((prev, next) -> next).get();

// Act
var remaining = things.paginate(p -> p.resumeFrom(lastId)).stream().count();

// Assert
Assertions.assertThat(remaining).isEqualTo(5);
}

@Test
public void testWithQueryOptions() throws IOException {
// Arrange
var nsThings = ns("Things");
var count = 10;

client.collections.create(nsThings,
c -> c.properties(
Property.text("fetch_me"),
Property.integer("dont_fetch")));

var things = client.collections.use(nsThings);
var inserted = new ArrayList<String>();
for (var i = 0; i < count; i++) {
var object = things.data.insert(Collections.emptyMap());
inserted.add(object.metadata().uuid());
}

// Act / Assert
var withSomeProperties = things.paginate(p -> p.returnProperties("fetch_me"));
for (var thing : withSomeProperties) {
Assertions.assertThat(thing.properties())
.as("uuid=" + thing.metadata().uuid())
.doesNotContainKey("dont_fetch");
}
}

@Test
public void testAsyncPaginator() throws IOException, InterruptedException, ExecutionException {
// Arrange
var nsThings = ns("Things");
var count = 10;

client.collections.create(nsThings);

try (final var async = client.async()) {
var things = async.collections.use(nsThings);

var futures = new CompletableFuture<?>[count];
var inserted = new ArrayList<String>();
for (var i = 0; i < count; i++) {
futures[i] = things.data.insert(Collections.emptyMap())
.thenAccept(object -> inserted.add(object.metadata().uuid()));
}
CompletableFuture.allOf(futures).get();

// Act
var objectCount = new AtomicInteger();
var countAll = things.paginate(p -> p.pageSize(5).prefetch(true))
.forEach(__ -> objectCount.getAndIncrement());

// Assert
if (!countAll.isDone()) {
Assume.assumeTrue("iteration not completed", objectCount.get() < count);
}

countAll.get(); // Wait for it to complete.
Assertions.assertThat(objectCount.get())
.as("object count after iteration completed")
.isEqualTo(count);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package io.weaviate.client6.v1.api.collections;

import java.util.function.Function;

import io.weaviate.client6.v1.api.collections.aggregate.WeaviateAggregateClient;
import io.weaviate.client6.v1.api.collections.config.WeaviateConfigClient;
import io.weaviate.client6.v1.api.collections.data.WeaviateDataClient;
import io.weaviate.client6.v1.api.collections.pagination.Paginator;
import io.weaviate.client6.v1.api.collections.query.WeaviateQueryClient;
import io.weaviate.client6.v1.internal.ObjectBuilder;
import io.weaviate.client6.v1.internal.grpc.GrpcTransport;
import io.weaviate.client6.v1.internal.orm.CollectionDescriptor;
import io.weaviate.client6.v1.internal.rest.RestTransport;
Expand All @@ -24,4 +28,12 @@ public CollectionHandle(
this.data = new WeaviateDataClient<>(collectionDescriptor, restTransport, this.query);
this.aggregate = new WeaviateAggregateClient(collectionDescriptor, grpcTransport);
}

public Paginator<T> paginate() {
return Paginator.of(this.query);
}

public Paginator<T> paginate(Function<Paginator.Builder<T>, ObjectBuilder<Paginator<T>>> fn) {
return Paginator.of(this.query, fn);
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,40 @@
package io.weaviate.client6.v1.api.collections;

import java.util.function.Function;

import io.weaviate.client6.v1.api.collections.aggregate.WeaviateAggregateClientAsync;
import io.weaviate.client6.v1.api.collections.config.WeaviateConfigClientAsync;
import io.weaviate.client6.v1.api.collections.data.WeaviateDataClientAsync;
import io.weaviate.client6.v1.api.collections.pagination.AsyncPaginator;
import io.weaviate.client6.v1.api.collections.query.WeaviateQueryClientAsync;
import io.weaviate.client6.v1.internal.ObjectBuilder;
import io.weaviate.client6.v1.internal.grpc.GrpcTransport;
import io.weaviate.client6.v1.internal.orm.CollectionDescriptor;
import io.weaviate.client6.v1.internal.rest.RestTransport;

public class CollectionHandleAsync<T> {
public class CollectionHandleAsync<PropertiesT> {
public final WeaviateConfigClientAsync config;
public final WeaviateDataClientAsync<T> data;
public final WeaviateQueryClientAsync<T> query;
public final WeaviateDataClientAsync<PropertiesT> data;
public final WeaviateQueryClientAsync<PropertiesT> query;
public final WeaviateAggregateClientAsync aggregate;

public CollectionHandleAsync(
RestTransport restTransport,
GrpcTransport grpcTransport,
CollectionDescriptor<T> collectionDescriptor) {
CollectionDescriptor<PropertiesT> collectionDescriptor) {

this.config = new WeaviateConfigClientAsync(collectionDescriptor, restTransport, grpcTransport);
this.query = new WeaviateQueryClientAsync<>(collectionDescriptor, grpcTransport);
this.data = new WeaviateDataClientAsync<>(collectionDescriptor, restTransport, this.query);
this.aggregate = new WeaviateAggregateClientAsync(collectionDescriptor, grpcTransport);
}

public AsyncPaginator<PropertiesT> paginate() {
return AsyncPaginator.of(this.query);
}

public AsyncPaginator<PropertiesT> paginate(
Function<AsyncPaginator.Builder<PropertiesT>, ObjectBuilder<AsyncPaginator<PropertiesT>>> fn) {
return AsyncPaginator.of(this.query, fn);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.weaviate.client6.v1.api.collections.pagination;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

import io.weaviate.client6.v1.api.collections.WeaviateObject;
import io.weaviate.client6.v1.api.collections.query.QueryMetadata;

public final class AsyncPage<PropertiesT> implements Iterable<WeaviateObject<PropertiesT, Object, QueryMetadata>> {

private final int pageSize;
private final BiFunction<String, Integer, CompletableFuture<List<WeaviateObject<PropertiesT, Object, QueryMetadata>>>> fetch;

private final String cursor;
private List<WeaviateObject<PropertiesT, Object, QueryMetadata>> currentPage = new ArrayList<>();

AsyncPage(String cursor, int pageSize,
BiFunction<String, Integer, CompletableFuture<List<WeaviateObject<PropertiesT, Object, QueryMetadata>>>> fetch) {
this.cursor = cursor;
this.pageSize = pageSize;
this.fetch = fetch;
}

AsyncPage(String cursor, int pageSize,
BiFunction<String, Integer, CompletableFuture<List<WeaviateObject<PropertiesT, Object, QueryMetadata>>>> fetch,
List<WeaviateObject<PropertiesT, Object, QueryMetadata>> currentPage) {
this(cursor, pageSize, fetch);
this.currentPage = Collections.unmodifiableList(currentPage);
}

List<WeaviateObject<PropertiesT, Object, QueryMetadata>> items() {
return currentPage;
}

public boolean isEmpty() {
return this.currentPage.isEmpty();
}

public CompletableFuture<AsyncPage<PropertiesT>> fetchNextPage() {
return fetch.apply(cursor, pageSize)
.thenApply(nextPage -> {
if (nextPage.isEmpty()) {
return new AsyncPage<>(null, pageSize, fetch, nextPage);
}
var last = nextPage.get(nextPage.size() - 1);
return new AsyncPage<>(last.metadata().uuid(), pageSize, fetch, nextPage);
});
}

@Override
public Iterator<WeaviateObject<PropertiesT, Object, QueryMetadata>> iterator() {
return currentPage.iterator();
}
}
Loading