From c3433c4dfe76ec919c06fb36e2c8218381f32c1e Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Wed, 2 Jul 2025 15:55:00 +0200 Subject: [PATCH 1/4] feat: fetch shard status --- .../collections/config/GetShardsRequest.java | 24 +++++++++++++++++++ .../v1/api/collections/config/Shard.java | 4 ++++ .../config/WeaviateConfigClient.java | 5 ++++ .../config/WeaviateConfigClientAsync.java | 6 +++++ 4 files changed, 39 insertions(+) create mode 100644 src/main/java/io/weaviate/client6/v1/api/collections/config/GetShardsRequest.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/collections/config/Shard.java diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/config/GetShardsRequest.java b/src/main/java/io/weaviate/client6/v1/api/collections/config/GetShardsRequest.java new file mode 100644 index 000000000..aa5638ff2 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/collections/config/GetShardsRequest.java @@ -0,0 +1,24 @@ +package io.weaviate.client6.v1.api.collections.config; + +import java.util.Collections; +import java.util.List; + +import org.apache.hc.core5.http.HttpStatus; + +import com.google.gson.reflect.TypeToken; + +import io.weaviate.client6.v1.internal.json.JSON; +import io.weaviate.client6.v1.internal.rest.Endpoint; + +public record GetShardsRequest(String collectionName) { + + @SuppressWarnings("unchecked") + public static final Endpoint> _ENDPOINT = Endpoint.of( + request -> "GET", + request -> "/schema/" + request.collectionName + "/shards", // TODO: tenant support + (gson, request) -> null, + request -> Collections.emptyMap(), + code -> code != HttpStatus.SC_SUCCESS, + (gson, response) -> (List) JSON.deserialize(response, TypeToken.getParameterized( + List.class, Shard.class))); +} diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/config/Shard.java b/src/main/java/io/weaviate/client6/v1/api/collections/config/Shard.java new file mode 100644 index 000000000..a039692b2 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/collections/config/Shard.java @@ -0,0 +1,4 @@ +package io.weaviate.client6.v1.api.collections.config; + +public record Shard(String name, String status, long vectorQueueSize) { +} diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClient.java b/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClient.java index 9979bfa3a..d1e6ebe4f 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClient.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClient.java @@ -1,6 +1,7 @@ package io.weaviate.client6.v1.api.collections.config; import java.io.IOException; +import java.util.List; import java.util.Optional; import java.util.function.Function; @@ -45,4 +46,8 @@ public void update(String collectionName, this.restTransport.performRequest(UpdateCollectionRequest.of(thisCollection, fn), UpdateCollectionRequest._ENDPOINT); } + + public List getShards() throws IOException { + return this.restTransport.performRequest(new GetShardsRequest(collection.name()), GetShardsRequest._ENDPOINT); + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClientAsync.java b/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClientAsync.java index 001f7e4ca..d835bb37d 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClientAsync.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClientAsync.java @@ -1,6 +1,7 @@ package io.weaviate.client6.v1.api.collections.config; import java.io.IOException; +import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -48,4 +49,9 @@ public CompletableFuture update(String collectionName, UpdateCollectionRequest._ENDPOINT); }); } + + public CompletableFuture> getShards() { + return this.restTransport.performRequestAsync(new GetShardsRequest(collectionDescriptor.name()), + GetShardsRequest._ENDPOINT); + } } From 850e46f977ecdb25afb143360b94ba73aba3f524 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Wed, 2 Jul 2025 16:16:31 +0200 Subject: [PATCH 2/4] feat: add methods to update shards status --- .../api/collections/config/ShardStatus.java | 10 ++++++++++ .../config/UpdateShardStatusRequest.java | 19 +++++++++++++++++++ .../config/WeaviateConfigClient.java | 14 ++++++++++++++ .../config/WeaviateConfigClientAsync.java | 14 ++++++++++++++ 4 files changed, 57 insertions(+) create mode 100644 src/main/java/io/weaviate/client6/v1/api/collections/config/ShardStatus.java create mode 100644 src/main/java/io/weaviate/client6/v1/api/collections/config/UpdateShardStatusRequest.java diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/config/ShardStatus.java b/src/main/java/io/weaviate/client6/v1/api/collections/config/ShardStatus.java new file mode 100644 index 000000000..bc5bb06d2 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/collections/config/ShardStatus.java @@ -0,0 +1,10 @@ +package io.weaviate.client6.v1.api.collections.config; + +import com.google.gson.annotations.SerializedName; + +public enum ShardStatus { + @SerializedName("READY") + READY, + @SerializedName("READONLY") + READONLY; +} diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/config/UpdateShardStatusRequest.java b/src/main/java/io/weaviate/client6/v1/api/collections/config/UpdateShardStatusRequest.java new file mode 100644 index 000000000..66c9e5444 --- /dev/null +++ b/src/main/java/io/weaviate/client6/v1/api/collections/config/UpdateShardStatusRequest.java @@ -0,0 +1,19 @@ +package io.weaviate.client6.v1.api.collections.config; + +import java.util.Collections; +import java.util.Map; + +import org.apache.hc.core5.http.HttpStatus; + +import io.weaviate.client6.v1.internal.json.JSON; +import io.weaviate.client6.v1.internal.rest.Endpoint; + +public record UpdateShardStatusRequest(String collection, String shard, ShardStatus status) { + public static final Endpoint _ENDPOINT = Endpoint.of( + request -> "PUT", + request -> "/schema/" + request.collection + "/shards" + request.shard, + (gson, request) -> JSON.serialize(Map.of("status", request.status)), + request -> Collections.emptyMap(), + code -> code != HttpStatus.SC_SUCCESS, + (gson, response) -> null); +} diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClient.java b/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClient.java index d1e6ebe4f..b2a0fb493 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClient.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClient.java @@ -1,6 +1,7 @@ package io.weaviate.client6.v1.api.collections.config; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.function.Function; @@ -50,4 +51,17 @@ public void update(String collectionName, public List getShards() throws IOException { return this.restTransport.performRequest(new GetShardsRequest(collection.name()), GetShardsRequest._ENDPOINT); } + + public List updateShards(ShardStatus status, String... shards) throws IOException { + return updateShards(status, Arrays.asList(shards)); + } + + public List updateShards(ShardStatus status, List shards) throws IOException { + for (var shard : shards) { + this.restTransport.performRequest( + new UpdateShardStatusRequest(collection.name(), shard, status), + UpdateShardStatusRequest._ENDPOINT); + } + return getShards(); + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClientAsync.java b/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClientAsync.java index d835bb37d..a418a47aa 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClientAsync.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/config/WeaviateConfigClientAsync.java @@ -1,6 +1,7 @@ package io.weaviate.client6.v1.api.collections.config; import java.io.IOException; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -54,4 +55,17 @@ public CompletableFuture> getShards() { return this.restTransport.performRequestAsync(new GetShardsRequest(collectionDescriptor.name()), GetShardsRequest._ENDPOINT); } + + public CompletableFuture> updateShards(ShardStatus status, String... shards) throws IOException { + return updateShards(status, Arrays.asList(shards)); + } + + public CompletableFuture> updateShards(ShardStatus status, List shards) throws IOException { + var updates = shards.stream().map( + shard -> this.restTransport.performRequestAsync( + new UpdateShardStatusRequest(collectionDescriptor.name(), shard, status), + UpdateShardStatusRequest._ENDPOINT)) + .toArray(CompletableFuture[]::new); + return CompletableFuture.allOf(updates).thenCompose(__ -> getShards()); + } } From 257404728027a333358e6498b92936af851e91f9 Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Wed, 2 Jul 2025 17:07:39 +0200 Subject: [PATCH 3/4] chore: add @SerializedName annotations to Shard properties --- .../weaviate/client6/v1/api/collections/config/Shard.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/config/Shard.java b/src/main/java/io/weaviate/client6/v1/api/collections/config/Shard.java index a039692b2..f0797668f 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/config/Shard.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/config/Shard.java @@ -1,4 +1,9 @@ package io.weaviate.client6.v1.api.collections.config; -public record Shard(String name, String status, long vectorQueueSize) { +import com.google.gson.annotations.SerializedName; + +public record Shard( + @SerializedName("name") String name, + @SerializedName("status") String status, + @SerializedName("vectorQueueSize") long vectorQueueSize) { } From fa8015f7805f06f77c3475a8eed4c4dc1485ed1e Mon Sep 17 00:00:00 2001 From: dyma solovei Date: Fri, 4 Jul 2025 13:39:59 +0200 Subject: [PATCH 4/4] fix: add missing / before {shard_name} path param Include integration test to test the basic functionality --- .../integration/CollectionsITest.java | 25 +++++++++++++++++++ .../config/UpdateShardStatusRequest.java | 2 +- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/src/it/java/io/weaviate/integration/CollectionsITest.java b/src/it/java/io/weaviate/integration/CollectionsITest.java index 06c70b1e5..dcde8a399 100644 --- a/src/it/java/io/weaviate/integration/CollectionsITest.java +++ b/src/it/java/io/weaviate/integration/CollectionsITest.java @@ -13,6 +13,8 @@ import io.weaviate.client6.v1.api.collections.Property; import io.weaviate.client6.v1.api.collections.Replication; import io.weaviate.client6.v1.api.collections.VectorIndex; +import io.weaviate.client6.v1.api.collections.config.Shard; +import io.weaviate.client6.v1.api.collections.config.ShardStatus; import io.weaviate.client6.v1.api.collections.vectorindex.Hnsw; import io.weaviate.client6.v1.api.collections.vectorizers.NoneVectorizer; import io.weaviate.containers.Container; @@ -159,4 +161,27 @@ public void testUpdateCollection() throws IOException { .extracting(CollectionConfig::replication).returns(false, Replication::asyncEnabled); }); } + + @Test + public void testShards() throws IOException { + var nsShatteredCups = ns("ShatteredCups"); + client.collections.create(nsShatteredCups); + var cups = client.collections.use(nsShatteredCups); + + // Act: get initial shard state + var shards = cups.config.getShards(); + + Assertions.assertThat(shards).as("single-tenant collections has 1 shard").hasSize(1); + var singleShard = shards.get(0); + + // Act: flip the status + var wantStatus = singleShard.status().equals("READY") ? ShardStatus.READONLY : ShardStatus.READY; + var updated = cups.config.updateShards(wantStatus, singleShard.name()); + + Assertions.assertThat(updated) + .as("shard status changed") + .hasSize(1) + .extracting(Shard::status) + .containsOnly(wantStatus.name()); + } } diff --git a/src/main/java/io/weaviate/client6/v1/api/collections/config/UpdateShardStatusRequest.java b/src/main/java/io/weaviate/client6/v1/api/collections/config/UpdateShardStatusRequest.java index 66c9e5444..95431d273 100644 --- a/src/main/java/io/weaviate/client6/v1/api/collections/config/UpdateShardStatusRequest.java +++ b/src/main/java/io/weaviate/client6/v1/api/collections/config/UpdateShardStatusRequest.java @@ -11,7 +11,7 @@ public record UpdateShardStatusRequest(String collection, String shard, ShardStatus status) { public static final Endpoint _ENDPOINT = Endpoint.of( request -> "PUT", - request -> "/schema/" + request.collection + "/shards" + request.shard, + request -> "/schema/" + request.collection + "/shards/" + request.shard, (gson, request) -> JSON.serialize(Map.of("status", request.status)), request -> Collections.emptyMap(), code -> code != HttpStatus.SC_SUCCESS,