From 0b2b0fdc1b33f540afe897e6b3f1e467bdbd7419 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 29 Sep 2025 16:15:08 +0200 Subject: [PATCH 01/11] feat: implement first draft of the sync backup API --- .../client6/v1/api/backup/Backup.java | 14 ++ .../client6/v1/api/backup/BackupStatus.java | 16 ++ .../v1/api/backup/CancelBackupRequest.java | 14 ++ .../v1/api/backup/CompressionLevel.java | 12 ++ .../v1/api/backup/CreateBackupRequest.java | 124 +++++++++++++ .../v1/api/backup/CreateBackupResponse.java | 35 ++++ .../v1/api/backup/GetCreateStatusRequest.java | 15 ++ .../api/backup/GetRestoreStatusRequest.java | 15 ++ .../v1/api/backup/ListBackupsRequest.java | 21 +++ .../v1/api/backup/RbacRestoreOption.java | 10 + .../v1/api/backup/RestoreBackupRequest.java | 125 +++++++++++++ .../v1/api/backup/RestoreBackupResponse.java | 35 ++++ .../v1/api/backup/WeaviateBackupClient.java | 173 ++++++++++++++++++ .../v1/internal/rest/OptionalEndpoint.java | 8 + 14 files changed, 617 insertions(+) create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/Backup.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/BackupStatus.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/CancelBackupRequest.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/CompressionLevel.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupResponse.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/GetCreateStatusRequest.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/GetRestoreStatusRequest.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/ListBackupsRequest.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/RbacRestoreOption.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupResponse.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java new file mode 100644 index 000000000..534369ef4 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java @@ -0,0 +1,14 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.List; + +import com.google.gson.annotations.SerializedName; + +public record Backup( + @SerializedName("id") String id, + @SerializedName("path") String path, + @SerializedName("backend") String backend, + @SerializedName("classes") List includesCollections, + @SerializedName("status") BackupStatus status, + @SerializedName("error") String error) { +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/BackupStatus.java b/src/main/java/io/weaviate/client6/v1/api/backup/BackupStatus.java new file mode 100644 index 000000000..13487918b --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/BackupStatus.java @@ -0,0 +1,16 @@ +package io.weaviate.client6.v1.api.backup; + +import com.google.gson.annotations.SerializedName; + +public enum BackupStatus { + @SerializedName("STARTED") + STARTED, + @SerializedName("TRANSFERRING") + TRANSFERRING, + @SerializedName("SUCCESS") + SUCCESS, + @SerializedName("FAILED") + FAILED, + @SerializedName("CANCELED") + CANCELED; +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/CancelBackupRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/CancelBackupRequest.java new file mode 100644 index 000000000..9c311f9c9 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/CancelBackupRequest.java @@ -0,0 +1,14 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.Collections; + +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.SimpleEndpoint; + +public record CancelBackupRequest(String backupId, String backend) { + + public static Endpoint _ENDPOINT = SimpleEndpoint.sideEffect( + request -> "DELETE", + request -> "/backups/" + request.backend + "/" + request.backupId, + request -> Collections.emptyMap()); +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/CompressionLevel.java b/src/main/java/io/weaviate/client6/v1/api/backup/CompressionLevel.java new file mode 100644 index 000000000..7f8a75841 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/CompressionLevel.java @@ -0,0 +1,12 @@ +package io.weaviate.client6.v1.api.backup; + +import com.google.gson.annotations.SerializedName; + +public enum CompressionLevel { + @SerializedName("DefaultCompression") + DEFAULT, + @SerializedName("BestSpeed") + BEST_SPEED, + @SerializedName("BestCompression") + BEST_COMPRESSION; +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java new file mode 100644 index 000000000..99f00af3e --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java @@ -0,0 +1,124 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.json.JSON; +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.SimpleEndpoint; + +public record CreateBackupRequest(BackupCreate body, String backend) { + + public static Endpoint _ENDPOINT = new SimpleEndpoint<>( + request -> "POST", + request -> "/backups/" + request.backend, + request -> Collections.emptyMap(), + request -> JSON.serialize(request.body), + (statusCode, response) -> new CreateBackupResponse(JSON.deserialize(response, Backup.class))); + + public static record BackupCreate( + @SerializedName("id") String id, + @SerializedName("include") List includeCollections, + @SerializedName("exclude") List excludeCollections, + @SerializedName("config") Config config) { + + private static record Config( + @SerializedName("CPUPercentage") Integer cpuPercentage, + @SerializedName("ChunkSize") Integer chunkSize, + @SerializedName("CompressionLevel") CompressionLevel compressionLevel, + @SerializedName("Bucket") String bucket, + @SerializedName("Path") String path) { + } + + public static BackupCreate of(String backupId) { + return of(backupId, ObjectBuilder.identity()); + } + + public static BackupCreate of(String backupId, Function> fn) { + return fn.apply(new Builder(backupId)).build(); + } + + public BackupCreate(Builder builder) { + this( + builder.backupId, + builder.includeCollections, + builder.excludeCollections, + new Config( + builder.cpuPercentage, + builder.chunkSize, + builder.compressionLevel, + builder.bucket, + builder.path)); + } + + public static class Builder implements ObjectBuilder { + private final String backupId; + + private Integer cpuPercentage; + private Integer chunkSize; + private CompressionLevel compressionLevel; + private String bucket; + private String path; + private final List includeCollections = new ArrayList<>(); + private final List excludeCollections = new ArrayList<>(); + + public Builder(String backupId) { + this.backupId = backupId; + } + + public Builder includeCollections(String... includeCollections) { + return includeCollections(Arrays.asList(includeCollections)); + } + + public Builder includeCollections(List includeCollections) { + this.includeCollections.addAll(includeCollections); + return this; + } + + public Builder excludeCollections(String... excludeCollections) { + return excludeCollections(Arrays.asList(excludeCollections)); + } + + public Builder excludeCollections(List excludeCollections) { + this.excludeCollections.addAll(excludeCollections); + return this; + } + + public Builder cpuPercentage(Integer cpuPercentage) { + this.cpuPercentage = cpuPercentage; + return this; + } + + public Builder chunkSize(Integer chunkSize) { + this.chunkSize = chunkSize; + return this; + } + + public Builder compressionLevel(CompressionLevel compressionLevel) { + this.compressionLevel = compressionLevel; + return this; + } + + public Builder bucket(String bucket) { + this.bucket = bucket; + return this; + } + + public Builder path(String path) { + this.path = path; + return this; + } + + @Override + public CreateBackupRequest build() { + new BackupCreate(this); + } + } + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupResponse.java b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupResponse.java new file mode 100644 index 000000000..50c99d4a9 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupResponse.java @@ -0,0 +1,35 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.List; + +public final class CreateBackupResponse { + private final Backup backup; + + CreateBackupResponse(Backup backup) { + this.backup = backup; + } + + public String id() { + return backup.id(); + } + + public String path() { + return backup.path(); + } + + public String backend() { + return backup.backend(); + } + + public List includesCollections() { + return backup.includesCollections(); + } + + public BackupStatus status() { + return backup.status(); + } + + public String error() { + return backup.error(); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/GetCreateStatusRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/GetCreateStatusRequest.java new file mode 100644 index 000000000..d0b487ee9 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/GetCreateStatusRequest.java @@ -0,0 +1,15 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.Collections; +import java.util.Optional; + +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.OptionalEndpoint; + +public record GetCreateStatusRequest(String backupId, String backend) { + public static final Endpoint> _ENDPOINT = OptionalEndpoint.noBodyOptional( + request -> "GET", + request -> "backups/" + request.backend + "/" + request.backupId, + request -> Collections.emptyMap(), + Backup.class); +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/GetRestoreStatusRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/GetRestoreStatusRequest.java new file mode 100644 index 000000000..8f4f261ba --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/GetRestoreStatusRequest.java @@ -0,0 +1,15 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.Collections; +import java.util.Optional; + +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.OptionalEndpoint; + +public record GetRestoreStatusRequest(String backupId, String backend) { + public static final Endpoint> _ENDPOINT = OptionalEndpoint.noBodyOptional( + request -> "GET", + request -> "backups/" + request.backend + "/" + request.backupId + "/restore", + request -> Collections.emptyMap(), + Backup.class); +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/ListBackupsRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/ListBackupsRequest.java new file mode 100644 index 000000000..ea3e497a2 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/ListBackupsRequest.java @@ -0,0 +1,21 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.Collections; +import java.util.List; + +import com.google.gson.reflect.TypeToken; + +import io.weaviate.client6.v1.internal.json.JSON; +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.SimpleEndpoint; + +public record ListBackupsRequest(String backend) { + + @SuppressWarnings("unchecked") + public static Endpoint> _ENDPOINT = SimpleEndpoint.noBody( + request -> "GET", + request -> "/backups/" + request.backend, + request -> Collections.emptyMap(), + (statusCode, response) -> (List) JSON.deserialize( + response, TypeToken.getParameterized(List.class, Backup.class))); +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/RbacRestoreOption.java b/src/main/java/io/weaviate/client6/v1/api/backup/RbacRestoreOption.java new file mode 100644 index 000000000..b4a45658c --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/RbacRestoreOption.java @@ -0,0 +1,10 @@ +package io.weaviate.client6.v1.api.backup; + +import com.google.gson.annotations.SerializedName; + +public enum RbacRestoreOption { + @SerializedName("noRestore") + NONE, + @SerializedName("all") + ALL; +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java new file mode 100644 index 000000000..22117732a --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java @@ -0,0 +1,125 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.function.Function; + +import com.google.gson.annotations.SerializedName; + +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.json.JSON; +import io.weaviate.client6.v1.internal.rest.Endpoint; +import io.weaviate.client6.v1.internal.rest.SimpleEndpoint; + +public record RestoreBackupRequest(String backupId, String backend, BackupRestore body) { + + public static Endpoint _ENDPOINT = new SimpleEndpoint<>( + request -> "POST", + request -> "/backups/" + request.backend + "/" + request.backupId + "/restore", + request -> Collections.emptyMap(), + request -> JSON.serialize(request.body), + (statusCode, response) -> new RestoreBackupResponse(JSON.deserialize(response, Backup.class))); + + public record BackupRestore( + @SerializedName("include") List includeCollections, + @SerializedName("exclude") List excludeCollections, + @SerializedName("overwriteAlias") Boolean overwriteAlias, + @SerializedName("config") Config config) { + + public record Config( + @SerializedName("CPUPercentage") Integer cpuPercentage, + @SerializedName("Bucket") String bucket, + @SerializedName("Path") String path, + @SerializedName("usersOptions") RbacRestoreOption restoreUsers, + @SerializedName("rolesOptions") RbacRestoreOption restoreRoles) { + } + + public static BackupRestore of() { + return of(ObjectBuilder.identity()); + } + + public static BackupRestore of(Function> fn) { + return fn.apply(new Builder()).build(); + } + + public BackupRestore(Builder builder) { + this( + builder.includeCollections, + builder.excludeCollections, + builder.overwriteAlias, + new Config( + builder.cpuPercentage, + builder.bucket, + builder.path, + builder.restoreUsers, + builder.restoreRoles)); + + } + + public static class Builder implements ObjectBuilder { + private Integer cpuPercentage; + private String bucket; + private String path; + private Boolean overwriteAlias; + private RbacRestoreOption restoreUsers; + private RbacRestoreOption restoreRoles; + private final List includeCollections = new ArrayList<>(); + private final List excludeCollections = new ArrayList<>(); + + public Builder includeCollections(String... includeCollections) { + return includeCollections(Arrays.asList(includeCollections)); + } + + public Builder includeCollections(List includeCollections) { + this.includeCollections.addAll(includeCollections); + return this; + } + + public Builder excludeCollections(String... excludeCollections) { + return excludeCollections(Arrays.asList(excludeCollections)); + } + + public Builder excludeCollections(List excludeCollections) { + this.excludeCollections.addAll(excludeCollections); + return this; + } + + public Builder cpuPercentage(int cpuPercentage) { + this.cpuPercentage = cpuPercentage; + return this; + } + + public Builder bucket(String bucket) { + this.bucket = bucket; + return this; + } + + public Builder path(String path) { + this.path = path; + return this; + } + + public Builder overwriteAlias(boolean overwriteAlias) { + this.overwriteAlias = overwriteAlias; + return this; + } + + public Builder restoreUsers(RbacRestoreOption restoreUsers) { + this.restoreUsers = restoreUsers; + return this; + } + + public Builder restoreRoles(RbacRestoreOption restoreRoles) { + this.restoreRoles = restoreRoles; + return this; + } + + @Override + public BackupRestore build() { + return new BackupRestore(this); + } + } + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupResponse.java b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupResponse.java new file mode 100644 index 000000000..8cfb30ca4 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupResponse.java @@ -0,0 +1,35 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.List; + +public final class RestoreBackupResponse { + private final Backup backup; + + RestoreBackupResponse(Backup backup) { + this.backup = backup; + } + + public String id() { + return backup.id(); + } + + public String path() { + return backup.path(); + } + + public String backend() { + return backup.backend(); + } + + public List includesCollections() { + return backup.includesCollections(); + } + + public BackupStatus status() { + return backup.status(); + } + + public String error() { + return backup.error(); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java new file mode 100644 index 000000000..6d8ce9c58 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java @@ -0,0 +1,173 @@ +package io.weaviate.client6.v1.api.backup; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; + +import io.weaviate.client6.v1.api.WeaviateApiException; +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.rest.RestTransport; + +public class WeaviateBackupClient { + private final RestTransport restTransport; + + public WeaviateBackupClient(RestTransport restTransport) { + this.restTransport = restTransport; + } + + /** + * Start a new backup process. + * + * @param backupId Backup ID. Must be unique for the backend. + * @param backend Backup storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public void create(String backupId, String backend) throws IOException { + create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId), backend)); + } + + /** + * Start a new backup process. + * + * @param backupId Backup ID. Must be unique for the backend. + * @param backend Backup storage backend. + * @param fn Lambda expression for optional parameters. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public void create(String backupId, String backend, + Function> fn) + throws IOException { + create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId, fn), backend)); + } + + /** + * Start a new backup process. + * + * @param request Create backup request. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public CreateBackupResponse create(CreateBackupRequest request) throws IOException { + return this.restTransport.performRequest(request, CreateBackupRequest._ENDPOINT); + } + + /** + * Get backup create status. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Optional getCreateStatus(String backupId, String backend) throws IOException { + return this.restTransport.performRequest(new GetCreateStatusRequest(backupId, backend), + GetCreateStatusRequest._ENDPOINT); + } + + /** + * Start backup restore process. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public RestoreBackupResponse restore(String backupId, String backend) throws IOException { + return restore(new RestoreBackupRequest(backupId, backend, RestoreBackupRequest.BackupRestore.of())); + } + + /** + * Start backup restore process. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + * @param fn Lambda expression for optional parameters. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public RestoreBackupResponse restore(String backupId, String backend, + Function> fn) + throws IOException { + return restore(new RestoreBackupRequest(backupId, backend, RestoreBackupRequest.BackupRestore.of(fn))); + } + + /** + * Start backup restore process. + * + * @param request Restore backup request. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public RestoreBackupResponse restore(RestoreBackupRequest request) throws IOException { + return this.restTransport.performRequest(request, RestoreBackupRequest._ENDPOINT); + } + + /** + * Get backup restore status. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public Optional getRestoreStatus(String backupId, String backend) throws IOException { + return this.restTransport.performRequest(new GetRestoreStatusRequest(backupId, backend), + GetRestoreStatusRequest._ENDPOINT); + } + + /** + * List backups in the backend storage. + * + * @param backend Backup storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public List list(String backend) throws IOException { + return this.restTransport.performRequest(new ListBackupsRequest(backend), ListBackupsRequest._ENDPOINT); + } + + /** + * Cancel in-progress backup. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + * @throws WeaviateApiException in case the server returned with an + * error status code. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public void cancel(String backupId, String backend) throws IOException { + this.restTransport.performRequest(new CancelBackupRequest(backupId, backend), CancelBackupRequest._ENDPOINT); + } +} diff --git a/src/main/java/io/weaviate/client6/v1/internal/rest/OptionalEndpoint.java b/src/main/java/io/weaviate/client6/v1/internal/rest/OptionalEndpoint.java index c3863bf97..2c960236f 100644 --- a/src/main/java/io/weaviate/client6/v1/internal/rest/OptionalEndpoint.java +++ b/src/main/java/io/weaviate/client6/v1/internal/rest/OptionalEndpoint.java @@ -15,6 +15,14 @@ public static OptionalEndpoint noBody return new OptionalEndpoint<>(method, requestUrl, queryParameters, nullBody(), deserializeResponse); } + public static OptionalEndpoint noBodyOptional( + Function method, + Function requestUrl, + Function> queryParameters, + Class cls) { + return new OptionalEndpoint<>(method, requestUrl, queryParameters, nullBody(), deserializeClass(cls)); + } + public OptionalEndpoint( Function method, Function requestUrl, From 534000e735eb4338166c45b5c41495726dc69679 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 29 Sep 2025 19:27:31 +0200 Subject: [PATCH 02/11] test: add full lifecycle integration test for backups Extended CreateBackupResponse class to include a utility .cancel() method. --- .../java/io/weaviate/containers/Weaviate.java | 6 + .../io/weaviate/integration/AliasITest.java | 2 +- .../io/weaviate/integration/BackupITest.java | 135 ++++++++++++++++++ .../io/weaviate/integration/DataITest.java | 2 +- .../io/weaviate/integration/ORMITest.java | 2 +- .../weaviate/integration/PaginationITest.java | 2 +- .../io/weaviate/integration/TenantsITest.java | 2 +- .../client6/v1/api/WeaviateClient.java | 5 + .../v1/api/backup/CreateBackupRequest.java | 12 +- .../v1/api/backup/CreateBackupResponse.java | 27 +++- .../v1/api/backup/GetCreateStatusRequest.java | 2 +- .../api/backup/GetRestoreStatusRequest.java | 2 +- .../v1/api/backup/RestoreBackupRequest.java | 4 +- .../v1/api/backup/RestoreBackupResponse.java | 15 +- .../client6/v1/api/backup/Waiter.java | 68 +++++++++ .../v1/api/backup/WeaviateBackupClient.java | 14 +- 16 files changed, 277 insertions(+), 23 deletions(-) create mode 100644 src/it/java/io/weaviate/integration/BackupITest.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java diff --git a/src/it/java/io/weaviate/containers/Weaviate.java b/src/it/java/io/weaviate/containers/Weaviate.java index 88cd23ee0..7e2ecd19e 100644 --- a/src/it/java/io/weaviate/containers/Weaviate.java +++ b/src/it/java/io/weaviate/containers/Weaviate.java @@ -137,6 +137,12 @@ public Builder withOffloadS3(String accessKey, String secretKey) { return this; } + public Builder withFilesystemBackup(String fsPath) { + addModules("backup-filesystem"); + environment.put("BACKUP_FILESYSTEM_PATH", fsPath); + return this; + } + public Builder enableTelemetry(boolean enable) { environment.put("DISABLE_TELEMETRY", Boolean.toString(!enable)); return this; diff --git a/src/it/java/io/weaviate/integration/AliasITest.java b/src/it/java/io/weaviate/integration/AliasITest.java index f5b1dd47b..0fc40fc6b 100644 --- a/src/it/java/io/weaviate/integration/AliasITest.java +++ b/src/it/java/io/weaviate/integration/AliasITest.java @@ -12,7 +12,7 @@ import io.weaviate.containers.Container; public class AliasITest extends ConcurrentTest { - private static WeaviateClient client = Container.WEAVIATE.getClient(); + private static final WeaviateClient client = Container.WEAVIATE.getClient(); @Test public void test_aliasLifecycle() throws IOException { diff --git a/src/it/java/io/weaviate/integration/BackupITest.java b/src/it/java/io/weaviate/integration/BackupITest.java new file mode 100644 index 000000000..fc1852ab2 --- /dev/null +++ b/src/it/java/io/weaviate/integration/BackupITest.java @@ -0,0 +1,135 @@ +package io.weaviate.integration; + +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.IntStream; + +import org.assertj.core.api.Assertions; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.Test; + +import io.weaviate.ConcurrentTest; +import io.weaviate.client6.v1.api.WeaviateClient; +import io.weaviate.client6.v1.api.backup.Backup; +import io.weaviate.client6.v1.api.backup.BackupStatus; +import io.weaviate.client6.v1.api.backup.CompressionLevel; +import io.weaviate.containers.Weaviate; + +public class BackupITest extends ConcurrentTest { + private static final WeaviateClient client = Weaviate.custom() + .withFilesystemBackup("/tmp/backups").build() + .getClient(); + + @Test + public void test_lifecycle() throws IOException, TimeoutException { + // Arrange + String nsA = ns("A"), nsB = ns("B"), nsC = ns("C"), nsBig = ns("Big"); + String backup_1 = ns("backup_1").toLowerCase(); + String backend = "filesystem"; + + // Start writing data in the background so it's ready + // by the time we get to backup #3. + var spam = spamData(nsBig); + + client.collections.create(nsA); + client.collections.create(nsB); + client.collections.create(nsC); + + // Insert some data to check restore later + var collectionA = client.collections.use(nsA); + collectionA.data.insert(Map.of()); + + // Act: start backup + var started = client.backup.create(backup_1, backend, + backup -> backup + .excludeCollections(nsC, nsBig) + .compressionLevel(CompressionLevel.BEST_SPEED)); + + // Assert + Assertions.assertThat(started.backup()) + .as("created backup operation") + .returns(backup_1, Backup::id) + .returns(backend, Backup::backend) + .returns(BackupStatus.STARTED, Backup::status) + .returns(null, Backup::error) + .extracting(Backup::includesCollections, InstanceOfAssertFactories.list(String.class)) + .containsOnly(nsA, nsB); + + // Act: await backup competion + var completed = started.waitForCompletion(); + + // Assert + Assertions.assertThat(completed) + .as("await backup completion") + .returns(backup_1, Backup::id) + .returns(backend, Backup::backend) + .returns(BackupStatus.SUCCESS, Backup::status) + .returns(null, Backup::error); + + // Act: create another backup + String backup_2 = ns("backup_2").toLowerCase(); + client.backup.create(backup_2, backend).waitForCompletion(); + + // Assert: check the second backup is created successfully + var status_2 = client.backup.getCreateStatus(backup_2, backend); + Assertions.assertThat(status_2).as("backup #2").get() + .returns(BackupStatus.SUCCESS, Backup::status); + + // Act: create and cancel + // Try to throttle this backup by creating a lot of objects, + // limiting CPU resources and requiring high compression ratio. + // This is to avoid flaky tests and make sure we can cancel + // the backup before it completes successfully. + String backup_3 = ns("backup_3").toLowerCase(); + spam.join(); + var cancelMe = client.backup.create(backup_3, backend, + backup -> backup + .includeCollections(nsA, nsB, nsC, nsBig) + .cpuPercentage(1) + .compressionLevel(CompressionLevel.BEST_COMPRESSION)); + cancelMe.cancel(); + cancelMe.waitForStatus(BackupStatus.CANCELED); + + // Assert: check the backup is cancelled + var status_3 = client.backup.getCreateStatus(backup_3, backend); + Assertions.assertThat(status_3).as("backup #3").get() + .returns(BackupStatus.CANCELED, Backup::status); + + // Assert: all 3 backups are present + var all = client.backup.list(backend); + Assertions.assertThat(all).as("all backups") + .hasSize(3) + .extracting(Backup::id) + .containsOnly(backup_1, backup_2, backup_3); + + // Act: delete data and restore backup #1 + client.collections.delete(nsA); + client.backup.restore(backup_1, backend, restore -> restore.includeCollections(nsA)) + .waitForCompletion(); + + // Assert: object inserted in the beginning of the test is present + var restore_1 = client.backup.getRestoreStatus(backup_1, backend); + Assertions.assertThat(restore_1).as("restore backup #1").get() + .returns(BackupStatus.SUCCESS, Backup::status); + Assertions.assertThat(collectionA.size()).as("after restore backup #1").isEqualTo(1); + } + + private CompletableFuture spamData(String collectionName) { + return CompletableFuture.supplyAsync(() -> { + var spam = client.collections.use(collectionName); + for (int i = 0; i < 10_000; i++) { + var uuids = IntStream.range(0, 10).mapToObj(j -> UUID.randomUUID()).toArray(); + try { + spam.data.insert(Map.of("uuids", uuids)); + } catch (IOException e) { + throw new CompletionException(e); + } + } + return null; + }); + } +} diff --git a/src/it/java/io/weaviate/integration/DataITest.java b/src/it/java/io/weaviate/integration/DataITest.java index c679f7a7a..7eb176aa7 100644 --- a/src/it/java/io/weaviate/integration/DataITest.java +++ b/src/it/java/io/weaviate/integration/DataITest.java @@ -29,7 +29,7 @@ import io.weaviate.containers.Container; public class DataITest extends ConcurrentTest { - private static WeaviateClient client = Container.WEAVIATE.getClient(); + private static final WeaviateClient client = Container.WEAVIATE.getClient(); private static final String COLLECTION = unique("Artists"); private static final String VECTOR_INDEX = "bring_your_own"; diff --git a/src/it/java/io/weaviate/integration/ORMITest.java b/src/it/java/io/weaviate/integration/ORMITest.java index 16b5b3717..d708a25f5 100644 --- a/src/it/java/io/weaviate/integration/ORMITest.java +++ b/src/it/java/io/weaviate/integration/ORMITest.java @@ -23,7 +23,7 @@ import io.weaviate.containers.Container; public class ORMITest extends ConcurrentTest { - private static WeaviateClient client = Container.WEAVIATE.getClient(); + private static final WeaviateClient client = Container.WEAVIATE.getClient(); @Collection("ORMITestThings") static record Thing( diff --git a/src/it/java/io/weaviate/integration/PaginationITest.java b/src/it/java/io/weaviate/integration/PaginationITest.java index 3cb3d8784..b4b02d291 100644 --- a/src/it/java/io/weaviate/integration/PaginationITest.java +++ b/src/it/java/io/weaviate/integration/PaginationITest.java @@ -25,7 +25,7 @@ import io.weaviate.containers.Container; public class PaginationITest extends ConcurrentTest { - private static WeaviateClient client = Container.WEAVIATE.getClient(); + private static final WeaviateClient client = Container.WEAVIATE.getClient(); @Test public void testIterateAll() throws IOException { diff --git a/src/it/java/io/weaviate/integration/TenantsITest.java b/src/it/java/io/weaviate/integration/TenantsITest.java index 7c17cf54b..c765ae080 100644 --- a/src/it/java/io/weaviate/integration/TenantsITest.java +++ b/src/it/java/io/weaviate/integration/TenantsITest.java @@ -18,7 +18,7 @@ public class TenantsITest extends ConcurrentTest { .build(), Container.MINIO); - private static WeaviateClient client = compose.getClient(); + private static final WeaviateClient client = compose.getClient(); @Test public void test_tenantLifecycle() throws Exception { diff --git a/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java b/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java index 0101dc122..e667d3e3b 100644 --- a/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java +++ b/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java @@ -4,6 +4,7 @@ import java.util.function.Function; import io.weaviate.client6.v1.api.alias.WeaviateAliasClient; +import io.weaviate.client6.v1.api.backup.WeaviateBackupClient; import io.weaviate.client6.v1.api.collections.WeaviateCollectionsClient; import io.weaviate.client6.v1.internal.ObjectBuilder; import io.weaviate.client6.v1.internal.TokenProvider; @@ -31,6 +32,9 @@ public class WeaviateClient implements AutoCloseable { /** Client for {@code /aliases} endpoints for managing collection aliases. */ public final WeaviateAliasClient alias; + /** Client for {@code /backups} endpoints for managing collection aliases. */ + public final WeaviateBackupClient backup; + public WeaviateClient(Config config) { RestTransportOptions restOpt; GrpcChannelOptions grpcOpt; @@ -81,6 +85,7 @@ public WeaviateClient(Config config) { this.restTransport = _restTransport; this.grpcTransport = new DefaultGrpcTransport(grpcOpt); this.alias = new WeaviateAliasClient(restTransport); + this.backup = new WeaviateBackupClient(restTransport); this.collections = new WeaviateCollectionsClient(restTransport, grpcTransport); this.config = config; } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java index 99f00af3e..bddb3119f 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java @@ -15,12 +15,12 @@ public record CreateBackupRequest(BackupCreate body, String backend) { - public static Endpoint _ENDPOINT = new SimpleEndpoint<>( + public static Endpoint _ENDPOINT = new SimpleEndpoint<>( request -> "POST", request -> "/backups/" + request.backend, request -> Collections.emptyMap(), request -> JSON.serialize(request.body), - (statusCode, response) -> new CreateBackupResponse(JSON.deserialize(response, Backup.class))); + (statusCode, response) -> JSON.deserialize(response, Backup.class)); public static record BackupCreate( @SerializedName("id") String id, @@ -90,12 +90,12 @@ public Builder excludeCollections(List excludeCollections) { return this; } - public Builder cpuPercentage(Integer cpuPercentage) { + public Builder cpuPercentage(int cpuPercentage) { this.cpuPercentage = cpuPercentage; return this; } - public Builder chunkSize(Integer chunkSize) { + public Builder chunkSize(int chunkSize) { this.chunkSize = chunkSize; return this; } @@ -116,8 +116,8 @@ public Builder path(String path) { } @Override - public CreateBackupRequest build() { - new BackupCreate(this); + public BackupCreate build() { + return new BackupCreate(this); } } } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupResponse.java b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupResponse.java index 50c99d4a9..d35daf614 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupResponse.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupResponse.java @@ -1,12 +1,21 @@ package io.weaviate.client6.v1.api.backup; +import java.io.IOException; import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeoutException; public final class CreateBackupResponse { private final Backup backup; + private final WeaviateBackupClient backupClient; - CreateBackupResponse(Backup backup) { + CreateBackupResponse(final Backup backup, final WeaviateBackupClient backupClient) { this.backup = backup; + this.backupClient = backupClient; + } + + public Backup backup() { + return backup; } public String id() { @@ -32,4 +41,20 @@ public BackupStatus status() { public String error() { return backup.error(); } + + public Backup waitForCompletion() throws IOException, TimeoutException { + return waitForStatus(BackupStatus.SUCCESS); + } + + public Backup waitForStatus(BackupStatus status) throws IOException, TimeoutException { + return new Waiter(backup, this::poll).waitForStatus(status); + } + + private Optional poll() throws Exception { + return this.backupClient.getCreateStatus(id(), backend()); + } + + public void cancel() throws IOException { + this.backupClient.cancel(id(), backend()); + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/GetCreateStatusRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/GetCreateStatusRequest.java index d0b487ee9..52d23d19f 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/GetCreateStatusRequest.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/GetCreateStatusRequest.java @@ -9,7 +9,7 @@ public record GetCreateStatusRequest(String backupId, String backend) { public static final Endpoint> _ENDPOINT = OptionalEndpoint.noBodyOptional( request -> "GET", - request -> "backups/" + request.backend + "/" + request.backupId, + request -> "/backups/" + request.backend + "/" + request.backupId, request -> Collections.emptyMap(), Backup.class); } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/GetRestoreStatusRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/GetRestoreStatusRequest.java index 8f4f261ba..f4430b80d 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/GetRestoreStatusRequest.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/GetRestoreStatusRequest.java @@ -9,7 +9,7 @@ public record GetRestoreStatusRequest(String backupId, String backend) { public static final Endpoint> _ENDPOINT = OptionalEndpoint.noBodyOptional( request -> "GET", - request -> "backups/" + request.backend + "/" + request.backupId + "/restore", + request -> "/backups/" + request.backend + "/" + request.backupId + "/restore", request -> Collections.emptyMap(), Backup.class); } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java index 22117732a..4fc71253a 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java @@ -15,12 +15,12 @@ public record RestoreBackupRequest(String backupId, String backend, BackupRestore body) { - public static Endpoint _ENDPOINT = new SimpleEndpoint<>( + public static Endpoint _ENDPOINT = new SimpleEndpoint<>( request -> "POST", request -> "/backups/" + request.backend + "/" + request.backupId + "/restore", request -> Collections.emptyMap(), request -> JSON.serialize(request.body), - (statusCode, response) -> new RestoreBackupResponse(JSON.deserialize(response, Backup.class))); + (statusCode, response) -> JSON.deserialize(response, Backup.class)); public record BackupRestore( @SerializedName("include") List includeCollections, diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupResponse.java b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupResponse.java index 8cfb30ca4..c760067c9 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupResponse.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupResponse.java @@ -1,12 +1,17 @@ package io.weaviate.client6.v1.api.backup; +import java.io.IOException; import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeoutException; public final class RestoreBackupResponse { private final Backup backup; + private final WeaviateBackupClient backupClient; - RestoreBackupResponse(Backup backup) { + RestoreBackupResponse(final Backup backup, final WeaviateBackupClient backupClient) { this.backup = backup; + this.backupClient = backupClient; } public String id() { @@ -32,4 +37,12 @@ public BackupStatus status() { public String error() { return backup.error(); } + + public Backup waitForCompletion() throws IOException, TimeoutException { + return new Waiter(backup, this::poll).waitForStatus(BackupStatus.SUCCESS); + } + + private Optional poll() throws Exception { + return this.backupClient.getRestoreStatus(id(), backend()); + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java b/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java new file mode 100644 index 000000000..9bc68a6e7 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java @@ -0,0 +1,68 @@ +package io.weaviate.client6.v1.api.backup; + +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; + +final class Waiter { + private static final long WAIT_INTERVAL_MILLIS = 1_000; + private static final long TIMEOUT_MILLIS = 3600_000; + + private final Backup backup; + private final Callable> poll; + + Waiter(final Backup backup, Callable> poll) { + this.backup = backup; + this.poll = poll; + } + + Backup waitForStatus(BackupStatus wantStatus) throws IOException, TimeoutException { + if (backup.error() != null) { + throw new RuntimeException(backup.error()); + } + + if (backup.status() == wantStatus) { + return backup; + } + + Instant deadline = Instant.now().plusMillis(TIMEOUT_MILLIS); + Backup latest = backup; + while (!Thread.interrupted()) { + if (Instant.now().isAfter(deadline)) { + throw new TimeoutException("timed out after %s, latest status %s".formatted( + Duration.ofMillis(TIMEOUT_MILLIS).toSeconds(), latest.status())); + } + + try { + var current = poll.call().orElseThrow(); + latest = current; + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (latest.status() == wantStatus) { + return latest; + } else if (isComplete(latest)) { + throw new IllegalStateException("completed with status=%s without reaching %s" + .formatted(latest.status(), wantStatus)); + } + + try { + Thread.sleep(WAIT_INTERVAL_MILLIS); + } catch (InterruptedException e) { + System.out.println("Interrupted"); + Thread.currentThread().interrupt(); + } + } + return latest; + } + + private boolean isComplete(final Backup backup) { + return backup.status() == BackupStatus.SUCCESS + || backup.status() == BackupStatus.FAILED + || backup.status() == BackupStatus.CANCELED; + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java index 6d8ce9c58..3bee0aa13 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java @@ -27,8 +27,8 @@ public WeaviateBackupClient(RestTransport restTransport) { * due to a malformed request, a networking error * or the server being unavailable. */ - public void create(String backupId, String backend) throws IOException { - create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId), backend)); + public CreateBackupResponse create(String backupId, String backend) throws IOException { + return create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId), backend)); } /** @@ -43,10 +43,10 @@ public void create(String backupId, String backend) throws IOException { * due to a malformed request, a networking error * or the server being unavailable. */ - public void create(String backupId, String backend, + public CreateBackupResponse create(String backupId, String backend, Function> fn) throws IOException { - create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId, fn), backend)); + return create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId, fn), backend)); } /** @@ -60,7 +60,8 @@ public void create(String backupId, String backend, * or the server being unavailable. */ public CreateBackupResponse create(CreateBackupRequest request) throws IOException { - return this.restTransport.performRequest(request, CreateBackupRequest._ENDPOINT); + var backup = this.restTransport.performRequest(request, CreateBackupRequest._ENDPOINT); + return new CreateBackupResponse(backup, this); } /** @@ -123,7 +124,8 @@ public RestoreBackupResponse restore(String backupId, String backend, * or the server being unavailable. */ public RestoreBackupResponse restore(RestoreBackupRequest request) throws IOException { - return this.restTransport.performRequest(request, RestoreBackupRequest._ENDPOINT); + var backup = this.restTransport.performRequest(request, RestoreBackupRequest._ENDPOINT); + return new RestoreBackupResponse(backup, this); } /** From 857394f02ab31ace260a4555e6621762bd0b911f Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Mon, 29 Sep 2025 19:28:49 +0200 Subject: [PATCH 03/11] chore: fix javadoc parameters --- src/main/java/io/weaviate/client6/v1/internal/MapUtil.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/weaviate/client6/v1/internal/MapUtil.java b/src/main/java/io/weaviate/client6/v1/internal/MapUtil.java index 4186572b7..05811cf46 100644 --- a/src/main/java/io/weaviate/client6/v1/internal/MapUtil.java +++ b/src/main/java/io/weaviate/client6/v1/internal/MapUtil.java @@ -28,9 +28,9 @@ private MapUtil() { * // Result: {1: 1, 2: 2, 3: null}; * } * - * @param stream Stream of elements {@link T}. - * @param keyFn Transforms element {@link T} to key {@link K}. - * @param keyFn Transforms element {@link T} to value {@link V}. + * @param stream Stream of elements {@link T}. + * @param keyFn Transforms element {@link T} to key {@link K}. + * @param valueFn Transforms element {@link T} to value {@link V}. * @return Map */ public static Map collect(Stream stream, Function keyFn, Function valueFn) { From aa68a9fb576a36f513594c3284ad55eb1a9aada5 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Tue, 30 Sep 2025 10:56:37 +0200 Subject: [PATCH 04/11] refactor: move wait/cancel operations to Backup class directly --- .../io/weaviate/integration/BackupITest.java | 12 ++-- .../client6/v1/api/backup/Backup.java | 32 +++++++++- .../v1/api/backup/CreateBackupResponse.java | 60 ------------------- .../v1/api/backup/RestoreBackupResponse.java | 48 --------------- .../v1/api/backup/WeaviateBackupClient.java | 20 +++---- 5 files changed, 47 insertions(+), 125 deletions(-) delete mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupResponse.java delete mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupResponse.java diff --git a/src/it/java/io/weaviate/integration/BackupITest.java b/src/it/java/io/weaviate/integration/BackupITest.java index fc1852ab2..3bfd9df4d 100644 --- a/src/it/java/io/weaviate/integration/BackupITest.java +++ b/src/it/java/io/weaviate/integration/BackupITest.java @@ -50,7 +50,7 @@ public void test_lifecycle() throws IOException, TimeoutException { .compressionLevel(CompressionLevel.BEST_SPEED)); // Assert - Assertions.assertThat(started.backup()) + Assertions.assertThat(started) .as("created backup operation") .returns(backup_1, Backup::id) .returns(backend, Backup::backend) @@ -60,7 +60,7 @@ public void test_lifecycle() throws IOException, TimeoutException { .containsOnly(nsA, nsB); // Act: await backup competion - var completed = started.waitForCompletion(); + var completed = started.waitForCompletion(client); // Assert Assertions.assertThat(completed) @@ -72,7 +72,7 @@ public void test_lifecycle() throws IOException, TimeoutException { // Act: create another backup String backup_2 = ns("backup_2").toLowerCase(); - client.backup.create(backup_2, backend).waitForCompletion(); + client.backup.create(backup_2, backend).waitForCompletion(client); // Assert: check the second backup is created successfully var status_2 = client.backup.getCreateStatus(backup_2, backend); @@ -91,8 +91,8 @@ public void test_lifecycle() throws IOException, TimeoutException { .includeCollections(nsA, nsB, nsC, nsBig) .cpuPercentage(1) .compressionLevel(CompressionLevel.BEST_COMPRESSION)); - cancelMe.cancel(); - cancelMe.waitForStatus(BackupStatus.CANCELED); + cancelMe.cancel(client); + cancelMe.waitForStatus(client, BackupStatus.CANCELED); // Assert: check the backup is cancelled var status_3 = client.backup.getCreateStatus(backup_3, backend); @@ -109,7 +109,7 @@ public void test_lifecycle() throws IOException, TimeoutException { // Act: delete data and restore backup #1 client.collections.delete(nsA); client.backup.restore(backup_1, backend, restore -> restore.includeCollections(nsA)) - .waitForCompletion(); + .waitForCompletion(client); // Assert: object inserted in the beginning of the test is present var restore_1 = client.backup.getRestoreStatus(backup_1, backend); diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java index 534369ef4..f8cf08694 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java @@ -1,14 +1,44 @@ package io.weaviate.client6.v1.api.backup; +import java.io.IOException; import java.util.List; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeoutException; import com.google.gson.annotations.SerializedName; +import io.weaviate.client6.v1.api.WeaviateClient; + public record Backup( @SerializedName("id") String id, @SerializedName("path") String path, @SerializedName("backend") String backend, @SerializedName("classes") List includesCollections, @SerializedName("status") BackupStatus status, - @SerializedName("error") String error) { + @SerializedName("error") String error, + @SerializedName("__operation__") Operation operation) { + + public Backup withOperation(Operation operation) { + return new Backup(id, path, backend, includesCollections, status, error, operation); + } + + public enum Operation { + CREATE, RESTORE; + } + + public Backup waitForCompletion(WeaviateClient client) throws IOException, TimeoutException { + return waitForStatus(client, BackupStatus.SUCCESS); + } + + public Backup waitForStatus(WeaviateClient client, BackupStatus status) throws IOException, TimeoutException { + final Callable> poll = operation == Operation.CREATE + ? () -> client.backup.getCreateStatus(id, backend) + : () -> client.backup.getRestoreStatus(id, backend); + return new Waiter(this, poll).waitForStatus(status); + } + + public void cancel(WeaviateClient client) throws IOException { + client.backup.cancel(id(), backend()); + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupResponse.java b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupResponse.java deleted file mode 100644 index d35daf614..000000000 --- a/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupResponse.java +++ /dev/null @@ -1,60 +0,0 @@ -package io.weaviate.client6.v1.api.backup; - -import java.io.IOException; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeoutException; - -public final class CreateBackupResponse { - private final Backup backup; - private final WeaviateBackupClient backupClient; - - CreateBackupResponse(final Backup backup, final WeaviateBackupClient backupClient) { - this.backup = backup; - this.backupClient = backupClient; - } - - public Backup backup() { - return backup; - } - - public String id() { - return backup.id(); - } - - public String path() { - return backup.path(); - } - - public String backend() { - return backup.backend(); - } - - public List includesCollections() { - return backup.includesCollections(); - } - - public BackupStatus status() { - return backup.status(); - } - - public String error() { - return backup.error(); - } - - public Backup waitForCompletion() throws IOException, TimeoutException { - return waitForStatus(BackupStatus.SUCCESS); - } - - public Backup waitForStatus(BackupStatus status) throws IOException, TimeoutException { - return new Waiter(backup, this::poll).waitForStatus(status); - } - - private Optional poll() throws Exception { - return this.backupClient.getCreateStatus(id(), backend()); - } - - public void cancel() throws IOException { - this.backupClient.cancel(id(), backend()); - } -} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupResponse.java b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupResponse.java deleted file mode 100644 index c760067c9..000000000 --- a/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupResponse.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.weaviate.client6.v1.api.backup; - -import java.io.IOException; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeoutException; - -public final class RestoreBackupResponse { - private final Backup backup; - private final WeaviateBackupClient backupClient; - - RestoreBackupResponse(final Backup backup, final WeaviateBackupClient backupClient) { - this.backup = backup; - this.backupClient = backupClient; - } - - public String id() { - return backup.id(); - } - - public String path() { - return backup.path(); - } - - public String backend() { - return backup.backend(); - } - - public List includesCollections() { - return backup.includesCollections(); - } - - public BackupStatus status() { - return backup.status(); - } - - public String error() { - return backup.error(); - } - - public Backup waitForCompletion() throws IOException, TimeoutException { - return new Waiter(backup, this::poll).waitForStatus(BackupStatus.SUCCESS); - } - - private Optional poll() throws Exception { - return this.backupClient.getRestoreStatus(id(), backend()); - } -} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java index 3bee0aa13..5ec82737c 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java @@ -27,7 +27,7 @@ public WeaviateBackupClient(RestTransport restTransport) { * due to a malformed request, a networking error * or the server being unavailable. */ - public CreateBackupResponse create(String backupId, String backend) throws IOException { + public Backup create(String backupId, String backend) throws IOException { return create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId), backend)); } @@ -43,7 +43,7 @@ public CreateBackupResponse create(String backupId, String backend) throws IOExc * due to a malformed request, a networking error * or the server being unavailable. */ - public CreateBackupResponse create(String backupId, String backend, + public Backup create(String backupId, String backend, Function> fn) throws IOException { return create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId, fn), backend)); @@ -59,9 +59,9 @@ public CreateBackupResponse create(String backupId, String backend, * due to a malformed request, a networking error * or the server being unavailable. */ - public CreateBackupResponse create(CreateBackupRequest request) throws IOException { - var backup = this.restTransport.performRequest(request, CreateBackupRequest._ENDPOINT); - return new CreateBackupResponse(backup, this); + public Backup create(CreateBackupRequest request) throws IOException { + return this.restTransport.performRequest(request, CreateBackupRequest._ENDPOINT) + .withOperation(Backup.Operation.CREATE); } /** @@ -91,7 +91,7 @@ public Optional getCreateStatus(String backupId, String backend) throws * due to a malformed request, a networking error * or the server being unavailable. */ - public RestoreBackupResponse restore(String backupId, String backend) throws IOException { + public Backup restore(String backupId, String backend) throws IOException { return restore(new RestoreBackupRequest(backupId, backend, RestoreBackupRequest.BackupRestore.of())); } @@ -107,7 +107,7 @@ public RestoreBackupResponse restore(String backupId, String backend) throws IOE * due to a malformed request, a networking error * or the server being unavailable. */ - public RestoreBackupResponse restore(String backupId, String backend, + public Backup restore(String backupId, String backend, Function> fn) throws IOException { return restore(new RestoreBackupRequest(backupId, backend, RestoreBackupRequest.BackupRestore.of(fn))); @@ -123,9 +123,9 @@ public RestoreBackupResponse restore(String backupId, String backend, * due to a malformed request, a networking error * or the server being unavailable. */ - public RestoreBackupResponse restore(RestoreBackupRequest request) throws IOException { - var backup = this.restTransport.performRequest(request, RestoreBackupRequest._ENDPOINT); - return new RestoreBackupResponse(backup, this); + public Backup restore(RestoreBackupRequest request) throws IOException { + return this.restTransport.performRequest(request, RestoreBackupRequest._ENDPOINT) + .withOperation(Backup.Operation.RESTORE); } /** From 0c380646cf8dd9ae7b425c3c74ac817f8a8afb0a Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Tue, 30 Sep 2025 11:23:11 +0200 Subject: [PATCH 05/11] feat: provide options to customize wait interval/timeout --- .../io/weaviate/integration/BackupITest.java | 27 ++++---- .../client6/v1/api/backup/Backup.java | 15 ++++- .../client6/v1/api/backup/WaitOptions.java | 64 +++++++++++++++++++ .../client6/v1/api/backup/Waiter.java | 12 ++-- 4 files changed, 100 insertions(+), 18 deletions(-) create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/WaitOptions.java diff --git a/src/it/java/io/weaviate/integration/BackupITest.java b/src/it/java/io/weaviate/integration/BackupITest.java index 3bfd9df4d..c0c256abf 100644 --- a/src/it/java/io/weaviate/integration/BackupITest.java +++ b/src/it/java/io/weaviate/integration/BackupITest.java @@ -17,6 +17,7 @@ import io.weaviate.client6.v1.api.backup.Backup; import io.weaviate.client6.v1.api.backup.BackupStatus; import io.weaviate.client6.v1.api.backup.CompressionLevel; +import io.weaviate.client6.v1.api.collections.Property; import io.weaviate.containers.Weaviate; public class BackupITest extends ConcurrentTest { @@ -92,7 +93,7 @@ public void test_lifecycle() throws IOException, TimeoutException { .cpuPercentage(1) .compressionLevel(CompressionLevel.BEST_COMPRESSION)); cancelMe.cancel(client); - cancelMe.waitForStatus(client, BackupStatus.CANCELED); + cancelMe.waitForStatus(client, BackupStatus.CANCELED, wait -> wait.interval(500)); // Assert: check the backup is cancelled var status_3 = client.backup.getCreateStatus(backup_3, backend); @@ -108,26 +109,30 @@ public void test_lifecycle() throws IOException, TimeoutException { // Act: delete data and restore backup #1 client.collections.delete(nsA); - client.backup.restore(backup_1, backend, restore -> restore.includeCollections(nsA)) - .waitForCompletion(client); + client.backup.restore(backup_1, backend, restore -> restore.includeCollections(nsA)); // Assert: object inserted in the beginning of the test is present - var restore_1 = client.backup.getRestoreStatus(backup_1, backend); - Assertions.assertThat(restore_1).as("restore backup #1").get() + var restore_1 = client.backup.getRestoreStatus(backup_1, backend) + .orElseThrow().waitForCompletion(client); + Assertions.assertThat(restore_1).as("restore backup #1") .returns(BackupStatus.SUCCESS, Backup::status); Assertions.assertThat(collectionA.size()).as("after restore backup #1").isEqualTo(1); } + /** Write 10_000 entries with a UUID[10] property. */ private CompletableFuture spamData(String collectionName) { return CompletableFuture.supplyAsync(() -> { - var spam = client.collections.use(collectionName); - for (int i = 0; i < 10_000; i++) { - var uuids = IntStream.range(0, 10).mapToObj(j -> UUID.randomUUID()).toArray(); - try { + try { + client.collections.create(collectionName, + c -> c.properties(Property.uuidArray("uuids"))); + + var spam = client.collections.use(collectionName); + for (int i = 0; i < 10_000; i++) { + var uuids = IntStream.range(0, 10).mapToObj(j -> UUID.randomUUID()).toArray(); spam.data.insert(Map.of("uuids", uuids)); - } catch (IOException e) { - throw new CompletionException(e); } + } catch (IOException e) { + throw new CompletionException(e); } return null; }); diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java index f8cf08694..0c0ca4390 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java @@ -5,10 +5,12 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import com.google.gson.annotations.SerializedName; import io.weaviate.client6.v1.api.WeaviateClient; +import io.weaviate.client6.v1.internal.ObjectBuilder; public record Backup( @SerializedName("id") String id, @@ -31,11 +33,22 @@ public Backup waitForCompletion(WeaviateClient client) throws IOException, Timeo return waitForStatus(client, BackupStatus.SUCCESS); } + public Backup waitForCompletion(WeaviateClient client, Function> fn) + throws IOException, TimeoutException { + return waitForStatus(client, BackupStatus.SUCCESS, fn); + } + public Backup waitForStatus(WeaviateClient client, BackupStatus status) throws IOException, TimeoutException { + return waitForStatus(client, status, ObjectBuilder.identity()); + } + + public Backup waitForStatus(WeaviateClient client, BackupStatus status, + Function> fn) throws IOException, TimeoutException { + final var options = WaitOptions.of(fn); final Callable> poll = operation == Operation.CREATE ? () -> client.backup.getCreateStatus(id, backend) : () -> client.backup.getRestoreStatus(id, backend); - return new Waiter(this, poll).waitForStatus(status); + return new Waiter(this, poll, options).waitForStatus(status); } public void cancel(WeaviateClient client) throws IOException { diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/WaitOptions.java b/src/main/java/io/weaviate/client6/v1/api/backup/WaitOptions.java new file mode 100644 index 000000000..1044bc6e2 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/WaitOptions.java @@ -0,0 +1,64 @@ +package io.weaviate.client6.v1.api.backup; + +import java.time.Duration; +import java.util.function.Function; + +import io.weaviate.client6.v1.internal.ObjectBuilder; + +public record WaitOptions(long interval, long timeout) { + private static final long DEFAULT_INTERVAL_MILLIS = 1_000; + private static final long DEFAULT_TIMEOUT_MILLIS = 3600_000; + + public static WaitOptions of(Function> fn) { + return fn.apply(new Builder()).build(); + } + + public WaitOptions(Builder builder) { + this(builder.interval, builder.timeout); + } + + public static class Builder implements ObjectBuilder { + private long interval = DEFAULT_INTERVAL_MILLIS; + private long timeout = DEFAULT_TIMEOUT_MILLIS; + + /** Set polling interval. Defaults to 1s. */ + public Builder interval(Duration duration) { + return interval(duration.toMillis()); + } + + /** + * Set polling interval. Defaults to 1s. + * + * @param intervalMillis Polling interval in milliseconds. Minimum 1ms. + */ + public Builder interval(long intervalMillis) { + this.interval = Math.max(intervalMillis, 1); + return this; + } + + /** + * Set wait timeout. Defaults to 1s. + * + * @param duration Wait timeout duration. + */ + public Builder timeout(Duration duration) { + return timeout(duration.toMillis()); + } + + /** + * Set wait timeout. Set this to a negative value + * for the wait to expire immediately. + * + * @param timeoutMillis Wait timeout in milliseconds. + */ + public Builder timeout(long timeoutMillis) { + this.timeout = timeoutMillis; + return this; + } + + @Override + public WaitOptions build() { + return new WaitOptions(this); + } + } +} diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java b/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java index 9bc68a6e7..e0251b758 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java @@ -8,15 +8,15 @@ import java.util.concurrent.TimeoutException; final class Waiter { - private static final long WAIT_INTERVAL_MILLIS = 1_000; - private static final long TIMEOUT_MILLIS = 3600_000; private final Backup backup; private final Callable> poll; + private final WaitOptions wait; - Waiter(final Backup backup, Callable> poll) { + Waiter(final Backup backup, Callable> poll, WaitOptions wait) { this.backup = backup; this.poll = poll; + this.wait = wait; } Backup waitForStatus(BackupStatus wantStatus) throws IOException, TimeoutException { @@ -28,12 +28,12 @@ Backup waitForStatus(BackupStatus wantStatus) throws IOException, TimeoutExcepti return backup; } - Instant deadline = Instant.now().plusMillis(TIMEOUT_MILLIS); + Instant deadline = Instant.now().plusMillis(wait.timeout()); Backup latest = backup; while (!Thread.interrupted()) { if (Instant.now().isAfter(deadline)) { throw new TimeoutException("timed out after %s, latest status %s".formatted( - Duration.ofMillis(TIMEOUT_MILLIS).toSeconds(), latest.status())); + Duration.ofMillis(wait.timeout()).toSeconds(), latest.status())); } try { @@ -51,7 +51,7 @@ Backup waitForStatus(BackupStatus wantStatus) throws IOException, TimeoutExcepti } try { - Thread.sleep(WAIT_INTERVAL_MILLIS); + Thread.sleep(wait.interval()); } catch (InterruptedException e) { System.out.println("Interrupted"); Thread.currentThread().interrupt(); From 0ab926a83af554564072a50e792d4891d5d83bce Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Tue, 30 Sep 2025 11:36:09 +0200 Subject: [PATCH 06/11] fix: throw on invalid operations --- .../java/io/weaviate/integration/BackupITest.java | 15 +++++++++++++++ .../io/weaviate/client6/v1/api/backup/Backup.java | 7 +++++++ .../v1/api/backup/WeaviateBackupClient.java | 10 ++++++---- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/src/it/java/io/weaviate/integration/BackupITest.java b/src/it/java/io/weaviate/integration/BackupITest.java index c0c256abf..2d05bfcd4 100644 --- a/src/it/java/io/weaviate/integration/BackupITest.java +++ b/src/it/java/io/weaviate/integration/BackupITest.java @@ -1,6 +1,7 @@ package io.weaviate.integration; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -119,6 +120,20 @@ public void test_lifecycle() throws IOException, TimeoutException { Assertions.assertThat(collectionA.size()).as("after restore backup #1").isEqualTo(1); } + @Test(expected = IllegalStateException.class) + public void test_cancelRestore() throws IOException { + var backup = new Backup("#1", "/tmp/bak/#1", "filesystem", List.of("Things"), BackupStatus.STARTED, null, + Backup.Operation.RESTORE); + backup.cancel(client); + } + + @Test(expected = IllegalStateException.class) + public void test_waitForCompletion_unknown() throws IOException, TimeoutException { + var backup = new Backup("#1", "/tmp/bak/#1", "filesystem", List.of("Things"), BackupStatus.STARTED, null, + null); + backup.waitForCompletion(client); + } + /** Write 10_000 entries with a UUID[10] property. */ private CompletableFuture spamData(String collectionName) { return CompletableFuture.supplyAsync(() -> { diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java index 0c0ca4390..a3d583256 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java @@ -44,6 +44,10 @@ public Backup waitForStatus(WeaviateClient client, BackupStatus status) throws I public Backup waitForStatus(WeaviateClient client, BackupStatus status, Function> fn) throws IOException, TimeoutException { + if (operation == null) { + throw new IllegalStateException("backup.operation is null"); + } + final var options = WaitOptions.of(fn); final Callable> poll = operation == Operation.CREATE ? () -> client.backup.getCreateStatus(id, backend) @@ -52,6 +56,9 @@ public Backup waitForStatus(WeaviateClient client, BackupStatus status, } public void cancel(WeaviateClient client) throws IOException { + if (operation == Operation.RESTORE) { + throw new IllegalStateException("backup restore cannot be canceled"); + } client.backup.cancel(id(), backend()); } } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java index 5ec82737c..b7ea450df 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java @@ -76,8 +76,9 @@ public Backup create(CreateBackupRequest request) throws IOException { * or the server being unavailable. */ public Optional getCreateStatus(String backupId, String backend) throws IOException { - return this.restTransport.performRequest(new GetCreateStatusRequest(backupId, backend), - GetCreateStatusRequest._ENDPOINT); + return this.restTransport.performRequest( + new GetCreateStatusRequest(backupId, backend), GetCreateStatusRequest._ENDPOINT) + .map(b -> b.withOperation(Backup.Operation.CREATE)); } /** @@ -140,8 +141,9 @@ public Backup restore(RestoreBackupRequest request) throws IOException { * or the server being unavailable. */ public Optional getRestoreStatus(String backupId, String backend) throws IOException { - return this.restTransport.performRequest(new GetRestoreStatusRequest(backupId, backend), - GetRestoreStatusRequest._ENDPOINT); + return this.restTransport + .performRequest(new GetRestoreStatusRequest(backupId, backend), GetRestoreStatusRequest._ENDPOINT) + .map(b -> b.withOperation(Backup.Operation.RESTORE)); } /** From f14289fde214b72019e498c6d675391bcf5db51e Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Tue, 30 Sep 2025 12:46:15 +0200 Subject: [PATCH 07/11] docs: write javadoc --- .../client6/v1/api/backup/Backup.java | 80 ++++++++++++++++++- .../client6/v1/api/backup/BackupStatus.java | 8 ++ .../v1/api/backup/CompressionLevel.java | 3 + .../v1/api/backup/CreateBackupRequest.java | 20 +++++ .../v1/api/backup/RbacRestoreOption.java | 3 + .../v1/api/backup/RestoreBackupRequest.java | 20 +++++ .../client6/v1/api/backup/Waiter.java | 4 +- .../v1/api/backup/WeaviateBackupClient.java | 3 + 8 files changed, 137 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java index a3d583256..ae3d05762 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java @@ -13,14 +13,27 @@ import io.weaviate.client6.v1.internal.ObjectBuilder; public record Backup( + /** Backup ID. */ @SerializedName("id") String id, + /** Path to backup in the backend storage. */ @SerializedName("path") String path, + /** Backup storage backend. */ @SerializedName("backend") String backend, + /** Collections included in the backup. */ @SerializedName("classes") List includesCollections, + /** Backup creation / restoration status. */ @SerializedName("status") BackupStatus status, + /** Backup creation / restoration error. */ @SerializedName("error") String error, + /** + * This value indicates if a backup is being created or restored from. + * For operations like LIST this value is null. + */ + // We set a bogus SerializedName to make sure we do not pick up this + // value from the JSON by accident, but always set it ourselves. @SerializedName("__operation__") Operation operation) { + /** Set operation associated with this backup. */ public Backup withOperation(Operation operation) { return new Backup(id, path, backend, includesCollections, status, error, operation); } @@ -29,19 +42,71 @@ public enum Operation { CREATE, RESTORE; } + /** + * Block until the backup has been created / restored successfully. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} is not + * called before this method returns. + * @throws IllegalStateException if {@link #operation} is not set (null). + * @throws TimeoutException in case the wait times out without reaching + * BackupStatus.SUCCESS. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ public Backup waitForCompletion(WeaviateClient client) throws IOException, TimeoutException { return waitForStatus(client, BackupStatus.SUCCESS); } + /** + * Block until the backup has been created / restored successfully. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} is not + * called before this method returns. + * @param fn Lambda expression for optional parameters. + * @throws IllegalStateException if {@link #operation} is not set (null). + * @throws TimeoutException in case the wait times out without reaching + * BackupStatus.SUCCESS. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ public Backup waitForCompletion(WeaviateClient client, Function> fn) throws IOException, TimeoutException { return waitForStatus(client, BackupStatus.SUCCESS, fn); } + /** + * Block until the backup operation reaches a certain status. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} is not + * called before this method returns. + * @param status Target status. + * @throws IllegalStateException if {@link #operation} is not set (null). + * @throws TimeoutException in case the wait times out without reaching + * the target status. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ public Backup waitForStatus(WeaviateClient client, BackupStatus status) throws IOException, TimeoutException { return waitForStatus(client, status, ObjectBuilder.identity()); } + /** + * Block until the backup operation reaches a certain status. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} is not + * called before this method returns. + * @param status Target status. + * @param fn Lambda expression for optional parameters. + * @throws IllegalStateException if {@link #operation} is not set (null). + * @throws TimeoutException in case the wait times out without reaching + * the target status. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ public Backup waitForStatus(WeaviateClient client, BackupStatus status, Function> fn) throws IOException, TimeoutException { if (operation == null) { @@ -55,10 +120,19 @@ public Backup waitForStatus(WeaviateClient client, BackupStatus status, return new Waiter(this, poll, options).waitForStatus(status); } + /** + * Cancel backup creation. + * + *

+ * This method cannot be called cancel backup restore. + * + * @param client Weaviate client. Make sure {@link WeaviateClient#close} is not + * called before this method returns. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ public void cancel(WeaviateClient client) throws IOException { - if (operation == Operation.RESTORE) { - throw new IllegalStateException("backup restore cannot be canceled"); - } client.backup.cancel(id(), backend()); } } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/BackupStatus.java b/src/main/java/io/weaviate/client6/v1/api/backup/BackupStatus.java index 13487918b..74d4ff30b 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/BackupStatus.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/BackupStatus.java @@ -3,14 +3,22 @@ import com.google.gson.annotations.SerializedName; public enum BackupStatus { + /** Backup creation / restoration has begun. */ @SerializedName("STARTED") STARTED, + /** Backup in progress, data is being transferred. */ @SerializedName("TRANSFERRING") TRANSFERRING, + /** Backup creation / restoration completed successfully. */ @SerializedName("SUCCESS") SUCCESS, + /** Backup creation / restoration failed. */ @SerializedName("FAILED") FAILED, + /** + * Backup creation canceled. + * This status is never returned for backup restorations. + */ @SerializedName("CANCELED") CANCELED; } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/CompressionLevel.java b/src/main/java/io/weaviate/client6/v1/api/backup/CompressionLevel.java index 7f8a75841..1a6cd0003 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/CompressionLevel.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/CompressionLevel.java @@ -3,10 +3,13 @@ import com.google.gson.annotations.SerializedName; public enum CompressionLevel { + /** Use default compression algorithm (gzip). */ @SerializedName("DefaultCompression") DEFAULT, + /** Use compression algorithm that prioritizes speed. */ @SerializedName("BestSpeed") BEST_SPEED, + /** Use compression algorithm that prioritizes compression quality. */ @SerializedName("BestCompression") BEST_COMPRESSION; } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java index bddb3119f..9db358ebc 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/CreateBackupRequest.java @@ -72,44 +72,64 @@ public Builder(String backupId) { this.backupId = backupId; } + /** Collection that should be included in the backup. */ public Builder includeCollections(String... includeCollections) { return includeCollections(Arrays.asList(includeCollections)); } + /** Collection that should be included in the backup. */ public Builder includeCollections(List includeCollections) { this.includeCollections.addAll(includeCollections); return this; } + /** Collection that should be excluded from the backup. */ public Builder excludeCollections(String... excludeCollections) { return excludeCollections(Arrays.asList(excludeCollections)); } + /** Collection that should be excluded from the backup. */ public Builder excludeCollections(List excludeCollections) { this.excludeCollections.addAll(excludeCollections); return this; } + /** + * Set the desired CPU core utilization. + * + * @param cpuPercentage Percent value of the target CPU utilization (1% to 80%). + */ public Builder cpuPercentage(int cpuPercentage) { this.cpuPercentage = cpuPercentage; return this; } + /** + * Set the desired chunk size. Defaults to 128MB. + * + * @param chunkSize Chunk size in MB (2MB to 512 MB). + */ public Builder chunkSize(int chunkSize) { this.chunkSize = chunkSize; return this; } + /** Adjust the parameters of the selected compression algorithm. */ public Builder compressionLevel(CompressionLevel compressionLevel) { this.compressionLevel = compressionLevel; return this; } + /** + * Set the bucket where backups are stored. + * Applicable for cloud storage backends. + */ public Builder bucket(String bucket) { this.bucket = bucket; return this; } + /** Override default backup location. */ public Builder path(String path) { this.path = path; return this; diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/RbacRestoreOption.java b/src/main/java/io/weaviate/client6/v1/api/backup/RbacRestoreOption.java index b4a45658c..b2ba14d68 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/RbacRestoreOption.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/RbacRestoreOption.java @@ -2,9 +2,12 @@ import com.google.gson.annotations.SerializedName; +/** Controls which RBAC objects (users, roles) get restored. */ public enum RbacRestoreOption { + /** Do not restore any objects. */ @SerializedName("noRestore") NONE, + /** Restore all objects. */ @SerializedName("all") ALL; } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java index 4fc71253a..bc017d8ea 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/RestoreBackupRequest.java @@ -68,49 +68,69 @@ public static class Builder implements ObjectBuilder { private final List includeCollections = new ArrayList<>(); private final List excludeCollections = new ArrayList<>(); + /** Collection that should be restored. */ public Builder includeCollections(String... includeCollections) { return includeCollections(Arrays.asList(includeCollections)); } + /** Collection that should be restored. */ public Builder includeCollections(List includeCollections) { this.includeCollections.addAll(includeCollections); return this; } + /** Collection that should be not be restored. */ public Builder excludeCollections(String... excludeCollections) { return excludeCollections(Arrays.asList(excludeCollections)); } + /** Collection that should be not be restored. */ public Builder excludeCollections(List excludeCollections) { this.excludeCollections.addAll(excludeCollections); return this; } + /** + * Set the desired CPU core utilization. + * + * @param cpuPercentage Percent value of the target CPU utilization (1% to 80%). + */ public Builder cpuPercentage(int cpuPercentage) { this.cpuPercentage = cpuPercentage; return this; } + /** + * Set the bucket where backups are stored. + * Applicable for cloud storage backends. + */ public Builder bucket(String bucket) { this.bucket = bucket; return this; } + /** Override default backup location. */ public Builder path(String path) { this.path = path; return this; } + /** + * Allow restored collection aliases to overwrite existing ones + * in case of conflict. + */ public Builder overwriteAlias(boolean overwriteAlias) { this.overwriteAlias = overwriteAlias; return this; } + /** Control which RBAC users should be restored. */ public Builder restoreUsers(RbacRestoreOption restoreUsers) { this.restoreUsers = restoreUsers; return this; } + /** Control which RBAC roles should be restored. */ public Builder restoreRoles(RbacRestoreOption restoreRoles) { this.restoreRoles = restoreRoles; return this; diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java b/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java index e0251b758..3a67a54f8 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java @@ -53,7 +53,9 @@ Backup waitForStatus(BackupStatus wantStatus) throws IOException, TimeoutExcepti try { Thread.sleep(wait.interval()); } catch (InterruptedException e) { - System.out.println("Interrupted"); + // TODO: the interrupted state will be cleared on the next while() check + // and then we will simply return the latest state. An absence of an exception + // might be misleading here. What should we do? Thread.currentThread().interrupt(); } } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java index b7ea450df..e2235d4ce 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClient.java @@ -163,6 +163,9 @@ public List list(String backend) throws IOException { /** * Cancel in-progress backup. * + *

+ * This method cannot be called cancel backup restore. + * * @param backupId Backup ID. * @param backend Backup storage backend. * @throws WeaviateApiException in case the server returned with an From f91fcbad4eb93d9ba9bfd20ccd1269f3b05a7f31 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Tue, 30 Sep 2025 14:20:37 +0200 Subject: [PATCH 08/11] feat: add async implementation --- .../io/weaviate/integration/BackupITest.java | 112 +++++++++++++-- .../client6/v1/api/WeaviateClient.java | 2 +- .../client6/v1/api/WeaviateClientAsync.java | 5 + .../client6/v1/api/backup/Backup.java | 100 ++++++++++++-- .../client6/v1/api/backup/Waiter.java | 44 +++++- .../api/backup/WeaviateBackupClientAsync.java | 128 ++++++++++++++++++ 6 files changed, 365 insertions(+), 26 deletions(-) create mode 100644 src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClientAsync.java diff --git a/src/it/java/io/weaviate/integration/BackupITest.java b/src/it/java/io/weaviate/integration/BackupITest.java index 2d05bfcd4..67a7bcdf2 100644 --- a/src/it/java/io/weaviate/integration/BackupITest.java +++ b/src/it/java/io/weaviate/integration/BackupITest.java @@ -6,6 +6,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; import java.util.stream.IntStream; @@ -48,7 +49,7 @@ public void test_lifecycle() throws IOException, TimeoutException { // Act: start backup var started = client.backup.create(backup_1, backend, backup -> backup - .excludeCollections(nsC, nsBig) + .includeCollections(nsA, nsB) .compressionLevel(CompressionLevel.BEST_SPEED)); // Assert @@ -104,9 +105,8 @@ public void test_lifecycle() throws IOException, TimeoutException { // Assert: all 3 backups are present var all = client.backup.list(backend); Assertions.assertThat(all).as("all backups") - .hasSize(3) .extracting(Backup::id) - .containsOnly(backup_1, backup_2, backup_3); + .contains(backup_1, backup_2, backup_3); // Act: delete data and restore backup #1 client.collections.delete(nsA); @@ -120,11 +120,107 @@ public void test_lifecycle() throws IOException, TimeoutException { Assertions.assertThat(collectionA.size()).as("after restore backup #1").isEqualTo(1); } - @Test(expected = IllegalStateException.class) - public void test_cancelRestore() throws IOException { - var backup = new Backup("#1", "/tmp/bak/#1", "filesystem", List.of("Things"), BackupStatus.STARTED, null, - Backup.Operation.RESTORE); - backup.cancel(client); + @Test + public void test_lifecycle_async() throws ExecutionException, InterruptedException, Exception { + // Arrange + String nsA = ns("A"), nsB = ns("B"), nsC = ns("C"), nsBig = ns("Big"); + String backup_1 = ns("backup_1").toLowerCase(); + String backend = "filesystem"; + + try (final var async = client.async()) { + // Start writing data in the background so it's ready + // by the time we get to backup #3. + var spam = spamData(nsBig); + + CompletableFuture.allOf( + async.collections.create(nsA), + async.collections.create(nsB), + async.collections.create(nsC)) + .join(); + + // Insert some data to check restore later + var collectionA = async.collections.use(nsA); + collectionA.data.insert(Map.of()).join(); + + // Act: start backup + var started = async.backup.create(backup_1, backend, + backup -> backup + .includeCollections(nsA, nsB) + .compressionLevel(CompressionLevel.BEST_SPEED)) + .join(); + + // Assert + Assertions.assertThat(started) + .as("created backup operation") + .returns(backup_1, Backup::id) + .returns(backend, Backup::backend) + .returns(BackupStatus.STARTED, Backup::status) + .returns(null, Backup::error) + .extracting(Backup::includesCollections, InstanceOfAssertFactories.list(String.class)) + .containsOnly(nsA, nsB); + + // Act: await backup competion + var completed = started.waitForCompletion(async).join(); + + // Assert + Assertions.assertThat(completed) + .as("await backup completion") + .returns(backup_1, Backup::id) + .returns(backend, Backup::backend) + .returns(BackupStatus.SUCCESS, Backup::status) + .returns(null, Backup::error); + + // Act: create another backup + String backup_2 = ns("backup_2").toLowerCase(); + async.backup.create(backup_2, backend) + .thenCompose(bak -> bak.waitForCompletion(async)) + .join(); + + // Assert: check the second backup is created successfully + var status_2 = async.backup.getCreateStatus(backup_2, backend).join(); + Assertions.assertThat(status_2).as("backup #2").get() + .returns(BackupStatus.SUCCESS, Backup::status); + + // Act: create and cancel + // Try to throttle this backup by creating a lot of objects, + // limiting CPU resources and requiring high compression ratio. + // This is to avoid flaky tests and make sure we can cancel + // the backup before it completes successfully. + String backup_3 = ns("backup_3").toLowerCase(); + spam.join(); + async.backup.create(backup_3, backend, + backup -> backup + .includeCollections(nsA, nsB, nsC, nsBig) + .cpuPercentage(1) + .compressionLevel(CompressionLevel.BEST_COMPRESSION)) + .thenCompose(cancelMe -> cancelMe.cancel(async) + .thenCompose(__ -> cancelMe.waitForStatus(async, BackupStatus.CANCELED, + wait -> wait.interval(500)))) + .join(); + + // Assert: check the backup is cancelled + var status_3 = async.backup.getCreateStatus(backup_3, backend).join(); + Assertions.assertThat(status_3).as("backup #3").get() + .returns(BackupStatus.CANCELED, Backup::status); + + // Assert: all 3 backups are present + var all = async.backup.list(backend).join(); + Assertions.assertThat(all).as("all backups") + .extracting(Backup::id) + .contains(backup_1, backup_2, backup_3); + + // Act: delete data and restore backup #1 + async.collections.delete(nsA).join(); + async.backup.restore(backup_1, backend, restore -> restore.includeCollections(nsA)).join(); + + // Assert: object inserted in the beginning of the test is present + var restore_1 = async.backup.getRestoreStatus(backup_1, backend) + .thenCompose(bak -> bak.orElseThrow().waitForCompletion(async)) + .join(); + Assertions.assertThat(restore_1).as("restore backup #1") + .returns(BackupStatus.SUCCESS, Backup::status); + Assertions.assertThat(collectionA.size().join()).as("after restore backup #1").isEqualTo(1); + } } @Test(expected = IllegalStateException.class) diff --git a/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java b/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java index e667d3e3b..4d7224a25 100644 --- a/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java +++ b/src/main/java/io/weaviate/client6/v1/api/WeaviateClient.java @@ -32,7 +32,7 @@ public class WeaviateClient implements AutoCloseable { /** Client for {@code /aliases} endpoints for managing collection aliases. */ public final WeaviateAliasClient alias; - /** Client for {@code /backups} endpoints for managing collection aliases. */ + /** Client for {@code /backups} endpoints for managing backups. */ public final WeaviateBackupClient backup; public WeaviateClient(Config config) { diff --git a/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java b/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java index 7255fbc6e..01b46ce11 100644 --- a/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java +++ b/src/main/java/io/weaviate/client6/v1/api/WeaviateClientAsync.java @@ -5,6 +5,7 @@ import java.util.function.Function; import io.weaviate.client6.v1.api.alias.WeaviateAliasClientAsync; +import io.weaviate.client6.v1.api.backup.WeaviateBackupClientAsync; import io.weaviate.client6.v1.api.collections.WeaviateCollectionsClient; import io.weaviate.client6.v1.api.collections.WeaviateCollectionsClientAsync; import io.weaviate.client6.v1.internal.ObjectBuilder; @@ -30,6 +31,9 @@ public class WeaviateClientAsync implements AutoCloseable { /** Client for {@code /aliases} endpoints for managing collection aliases. */ public final WeaviateAliasClientAsync alias; + /** Client for {@code /backups} endpoints for managing backups. */ + public final WeaviateBackupClientAsync backup; + /** * This constructor is blocking if {@link Authentication} configured, * as the client will need to do the initial token exchange. @@ -84,6 +88,7 @@ public WeaviateClientAsync(Config config) { this.restTransport = _restTransport; this.grpcTransport = new DefaultGrpcTransport(grpcOpt); this.alias = new WeaviateAliasClientAsync(restTransport); + this.backup = new WeaviateBackupClientAsync(restTransport); this.collections = new WeaviateCollectionsClientAsync(restTransport, grpcTransport); } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java index ae3d05762..5cebd5e07 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java @@ -4,12 +4,15 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import java.util.function.Function; +import java.util.function.Supplier; import com.google.gson.annotations.SerializedName; import io.weaviate.client6.v1.api.WeaviateClient; +import io.weaviate.client6.v1.api.WeaviateClientAsync; import io.weaviate.client6.v1.internal.ObjectBuilder; public record Backup( @@ -45,8 +48,8 @@ public enum Operation { /** * Block until the backup has been created / restored successfully. * - * @param client Weaviate client. Make sure {@link WeaviateClient#close} is not - * called before this method returns. + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. * @throws IllegalStateException if {@link #operation} is not set (null). * @throws TimeoutException in case the wait times out without reaching * BackupStatus.SUCCESS. @@ -61,8 +64,8 @@ public Backup waitForCompletion(WeaviateClient client) throws IOException, Timeo /** * Block until the backup has been created / restored successfully. * - * @param client Weaviate client. Make sure {@link WeaviateClient#close} is not - * called before this method returns. + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. * @param fn Lambda expression for optional parameters. * @throws IllegalStateException if {@link #operation} is not set (null). * @throws TimeoutException in case the wait times out without reaching @@ -79,8 +82,8 @@ public Backup waitForCompletion(WeaviateClient client, Function> poll = operation == Operation.CREATE ? () -> client.backup.getCreateStatus(id, backend) : () -> client.backup.getRestoreStatus(id, backend); - return new Waiter(this, poll, options).waitForStatus(status); + return new Waiter(this, options).waitForStatus(status, poll); } /** * Cancel backup creation. * *

- * This method cannot be called cancel backup restore. + * This method cannot be called to cancel backup restore. * - * @param client Weaviate client. Make sure {@link WeaviateClient#close} is not - * called before this method returns. + * @param client Weaviate client. Make sure {@link WeaviateClient#close} + * is NOT called before this method returns. * @throws IOException in case the request was not sent successfully * due to a malformed request, a networking error * or the server being unavailable. @@ -135,4 +138,77 @@ public Backup waitForStatus(WeaviateClient client, BackupStatus status, public void cancel(WeaviateClient client) throws IOException { client.backup.cancel(id(), backend()); } + + /** + * Poll until backup's been created / restored successfully. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + * @throws IllegalStateException if {@link #operation} is not set (null). + * @throws TimeoutException in case the wait times out without reaching + * BackupStatus.SUCCESS. + * @throws IOException in case the request was not sent successfully + * due to a malformed request, a networking error + * or the server being unavailable. + */ + public CompletableFuture waitForCompletion(WeaviateClientAsync client) { + return waitForStatus(client, BackupStatus.SUCCESS); + } + + /** + * Poll until backup's been created / restored successfully. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture waitForCompletion(WeaviateClientAsync client, + Function> fn) { + return waitForStatus(client, BackupStatus.SUCCESS, fn); + } + + /** + * Poll until backup reaches a certain status or the wait times out. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + * @param status Target status. + */ + public CompletableFuture waitForStatus(WeaviateClientAsync client, BackupStatus status) { + return waitForStatus(client, status, ObjectBuilder.identity()); + } + + /** + * Poll until backup reaches a certain status or the wait times out. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + * @param status Target status. + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture waitForStatus(WeaviateClientAsync client, BackupStatus status, + Function> fn) { + if (operation == null) { + throw new IllegalStateException("backup.operation is null"); + } + + final var options = WaitOptions.of(fn); + final Supplier>> poll = operation == Operation.CREATE + ? () -> client.backup.getCreateStatus(id, backend) + : () -> client.backup.getRestoreStatus(id, backend); + return new Waiter(this, options).waitForStatusAsync(status, poll); + } + + /** + * Cancel backup creation. + * + *

+ * This method cannot be called to cancel backup restore. + * + * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} + * is NOT called before this method returns. + */ + public CompletableFuture cancel(WeaviateClientAsync client) { + return client.backup.cancel(id(), backend()); + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java b/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java index 3a67a54f8..82528b735 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Waiter.java @@ -5,21 +5,24 @@ import java.time.Instant; import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; final class Waiter { private final Backup backup; - private final Callable> poll; private final WaitOptions wait; - Waiter(final Backup backup, Callable> poll, WaitOptions wait) { + Waiter(final Backup backup, WaitOptions wait) { this.backup = backup; - this.poll = poll; this.wait = wait; } - Backup waitForStatus(BackupStatus wantStatus) throws IOException, TimeoutException { + Backup waitForStatus(final BackupStatus wantStatus, Callable> poll) + throws IOException, TimeoutException { if (backup.error() != null) { throw new RuntimeException(backup.error()); } @@ -28,7 +31,7 @@ Backup waitForStatus(BackupStatus wantStatus) throws IOException, TimeoutExcepti return backup; } - Instant deadline = Instant.now().plusMillis(wait.timeout()); + final Instant deadline = Instant.now().plusMillis(wait.timeout()); Backup latest = backup; while (!Thread.interrupted()) { if (Instant.now().isAfter(deadline)) { @@ -62,6 +65,37 @@ Backup waitForStatus(BackupStatus wantStatus) throws IOException, TimeoutExcepti return latest; } + CompletableFuture waitForStatusAsync( + final BackupStatus wantStatus, + Supplier>> poll) { + if (backup.status() == wantStatus) { + return CompletableFuture.completedFuture(backup); + } + final Instant deadline = Instant.now().plusMillis(wait.timeout()); + return poll.get().thenCompose(latest -> _waitForStatusAsync(wantStatus, latest.orElseThrow(), poll, deadline)); + } + + CompletableFuture _waitForStatusAsync( + final BackupStatus wantStatus, + final Backup current, + Supplier>> poll, + final Instant deadline) { + + if (current.status() == wantStatus) { + return CompletableFuture.completedFuture(current); + } + + if (Instant.now().isAfter(deadline)) { + var e = new TimeoutException("timed out after %s, latest status %s".formatted( + Duration.ofMillis(wait.timeout()).toSeconds(), current.status())); + throw new CompletionException(e); + } + + return poll.get().thenComposeAsync( + latest -> _waitForStatusAsync(wantStatus, latest.orElseThrow(), poll, deadline), + CompletableFuture.delayedExecutor(wait.interval(), TimeUnit.MILLISECONDS)); + } + private boolean isComplete(final Backup backup) { return backup.status() == BackupStatus.SUCCESS || backup.status() == BackupStatus.FAILED diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClientAsync.java b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClientAsync.java new file mode 100644 index 000000000..02e372810 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/backup/WeaviateBackupClientAsync.java @@ -0,0 +1,128 @@ +package io.weaviate.client6.v1.api.backup; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import io.weaviate.client6.v1.internal.ObjectBuilder; +import io.weaviate.client6.v1.internal.rest.RestTransport; + +public class WeaviateBackupClientAsync { + private final RestTransport restTransport; + + public WeaviateBackupClientAsync(RestTransport restTransport) { + this.restTransport = restTransport; + } + + /** + * Start a new backup process. + * + * @param backupId Backup ID. Must be unique for the backend. + * @param backend Backup storage backend. + */ + public CompletableFuture create(String backupId, String backend) { + return create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId), backend)); + } + + /** + * Start a new backup process. + * + * @param backupId Backup ID. Must be unique for the backend. + * @param backend Backup storage backend. + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture create(String backupId, String backend, + Function> fn) { + return create(new CreateBackupRequest(CreateBackupRequest.BackupCreate.of(backupId, fn), backend)); + } + + /** + * Start a new backup process. + * + * @param request Create backup request. + */ + public CompletableFuture create(CreateBackupRequest request) { + return this.restTransport.performRequestAsync(request, CreateBackupRequest._ENDPOINT) + .thenApply(bak -> bak.withOperation(Backup.Operation.CREATE)); + } + + /** + * Get backup create status. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + */ + public CompletableFuture> getCreateStatus(String backupId, String backend) { + return this.restTransport.performRequestAsync( + new GetCreateStatusRequest(backupId, backend), GetCreateStatusRequest._ENDPOINT) + .thenApply(bak -> bak.map(_bak -> _bak.withOperation(Backup.Operation.CREATE))); + } + + /** + * Start backup restore process. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + */ + public CompletableFuture restore(String backupId, String backend) { + return restore(new RestoreBackupRequest(backupId, backend, RestoreBackupRequest.BackupRestore.of())); + } + + /** + * Start backup restore process. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + * @param fn Lambda expression for optional parameters. + */ + public CompletableFuture restore(String backupId, String backend, + Function> fn) { + return restore(new RestoreBackupRequest(backupId, backend, RestoreBackupRequest.BackupRestore.of(fn))); + } + + /** + * Start backup restore process. + * + * @param request Restore backup request. + */ + public CompletableFuture restore(RestoreBackupRequest request) { + return this.restTransport.performRequestAsync(request, RestoreBackupRequest._ENDPOINT) + .thenApply(bak -> bak.withOperation(Backup.Operation.RESTORE)); + } + + /** + * Get backup restore status. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + */ + public CompletableFuture> getRestoreStatus(String backupId, String backend) { + return this.restTransport + .performRequestAsync(new GetRestoreStatusRequest(backupId, backend), GetRestoreStatusRequest._ENDPOINT) + .thenApply(bak -> bak.map(_bak -> _bak.withOperation(Backup.Operation.RESTORE))); + } + + /** + * List backups in the backend storage. + * + * @param backend Backup storage backend. + */ + public CompletableFuture> list(String backend) { + return this.restTransport.performRequestAsync(new ListBackupsRequest(backend), ListBackupsRequest._ENDPOINT); + } + + /** + * Cancel in-progress backup. + * + *

+ * This method cannot be called cancel backup restore. + * + * @param backupId Backup ID. + * @param backend Backup storage backend. + */ + public CompletableFuture cancel(String backupId, String backend) { + return this.restTransport.performRequestAsync(new CancelBackupRequest(backupId, backend), + CancelBackupRequest._ENDPOINT); + } +} From 5b5f08b72834aaefd1d5a887a32198b65e6c916a Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Tue, 30 Sep 2025 14:24:58 +0200 Subject: [PATCH 09/11] chore: fix javadoc --- src/main/java/io/weaviate/client6/v1/api/backup/Backup.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java index 5cebd5e07..3f0a7a229 100644 --- a/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java +++ b/src/main/java/io/weaviate/client6/v1/api/backup/Backup.java @@ -145,11 +145,6 @@ public void cancel(WeaviateClient client) throws IOException { * @param client Weaviate client. Make sure {@link WeaviateClientAsync#close} * is NOT called before this method returns. * @throws IllegalStateException if {@link #operation} is not set (null). - * @throws TimeoutException in case the wait times out without reaching - * BackupStatus.SUCCESS. - * @throws IOException in case the request was not sent successfully - * due to a malformed request, a networking error - * or the server being unavailable. */ public CompletableFuture waitForCompletion(WeaviateClientAsync client) { return waitForStatus(client, BackupStatus.SUCCESS); From b6870a62034a0e47c4d75c2580f987f8957b3bb6 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Tue, 30 Sep 2025 14:46:22 +0200 Subject: [PATCH 10/11] docs: add section about backups to README --- README.md | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/README.md b/README.md index 1ba9b291d..89f596894 100644 --- a/README.md +++ b/README.md @@ -710,6 +710,61 @@ client.collections.update("Songs_Alias", "PopSongs"); client.collections.delete("Songs_Alias"); ``` +### Managing collection backups + +```java +// Start a backup: +Backup backup = client.backup.create( + "backup_1", "filesystem", + bak -> bak + .includeCollections("Songs", "Artists") + .compressionLevel(CompressionLevel.BEST_COMPRESSION) + .cpuPercentage(30) +); + +// By default, the client does not monitor the backup status. +// The above method returns as soon as the server acknowledges +// the request and starts to process it. +// +// Now you can poll backup status to know when it is succeedes (or fails). + +Backup status = client.backup.getCreateStatus(backup.id(), backup.backend()); +if (status.status() == BackupStatus.SUCCESSFUL) { + System.out.println("Yay!"); + System.exit(0); +} + +// Backups may take a write to complete. To block the current thread until +// the execution completes, call Backup::waitForCompletion(WeaviateClient). +// +// Notice that, while we use `backup` object we can also call it on the `status`, +// as both will have sufficient information to identify the backup operation. + +try { + Backup completed = backup.waitForCompletion(client); + assert completed.errors() == null : "completed with errors"; +} catch (TimeoutException e) { + System.out.exit(1); +} + +// List exists backups: +List allBackups = client.backup.list(); + +// Restore from the first backup: +var first = allBackups.getFirst(); +client.backup.restore(first.id(), first.backend()); + +// Similarly, wait until the restore is complete using Backup::waitForCompletion. +// It is possible to set a custom timeout and polling interval using a familiar Tucked Builder pattern: + +var restoring = client.backup.getRestoreStatus(first.id(), first.backend()); +var restored = restoring.waitForCompletion(client, wait -> wait + .timeout(Duration.ofMinutes(30)) + .interval(Duration.ofMinutes(5))); + +assert restored.errors() == null : "restored with errors"; +``` + ## Useful resources - [Documentation](https://weaviate.io/developers/weaviate/current/client-libraries/java.html). From 828a0010f4fdb4ddb4d951dcaad955f2daf6c2f3 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Tue, 30 Sep 2025 14:48:24 +0200 Subject: [PATCH 11/11] docs: add warning about concurrent backups --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 89f596894..b9f090350 100644 --- a/README.md +++ b/README.md @@ -712,6 +712,9 @@ client.collections.delete("Songs_Alias"); ### Managing collection backups +> [!CAUTION] +> Weaviate does not support concurrent backups. Await one backup's completion before starting another one. + ```java // Start a backup: Backup backup = client.backup.create(