
s3vectors support as sink#357

Merged
adiom-mark merged 1 commit into main from s3vector
Dec 13, 2025

Conversation

@adiom-mark
Collaborator

@adiom-mark adiom-mark commented Dec 11, 2025

Summary by CodeRabbit

  • New Features
  • Added an AWS S3 vector connector ("s3vectors") with CLI flags for bucket, vector-key (default "data"), rate limiting, batch size, and max-parallelism; supports batched, parallel writes and updates.
  • Utilities
    • Added de-duplication utility to retain only the latest update per composite data ID.
  • Tests
    • Added unit tests covering the de-duplication utility, including edge and nil cases.
  • Chores
    • Updated dependencies to support S3 vector and vector-data handling.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai

coderabbitai bot commented Dec 11, 2025

Walkthrough

Adds a Go S3Vectors connector with NewConn(bucketName, vectorKey, maxParallelism, batchSize, limiter) that implements write/update batching, parallel Put/Delete calls, and id/vector parsing; registers CLI flags; updates dependencies; and adds a KeepLastUpdate deduplication utility with tests.

Changes

Cohort / File(s) Summary
S3 Vector Connector Implementation
connectors/s3vector/conn.go
New connector type conn and public constructor NewConn(bucketName, vectorKey, maxParallelism, batchSize, limiter *rate.Limiter). Implements ConnectorServiceHandler methods: GetInfo, WriteData, WriteUpdates, StreamLSN/StreamUpdates (no-op), and panicking stubs for GeneratePlan, GetNamespaceMetadata, ListData. Adds dataToPIV and idToKey for parsing multiple vector formats and id extraction; batches operations by batchSize and parallelizes using maxParallelism + limiter; calls S3Vectors PutVectors/DeleteVectors and propagates errors.
CLI Connector Registration
internal/app/options/connectorflags.go
Registers connector names "s3vector"/"s3vectors", imports connectors/s3vector, adds CLI flags (--bucket, --vector-key, --max-parallelism, --rate-limit, --rate-limit-burst, --batch-size) and creates connectors via s3vector.NewConn(...).
Dependency Management
go.mod
Bumps AWS SDK modules and adds service/s3vectors, credentials, smithy-go; adds pgvector-go and decimal-related libs; updates several indirect/internal module versions.
Dedup Utility and Tests
connectors/util/util.go, connectors/util/util_test.go
Adds KeepLastUpdate(updates []*adiomv1.Update) []*adiomv1.Update and helper addToIdIndexMap to deduplicate updates by composite ID, keeping only the most recent per ID. Adds TestKeepLastUpdate covering nil, empty, single, multiple, and mixed insert/delete cases.
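The keep-last-per-ID semantics described above can be sketched like this. The Update struct here is a hypothetical stand-in for the real *adiomv1.Update (which carries a composite data ID rather than a plain string):

```go
package main

import "fmt"

// Update is a stand-in for the real adiomv1.Update message.
type Update struct {
	ID  string
	Seq int
}

// keepLastUpdate returns only the latest update per ID,
// preserving the relative order of the survivors.
func keepLastUpdate(updates []Update) []Update {
	if len(updates) == 0 {
		return updates
	}
	last := make(map[string]int, len(updates)) // ID -> index of latest occurrence
	for i, u := range updates {
		last[u.ID] = i
	}
	var out []Update
	for i, u := range updates {
		if last[u.ID] == i {
			out = append(out, u)
		}
	}
	return out
}

func main() {
	in := []Update{{"a", 1}, {"b", 1}, {"a", 2}}
	fmt.Println(keepLastUpdate(in)) // [{b 1} {a 2}]
}
```

Two passes keep the function O(n) while retaining stable ordering, which matters when the surviving updates are later batched in sequence.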

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Connector as S3Vectors Connector
    participant Limiter as Rate Limiter
    participant S3 as S3Vectors API

    Client->>Connector: Call WriteData/WriteUpdates(items)
    Connector->>Limiter: acquire token (rate limit)
    alt batch items
        Connector->>Connector: build batches (batchSize)
        loop parallel workers (maxParallelism)
            Connector->>S3: PutVectors / DeleteVectors (batch)
            S3-->>Connector: response / error
        end
    end
    Connector-->>Client: return aggregated result/error
```

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Inspect dataToPIV parsing for supported input variants and numeric precision handling.
  • Verify idToKey behavior for composite IDs and error cases.
  • Validate batching, limiter usage, parallelism controls, goroutine safety, and error aggregation in WriteData/WriteUpdates.
  • Review NewConn AWS config/client initialization and CLI flag wiring.

Poem

🐇 I hopped to S3 with vectors in paw,
I parse floats and strings, and batch what I saw.
I pace with a limiter, and parallelize cheer,
I put and delete, keeping data near.
A rabbit's hop — your vectors are here!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning — Docstring coverage is 28.57%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Description Check ✅ — Check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check ✅ — The title 's3vectors support as sink' directly and specifically describes the main change: adding S3Vector as a sink connector with full data handling capabilities.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch s3vector

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (5)
internal/app/options/connectorflags.go (1)

348-368: Make bucket required and tighten CLI UX for s3vectors

The connector wiring looks correct, but two things are worth tightening:

  • bucket is not marked as required, so a user can invoke s3vectors without a bucket and only see a late AWS error when calling S3 Vectors.
  • UsageText is a bit vague compared to other connectors.

Consider:

- &cli.StringFlag{
-     Name:  "bucket",
-     Usage: "the s3 vector bucket to use",
- },
+ &cli.StringFlag{
+     Name:     "bucket",
+     Usage:    "The S3 Vector bucket name to use",
+     Required: true,
+ },

And optionally updating usage to something like: "s3vectors --bucket <bucket-name> [--vector-key <field>]" for clarity.

connectors/s3vector/conn.go (4)

55-63: Return explicit Unimplemented errors for streaming methods

StreamLSN and StreamUpdates currently return nil, which effectively says “success, but do nothing”. If this connector is ever misused as a source, that could lead to confusing behavior (no data, no error).

It would be clearer to fail fast with CodeUnimplemented, similar to:

-func (c *conn) StreamUpdates(ctx context.Context, _ *connect.Request[adiomv1.StreamUpdatesRequest], _ *connect.ServerStream[adiomv1.StreamUpdatesResponse]) error {
-    return nil
-}
+func (c *conn) StreamUpdates(ctx context.Context, _ *connect.Request[adiomv1.StreamUpdatesRequest], _ *connect.ServerStream[adiomv1.StreamUpdatesResponse]) error {
+    return connect.NewError(connect.CodeUnimplemented, fmt.Errorf("StreamUpdates not supported for s3vector sink"))
+}

Same comment applies to StreamLSN.


65-85: WriteData path is straightforward; consider guarding empty batches

The WriteData implementation—building PutInputVectors via dataToPIV and issuing a single PutVectors call per namespace—looks good, and the error wrapping with connect.CodeInternal is consistent.

You may optionally skip the service call when there is no data:

- if _, err := c.client.PutVectors(ctx, &s3vectors.PutVectorsInput{
-     Vectors:          vectors,
-     IndexName:        indexName,
-     VectorBucketName: c.bucketName,
- }); err != nil {
+ if len(vectors) > 0 {
+     if _, err := c.client.PutVectors(ctx, &s3vectors.PutVectorsInput{
+         Vectors:          vectors,
+         IndexName:        indexName,
+         VectorBucketName: c.bucketName,
+     }); err != nil {
          return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err putting vectors: %w", err))
- }
+     }
+ }

Not critical, but can avoid a no-op round-trip on empty batches.


179-216: WriteUpdates logic is good; guard empty slices and consider unknown update types

The separation of deletes vs inserts/updates, plus batching into one DeleteVectors and one PutVectors call, looks solid.

Two minor refinements:

  • As with WriteData, skip service calls when the corresponding slice is empty to avoid no-op requests.
  • Currently, any future/unknown UpdateType is silently ignored; if that’s not desired, you could add an else branch to error on unexpected types.

Example for empty-slice guard:

 if len(toDelete) > 0 {
     if _, err := c.client.DeleteVectors(...); err != nil { ... }
 }
- if _, err := c.client.PutVectors(ctx, &s3vectors.PutVectorsInput{
-     Vectors:          vectors,
-     IndexName:        indexName,
-     VectorBucketName: c.bucketName,
- }); err != nil {
-     return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err putting vectors: %w", err))
- }
+ if len(vectors) > 0 {
+     if _, err := c.client.PutVectors(ctx, &s3vectors.PutVectorsInput{
+         Vectors:          vectors,
+         IndexName:        indexName,
+         VectorBucketName: c.bucketName,
+     }); err != nil {
+         return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err putting vectors: %w", err))
+     }
+ }

27-30: Return explicit error instead of panic("unimplemented") in sink-only handler methods

GeneratePlan, GetNamespaceMetadata, and ListData currently panic. While these methods are protected by capability checks in all current callers (test suite and orchestration layer verify GetSource() != nil before invocation), returning an explicit unimplemented error is better defensive practice:

-func (c *conn) GeneratePlan(ctx context.Context, _ *connect.Request[adiomv1.GeneratePlanRequest]) (*connect.Response[adiomv1.GeneratePlanResponse], error) {
-    panic("unimplemented")
-}
+func (c *conn) GeneratePlan(ctx context.Context, _ *connect.Request[adiomv1.GeneratePlanRequest]) (*connect.Response[adiomv1.GeneratePlanResponse], error) {
+    return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("GeneratePlan not supported for s3vector sink"))
+}

Apply similarly to GetNamespaceMetadata and ListData.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4b107fb and 4d627ab.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (3)
  • connectors/s3vector/conn.go (1 hunks)
  • go.mod (2 hunks)
  • internal/app/options/connectorflags.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
connectors/s3vector/conn.go (2)
gen/adiom/v1/messages.pb.go (4)
  • DataType_DATA_TYPE_MONGO_BSON (28-28)
  • UpdateType_UPDATE_TYPE_DELETE (79-79)
  • UpdateType_UPDATE_TYPE_INSERT (77-77)
  • UpdateType_UPDATE_TYPE_UPDATE (78-78)
gen/adiom/v1/adiomv1connect/adiom.connect.go (1)
  • ConnectorServiceHandler (214-225)
internal/app/options/connectorflags.go (1)
connectors/s3vector/conn.go (1)
  • NewConn (218-225)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (7)
internal/app/options/connectorflags.go (1)

20-20: New s3vector import wiring looks consistent

Importing github.com/adiom-data/dsync/connectors/s3vector is consistent with the new connector block below and the NewConn signature; no concerns here.

go.mod (2)

10-31: AWS SDK and pgvector-related dependency bumps look coherent

The direct requirements for aws-sdk-go-v2, config, credentials, s3, s3vectors, and smithy-go are on compatible 1.x minor versions, and the pgvector/decimal-related additions match the new S3 vector sink implementation. This alignment should avoid mixed-version issues within the AWS SDK or pgx/decimal stack.

Please double-check against your build/deploy environment that these specific versions are acceptable and don’t conflict with any pinned versions in downstream tooling (CI images, Lambda runtimes, etc.).


51-52: Indirect AWS internal modules updated to matching patch levels

Bumping internal/configsources and internal/endpoints/v2 to newer patch versions aligns them with the updated AWS SDK core. This should be safe and helps avoid subtle version skew bugs.

If you haven’t already, run go mod tidy and a full test suite to ensure there are no unexpected transitive changes from these internal module updates.

connectors/s3vector/conn.go (4)

21-25: GetInfo implementation is reasonable but assumes non-nil bucketName

GetInfo returning the bucket name as Id, fixed DbType "s3vector", and a sink capability for DATA_TYPE_MONGO_BSON makes sense for a sink-only connector. Just be aware that Id: *c.bucketName will panic if bucketName is ever nil; this is currently prevented by always constructing via NewConn, so the pattern is fine as long as conn is never instantiated manually.

Also applies to: 32-43


87-106: idToKey: behavior is acceptable but error message could be clearer

The logic to require a single _id and stringify any non-string type (including BSON ObjectIDs and numerics) is reasonable for key generation. Currently, any len(id) != 1 returns "unsupported id type", which conflates cardinality with type.

If you want clearer diagnostics, you might adjust the error message to reflect multiple/zero IDs, but functionally this is fine.
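For illustration, the single-_id-to-string-key behavior discussed here amounts to something like the following. This is a hypothetical helper mirroring the described semantics, not the connector's exact code:

```go
package main

import "fmt"

// idToKey stringifies a single-field id map into an S3 vector key.
// Exactly one id field is required; composite or empty ids are rejected.
func idToKey(id map[string]any) (string, error) {
	if len(id) != 1 {
		return "", fmt.Errorf("expected exactly one id field, got %d", len(id))
	}
	for _, v := range id {
		if s, ok := v.(string); ok {
			return s, nil
		}
		return fmt.Sprintf("%v", v), nil // ObjectIDs, numerics, etc.
	}
	return "", nil // unreachable: len(id) == 1 guarantees one iteration
}

func main() {
	k, _ := idToKey(map[string]any{"_id": int64(42)})
	fmt.Println(k) // 42
}
```

As the comment notes, the len(id) != 1 branch is where a cardinality-specific message ("composite ids unsupported" vs "missing id") would improve diagnostics.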


218-225: NewConn is straightforward; consider exposing config hooks later

NewConn’s use of config.LoadDefaultConfig(context.Background()) followed by s3vectors.NewFromConfig and wiring bucketName/vectorKey into conn is idiomatic for an AWS SDK v2 client. No immediate issues.

If, in the future, you need custom timeouts, endpoints, or profiles beyond what environment/shared-config provide, you may want to thread a context.Context and/or additional options in instead of hard-wiring context.Background().

Ensure this default AWS config loading behavior matches how you expect to provide credentials/region in your deployment environments (env vars, shared config profiles, IAM roles, etc.).


227-227: Interface assertion is useful

The compile-time assertion var _ adiomv1connect.ConnectorServiceHandler = &conn{} is a good safeguard to keep the connector in sync with the service interface.

Comment on lines +108 to +177
func dataToPIV(vectorKey string, data []byte) (types.PutInputVector, error) {
	var m map[string]any
	if err := bson.Unmarshal(data, &m); err != nil {
		return types.PutInputVector{}, fmt.Errorf("err unmarshalling data: %w", err)
	}
	var id string
	var idAny any
	rawID := bson.Raw(data).Lookup("_id")
	if err := bson.UnmarshalValue(rawID.Type, rawID.Value, &idAny); err != nil {
		return types.PutInputVector{}, fmt.Errorf("err unmarshalling data _id: %w", err)
	}
	switch t := idAny.(type) {
	case string:
		id = t
	default:
		id = fmt.Sprintf("%v", t)
	}

	v := m[vectorKey]
	delete(m, vectorKey)
	m["_id"] = id

	vectorData := &types.VectorDataMemberFloat32{}
	switch t := v.(type) {
	case string:
		// pgvector encoded string
		var vector pgvector.Vector
		if err := vector.Parse(t); err != nil {
			return types.PutInputVector{}, fmt.Errorf("err parsing string vector: %w", err)
		}
		vectorData.Value = vector.Slice()
	case []float64:
		for _, f := range t {
			vectorData.Value = append(vectorData.Value, float32(f))
		}
	case []float32:
		for _, f := range t {
			vectorData.Value = append(vectorData.Value, f)
		}
	case []any:
		for _, f := range t {
			switch t2 := f.(type) {
			case string:
				fv, err := strconv.ParseFloat(t2, 32)
				if err != nil {
					return types.PutInputVector{}, fmt.Errorf("err converting vector value str to float32: %w", err)
				}
				vectorData.Value = append(vectorData.Value, float32(fv))
			case float32:
				vectorData.Value = append(vectorData.Value, t2)
			case float64:
				vectorData.Value = append(vectorData.Value, float32(t2))
			case int:
				vectorData.Value = append(vectorData.Value, float32(t2))
			case int32:
				vectorData.Value = append(vectorData.Value, float32(t2))
			case int64:
				vectorData.Value = append(vectorData.Value, float32(t2))
			default:
				return types.PutInputVector{}, fmt.Errorf("unsupported type for vector value: %T", t2)
			}
		}
	}

	return types.PutInputVector{
		Data:     vectorData,
		Key:      aws.String(id),
		Metadata: document.NewLazyDocument(m),
	}, nil
}

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the actual file to confirm the code
cat -n connectors/s3vector/conn.go | head -200

Repository: adiom-data/dsync

Length of output: 8363


🏁 Script executed:

# Search for S3 Vectors related files and imports
fd -e go . | xargs grep -l "S3.*Vector\|Vector.*S3" | head -10

Repository: adiom-data/dsync

Length of output: 42


🏁 Script executed:

# Look for tests related to dataToPIV
fd -e go . | xargs grep -l "dataToPIV" | head -10

Repository: adiom-data/dsync

Length of output: 87


🏁 Script executed:

# Search for vector field validation or handling elsewhere
rg "vectorKey|vector.*empty|zero.*vector" -t go -A 3 -B 3 | head -50

Repository: adiom-data/dsync

Length of output: 2832


🏁 Script executed:

# Check for S3 Vectors API types and behaviors
fd -e go . | xargs grep -l "types.PutInputVector\|VectorDataMember"

Repository: adiom-data/dsync

Length of output: 87


🏁 Script executed:

# Search for test files
fd -e go -path "*test*" connectors/s3vector/ 2>/dev/null

Repository: adiom-data/dsync

Length of output: 42


🏁 Script executed:

# Search for any validation or documentation comments about empty vectors
rg "empty.*vector|zero.*vector|vector.*validate" -t go -i

Repository: adiom-data/dsync

Length of output: 42


🏁 Script executed:

# Check for any integration tests or examples
fd -e go . | xargs grep -l "PutVectors\|s3vectors" | head -5

Repository: adiom-data/dsync

Length of output: 126


🌐 Web query:

AWS S3 Vectors API empty vector behavior validation

💡 Result:

Short answer — S3 Vectors enforces that vectors are non‑empty and valid: PutVectors requires 1–500 vectors per request; each vector must include a key and a vector data array of float32 values whose length exactly equals the index's dimension (dimension is 1–4096). If you send an empty vectors array, a vector with missing/incorrect data length, wrong type, or oversized metadata you’ll get a ValidationException (400) or other API error. Validate before calling: non‑empty vectors list, each vector.data length == index dimension, cast values to float32, and respect metadata size limits. Sources: AWS PutVectors docs, S3 Vectors limits, and Vector index docs. [1][2][3]

Sources
[1] PutVectors (AWS S3 Vectors API). (docs.aws.amazon.com)
[2] Limitations and restrictions (dimensions, batch sizes, metadata limits). (docs.aws.amazon.com)
[3] Vector indexes (dimension requirement — all vectors must match index dimension). (docs.aws.amazon.com)

Citations:


🌐 Web query:

AWS S3 Vectors PutVectors API zero-length vector requirements

💡 Result:

Short answer:

  • You cannot send a "zero-length" vector (i.e., an array with 0 elements) — the vector data must exactly match the index's configured dimension (1–4096). [1][3]
  • You also cannot send a zero vector (all elements = 0) when the index uses the cosine distance metric. [1][2]
  • Vector values must be valid float32 (no NaN or Infinity). [1][2]

Sources:
[1] AWS PutVectors API docs (vector dimension, float32, NaN/Inf, cosine zero-vector rule).
[2] AWS PutInputVector docs (zero vectors not allowed for cosine).
[3] S3 Vectors limitations (dimension range 1–4096).


🌐 Web query:

S3 Vectors API minimum vector dimension requirement

💡 Result:

Minimum vector dimension for the S3 Vectors API is 1 (allowed range: 1–4096). [1][2]

Sources:
[1] AWS S3 Vectors — Limitations and restrictions.
[2] AWS S3 Vectors — Vector indexes / Dimension requirements.


Add validation for missing or unsupported vectorKey values in dataToPIV

The function currently has two correctness issues:

  1. Missing vector field silently produces an empty vector
    If vectorKey is absent from the map, v := m[vectorKey] returns nil, the type switch falls through without matching any case, and vectorData.Value remains empty. The S3 Vectors API requires vector data length to exactly match the index dimension (minimum 1), so an empty vector will fail with a ValidationException at the API level—difficult to trace back to misconfiguration.

  2. No default case at the outer switch level
    Unsupported top-level types (e.g., []int) silently fall through. Only the inner []any branch has a default error case.

Add explicit validation before processing the vector:

- v := m[vectorKey]
+ v, ok := m[vectorKey]
+ if !ok {
+     return types.PutInputVector{}, fmt.Errorf("missing vector field %q", vectorKey)
+ }
  delete(m, vectorKey)
  m["_id"] = id

  vectorData := &types.VectorDataMemberFloat32{}
  switch t := v.(type) {
  ...
- case []any:
+ case []any:
      for _, f := range t {
          ...
      }
+ default:
+     return types.PutInputVector{}, fmt.Errorf("unsupported type for vector field %q: %T", vectorKey, v)
  }

This will fail fast with clear errors instead of producing invalid vectors that the API rejects.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func dataToPIV(vectorKey string, data []byte) (types.PutInputVector, error) {
	var m map[string]any
	if err := bson.Unmarshal(data, &m); err != nil {
		return types.PutInputVector{}, fmt.Errorf("err unmarshalling data: %w", err)
	}
	var id string
	var idAny any
	rawID := bson.Raw(data).Lookup("_id")
	if err := bson.UnmarshalValue(rawID.Type, rawID.Value, &idAny); err != nil {
		return types.PutInputVector{}, fmt.Errorf("err unmarshalling data _id: %w", err)
	}
	switch t := idAny.(type) {
	case string:
		id = t
	default:
		id = fmt.Sprintf("%v", t)
	}

	v, ok := m[vectorKey]
	if !ok {
		return types.PutInputVector{}, fmt.Errorf("missing vector field %q", vectorKey)
	}
	delete(m, vectorKey)
	m["_id"] = id

	vectorData := &types.VectorDataMemberFloat32{}
	switch t := v.(type) {
	case string:
		// pgvector encoded string
		var vector pgvector.Vector
		if err := vector.Parse(t); err != nil {
			return types.PutInputVector{}, fmt.Errorf("err parsing string vector: %w", err)
		}
		vectorData.Value = vector.Slice()
	case []float64:
		for _, f := range t {
			vectorData.Value = append(vectorData.Value, float32(f))
		}
	case []float32:
		for _, f := range t {
			vectorData.Value = append(vectorData.Value, f)
		}
	case []any:
		for _, f := range t {
			switch t2 := f.(type) {
			case string:
				fv, err := strconv.ParseFloat(t2, 32)
				if err != nil {
					return types.PutInputVector{}, fmt.Errorf("err converting vector value str to float32: %w", err)
				}
				vectorData.Value = append(vectorData.Value, float32(fv))
			case float32:
				vectorData.Value = append(vectorData.Value, t2)
			case float64:
				vectorData.Value = append(vectorData.Value, float32(t2))
			case int:
				vectorData.Value = append(vectorData.Value, float32(t2))
			case int32:
				vectorData.Value = append(vectorData.Value, float32(t2))
			case int64:
				vectorData.Value = append(vectorData.Value, float32(t2))
			default:
				return types.PutInputVector{}, fmt.Errorf("unsupported type for vector value: %T", t2)
			}
		}
	default:
		return types.PutInputVector{}, fmt.Errorf("unsupported type for vector field %q: %T", vectorKey, v)
	}

	return types.PutInputVector{
		Data:     vectorData,
		Key:      aws.String(id),
		Metadata: document.NewLazyDocument(m),
	}, nil
}
🤖 Prompt for AI Agents
In connectors/s3vector/conn.go around lines 108 to 177, the function dataToPIV
must validate the presence and type of the vector field: explicitly check
whether m contains vectorKey and return a clear error if missing or nil; ensure
after extracting v you verify that it is one of the supported top-level types
(string, []float64, []float32, []any) and return a descriptive error for any
unsupported type (e.g., []int); after building vectorData ensure its Value is
non-empty and return an error if the vector length is zero; add a default case
on the outer type switch to return the unsupported-type error so invalid inputs
fail fast with a clear message instead of producing an empty vector that
triggers API ValidationExceptions.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (2)
connectors/s3vector/conn.go (2)

112-184: Good progress addressing previous validation concerns.

The previous critical issues have been resolved:

  • ✅ Vector field presence is now validated (lines 130-133)
  • ✅ Unsupported types now return clear errors (lines 175-177)

Consider adding post-conversion validation to catch empty vectors before the API call:

 		return types.PutInputVector{}, fmt.Errorf("unsupported type for vector values: %T", t)
 	}
+
+	if len(vectorData.Value) == 0 {
+		return types.PutInputVector{}, fmt.Errorf("vector field %q resulted in empty vector data", vectorKey)
+	}
 
 	return types.PutInputVector{

This provides a clearer error message than waiting for the S3 Vectors API ValidationException.


227-234: LGTM with optional parameter validation suggestion.

The AWS config initialization and client setup are correct. Using context.Background() for initialization is appropriate.

Consider adding parameter validation:

 func NewConn(bucketName string, vectorKey string) (*conn, error) {
+	if bucketName == "" {
+		return nil, fmt.Errorf("bucketName is required")
+	}
+	if vectorKey == "" {
+		return nil, fmt.Errorf("vectorKey is required")
+	}
 	awsConfig, err := config.LoadDefaultConfig(context.Background())

This provides clearer errors during connector setup rather than later during operations.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4d627ab and 3518243.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (3)
  • connectors/s3vector/conn.go (1 hunks)
  • go.mod (2 hunks)
  • internal/app/options/connectorflags.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
internal/app/options/connectorflags.go (1)
connectors/s3vector/conn.go (1)
  • NewConn (227-234)
connectors/s3vector/conn.go (2)
gen/adiom/v1/messages.pb.go (4)
  • DataType_DATA_TYPE_MONGO_BSON (28-28)
  • UpdateType_UPDATE_TYPE_DELETE (79-79)
  • UpdateType_UPDATE_TYPE_INSERT (77-77)
  • UpdateType_UPDATE_TYPE_UPDATE (78-78)
gen/adiom/v1/adiomv1connect/adiom.connect.go (1)
  • ConnectorServiceHandler (214-225)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (7)
connectors/s3vector/conn.go (4)

1-19: LGTM!

Package declaration and imports are appropriate for an S3 Vectors sink connector supporting BSON data with pgvector compatibility.


21-63: LGTM!

The struct definition and unimplemented source methods are appropriate for a sink-only connector. Source-specific methods correctly panic or no-op.


33-43: LGTM!

GetInfo correctly declares this as a sink-only connector supporting MONGO_BSON data type.


91-110: LGTM!

The ID-to-key conversion appropriately handles single-field IDs and clearly rejects composite IDs with an error message.

internal/app/options/connectorflags.go (2)

20-20: LGTM!

The s3vector package import is correctly placed with other connector imports.


348-369: LGTM!

The s3vectors connector registration is correctly implemented:

  • Recognizes both "s3vector" and "s3vectors" identifiers (case-insensitive)
  • Requires the bucket flag and provides a sensible default for vector-key
  • Properly calls s3vector.NewConn with the configured parameters

The implementation follows the established pattern used by other connectors in the registry.

go.mod (1)

10-10: No action required. AWS SDK v2 v1.41.0, credentials v1.17.70, service/s3vectors v1.6.1, smithy-go v1.24.0, and pgvector-go v0.3.0 have no published security advisories. While newer AWS SDK releases exist (latest: 2025-12-02), the current versions are secure and do not require immediate updates for security reasons.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
connectors/s3vector/conn.go (1)

224-224: Use optimal batch size for DeleteVectors.

The DeleteVectors API supports up to 1,000 keys per request, but the code chunks to 500. While not incorrect, this is suboptimal for performance.

Apply this diff to use the correct batch size:

-		for batch := range slices.Chunk(toDelete, 500) {
+		for batch := range slices.Chunk(toDelete, 1000) {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3518243 and e8bc20f.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (3)
  • connectors/s3vector/conn.go (1 hunks)
  • go.mod (2 hunks)
  • internal/app/options/connectorflags.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/app/options/connectorflags.go
🧰 Additional context used
🧬 Code graph analysis (1)
connectors/s3vector/conn.go (3)
gen/adiom/v1/messages.pb.go (4)
  • DataType_DATA_TYPE_MONGO_BSON (28-28)
  • UpdateType_UPDATE_TYPE_DELETE (79-79)
  • UpdateType_UPDATE_TYPE_INSERT (77-77)
  • UpdateType_UPDATE_TYPE_UPDATE (78-78)
connectors/vector/conn.go (1)
  • Chunk (48-51)
gen/adiom/v1/adiomv1connect/adiom.connect.go (1)
  • ConnectorServiceHandler (214-225)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (2)
connectors/s3vector/conn.go (2)

70-107: Batch size handling has been properly implemented.

The automatic chunking to 500 vectors per batch correctly addresses the S3 Vectors API limit mentioned in past review comments. The parallel processing with errgroup is well-implemented.


130-196: Validation improvements have been properly implemented.

The explicit check for missing vectorKey (lines 148-151) and the default case for unsupported types (lines 187-189) correctly address the issues raised in previous review comments. The function now fails fast with clear error messages instead of producing invalid vectors.

 	connectrpc.com/connect v1.18.1
 	connectrpc.com/grpcreflect v1.3.0
-	github.com/aws/aws-sdk-go-v2 v1.40.0
+	github.com/aws/aws-sdk-go-v2 v1.41.0

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🌐 Web query:

AWS SDK Go v2 latest version December 2025

💡 Result:

The latest AWS SDK for Go (v2) release is "release-2025-12-02", published on December 2, 2025. [1][2]

Sources:

  • aws/aws-sdk-go-v2 releases (Release 2025-12-02). [1]
  • aws/aws-sdk-go-v2 repository README (current). [2]

🌐 Web query:

github.com/aws/aws-sdk-go-v2 v1.41.0 security advisories

💡 Result:

Short answer: There are no published security advisories for the aws/aws-sdk-go-v2 project (so none specific to a v1.41.0 module release). [1][2]

Sources:

  • aws/aws-sdk-go-v2 GitHub Security page: “There aren’t any published security advisories.” [1]
  • aws/aws-sdk-go-v2 Releases (shows module/service module versions such as v1.41.0 in release notes). [2]

🌐 Web query:

aws-sdk-go-v2 service s3vectors v1.6.1 release

💡 Result:

I couldn't find a v1.6.1 release. The aws-sdk-go-v2 releases show github.com/aws/aws-sdk-go-v2/service/s3vectors at v1.6.0 (latest in the release notes), and the package docs reflect the s3vectors module (v1.6.0). [1][2][3]

Sources:

  • aws-sdk-go-v2 GitHub releases (module highlights showing s3vectors v1.6.0). [1]
  • aws/aws-sdk-go-v2 repository main page / releases. [2]
  • pkg.go.dev page for github.com/aws/aws-sdk-go-v2/service/s3vectors. [3]

🏁 Script executed:

cat -n go.mod | head -25

Repository: adiom-data/dsync

Length of output: 1153


s3vectors v1.6.1 does not exist—update to v1.6.0 and consider updating aws-sdk-go-v2 to the latest release. The s3vectors package only has a v1.6.0 release available; v1.6.1 will cause dependency resolution issues. Additionally, aws-sdk-go-v2 v1.41.0 is outdated relative to the latest release (2025-12-02).

🤖 Prompt for AI Agents
In go.mod around line 10, the dependency reference to s3vectors v1.6.1 is
invalid; change it to v1.6.0 and also bump aws-sdk-go-v2 from v1.41.0 to a
current release. Edit go.mod to replace
github.com/aws/aws-sdk-go-v2/service/s3vectors v1.6.1 with v1.6.0 and update
github.com/aws/aws-sdk-go-v2 to the desired newer tag, then run go get
github.com/aws/aws-sdk-go-v2/service/s3vectors@v1.6.0 && go get
github.com/aws/aws-sdk-go-v2@latest and finish with go mod tidy to resolve and
clean dependencies.

@adiom-mark adiom-mark force-pushed the s3vector branch 2 times, most recently from 4954f1d to eee6e87 Compare December 12, 2025 23:25
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (2)
go.mod (1)

17-17: Past review issue still unresolved: s3vectors v1.6.1 does not exist.

The dependency on github.com/aws/aws-sdk-go-v2/service/s3vectors v1.6.1 remains invalid. Based on previous review comments, only v1.6.0 is available, and this will cause dependency resolution failures.

connectors/s3vector/conn.go (1)

255-262: Critical: maxParallelism still not assigned in NewConn.

The maxParallelism parameter is accepted but never assigned to the conn struct (line 261), causing parallelism control to be bypassed. This critical issue was flagged in a previous review but remains unfixed.

Apply this diff:

-	return &conn{client: client, bucketName: aws.String(bucketName), vectorKey: vectorKey}, nil
+	return &conn{client: client, bucketName: aws.String(bucketName), vectorKey: vectorKey, maxParallelism: maxParallelism}, nil
🧹 Nitpick comments (1)
connectors/s3vector/conn.go (1)

224-224: Consider using the correct batch size for DeleteVectors.

The S3 Vectors API allows up to 1000 keys per DeleteVectors request, but line 224 batches at 500. While conservative batching is safe, using the full API limit would improve efficiency.

Apply this diff to use the optimal batch size:

-		for batch := range slices.Chunk(toDelete, 500) {
+		for batch := range slices.Chunk(toDelete, 1000) {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e8bc20f and 4954f1d.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (5)
  • connectors/s3vector/conn.go (1 hunks)
  • connectors/util/util.go (1 hunks)
  • connectors/util/util_test.go (1 hunks)
  • go.mod (2 hunks)
  • internal/app/options/connectorflags.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
connectors/util/util_test.go (2)
gen/adiom/v1/messages.pb.go (2)
  • UpdateType_UPDATE_TYPE_INSERT (77-77)
  • UpdateType_UPDATE_TYPE_DELETE (79-79)
connectors/util/util.go (1)
  • KeepLastUpdate (47-60)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (6)
connectors/util/util.go (1)

11-60: LGTM! Efficient deduplication implementation.

The hash-based deduplication logic is well-structured and correctly handles hash collisions by performing exact byte-slice comparisons. The approach of maintaining indices and replacing updates in-place ensures only the latest update per unique data-id is kept.

connectors/util/util_test.go (1)

12-78: LGTM! Comprehensive test coverage.

The test cases cover edge cases (nil, empty, single element), same-ID deduplication, and complex scenarios with mixed insert/delete operations. The use of proto.Equal for protobuf comparison is appropriate.

connectors/s3vector/conn.go (3)

71-107: Batch size limit correctly handled with automatic chunking.

The implementation now uses slices.Chunk(vectors, 500) to automatically split large batches into 500-vector chunks, properly addressing the S3 Vectors API limit. The errgroup pattern with optional parallelism control is well-implemented.


130-196: Previous validation concerns addressed.

The function now explicitly checks for the presence of vectorKey (lines 148-151) and includes a default case for unsupported vector types (lines 187-189), resolving the major concerns from the previous review. The S3 Vectors API will validate vector length constraints.


201-201: Excellent use of deduplication before API calls.

Calling util.KeepLastUpdate ensures that only the latest update per ID is processed, avoiding redundant API operations and potential conflicts.

internal/app/options/connectorflags.go (1)

348-374: LGTM! Clean connector registration following established patterns.

The s3vectors connector registration is well-structured with appropriate flags (required bucket, configurable vector-key, optional max-parallelism) and correctly calls s3vector.NewConn with the extracted parameters.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (2)
go.mod (1)

17-17: s3vectors v1.6.1 version issue was previously flagged.

A past review already noted that s3vectors v1.6.1 does not exist—only v1.6.0 is available. Please verify this has been addressed or update to v1.6.0.

#!/bin/bash
# Verify if s3vectors v1.6.1 exists by checking the go.sum or attempting to resolve
grep "s3vectors" go.sum | head -5
connectors/s3vector/conn.go (1)

155-189: Add validation for empty vector data.

After parsing the vector, there's no check that vectorData.Value is non-empty. The S3 Vectors API requires vectors with length matching the index dimension (minimum 1). An empty vector will cause an API ValidationException.

 	}
+
+	if len(vectorData.Value) == 0 {
+		return types.PutInputVector{}, fmt.Errorf("vector field %q resulted in empty vector data", vectorKey)
+	}

 	return types.PutInputVector{
 		Data:     vectorData,
🧹 Nitpick comments (5)
connectors/util/util.go (1)

16-45: Early exit opportunity in the inner comparison loop.

When a mismatch is found in the ID comparison (line 30-31), the loop continues iterating unnecessarily. Adding a break would improve performance for composite IDs.

 			if len(item.dataId) == len(id) {
 				for i, idPart := range item.dataId {
 					if !slices.Equal(idPart.GetData(), id[i].GetData()) {
 						match = false
+						break
 					}
 				}
 			} else {
connectors/util/util_test.go (2)

12-69: Add test case names for better failure diagnostics.

Table-driven tests benefit from descriptive names to quickly identify which case failed. Currently, failures report only the index, making debugging harder.

 	testData := []struct {
+		Name     string
 		Updates  []*adiomv1.Update
 		Expected []*adiomv1.Update
 	}{
 		{
+			Name:     "nil input",
 			Updates:  nil,
 			Expected: nil,
 		},
 		{
+			Name:     "empty slice",
 			Updates:  []*adiomv1.Update{},
 			Expected: []*adiomv1.Update{},
 		},
 		// ... add Name to other cases

Then use t.Run(testCase.Name, ...) in the loop for clearer test output.


70-77: Consider adding a composite ID test case.

The current tests only cover single-element IDs. Since KeepLastUpdate supports composite IDs (multi-part []*adiomv1.BsonValue), consider adding a test case with multi-part IDs to verify that behavior.

connectors/s3vector/conn.go (2)

32-68: Consider returning explicit errors instead of panics for unimplemented methods.

Panics can crash the application unexpectedly. Since this is a sink-only connector, returning connect.NewError(connect.CodeUnimplemented, ...) would be safer and more consistent with gRPC conventions.

 // GeneratePlan implements adiomv1connect.ConnectorServiceHandler.
 func (c *conn) GeneratePlan(context.Context, *connect.Request[adiomv1.GeneratePlanRequest]) (*connect.Response[adiomv1.GeneratePlanResponse], error) {
-	panic("unimplemented")
+	return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("GeneratePlan not supported for s3vector sink"))
 }

Apply similar changes to GetNamespaceMetadata and ListData.


219-247: Unnecessary outer condition check.

The if len(updates) > 0 block on line 219 is redundant since both inner loops already handle empty slices correctly (they simply won't execute any iterations). However, it does avoid creating an errgroup when not needed.

Additionally, toDelete batching uses 500 but the S3 Vectors DeleteVectors API allows up to 1000 keys per request.

-		for batch := range slices.Chunk(toDelete, 500) {
+		for batch := range slices.Chunk(toDelete, 1000) {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4954f1d and eee6e87.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (5)
  • connectors/s3vector/conn.go (1 hunks)
  • connectors/util/util.go (1 hunks)
  • connectors/util/util_test.go (1 hunks)
  • go.mod (2 hunks)
  • internal/app/options/connectorflags.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/app/options/connectorflags.go
🧰 Additional context used
🧬 Code graph analysis (2)
connectors/util/util_test.go (2)
gen/adiom/v1/messages.pb.go (2)
  • UpdateType_UPDATE_TYPE_INSERT (77-77)
  • UpdateType_UPDATE_TYPE_DELETE (79-79)
connectors/util/util.go (1)
  • KeepLastUpdate (47-60)
connectors/s3vector/conn.go (4)
gen/adiom/v1/messages.pb.go (4)
  • DataType_DATA_TYPE_MONGO_BSON (28-28)
  • UpdateType_UPDATE_TYPE_DELETE (79-79)
  • UpdateType_UPDATE_TYPE_INSERT (77-77)
  • UpdateType_UPDATE_TYPE_UPDATE (78-78)
connectors/vector/conn.go (1)
  • Chunk (48-51)
connectors/util/util.go (1)
  • KeepLastUpdate (47-60)
gen/adiom/v1/adiomv1connect/adiom.connect.go (1)
  • ConnectorServiceHandler (214-225)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (8)
connectors/util/util.go (2)

11-14: LGTM!

The dataIdIndex struct is a clean, minimal internal structure for tracking data IDs and their positions in the result slice.


47-60: LGTM!

The KeepLastUpdate function correctly deduplicates updates by composite ID, keeping only the latest occurrence for each unique ID. The implementation correctly handles nil/empty inputs by returning nil/empty slices naturally.

connectors/s3vector/conn.go (6)

25-30: LGTM!

The struct now correctly includes all necessary fields including maxParallelism, which was noted as missing in a past review.


70-107: LGTM on batching implementation.

The batching with slices.Chunk(vectors, 500) and parallel execution with errgroup correctly addresses the S3 Vectors API limit of 500 vectors per request.


109-128: LGTM!

The idToKey function correctly handles single-element IDs with proper error handling for unsupported multi-element IDs.


148-151: Previous validation concern addressed.

The code now correctly checks for the presence of vectorKey and returns an explicit error if not found, addressing the past review comment about silent failures.


255-262: LGTM!

The NewConn function now correctly assigns maxParallelism to the struct, addressing the past review comment.


264-264: Good compile-time interface assertion.

This ensures the connector correctly implements the full handler interface at compile time.

Comment on lines +86 to +89
eg, ctx := errgroup.WithContext(ctx)
if c.maxParallelism > 0 {
eg.SetLimit(c.maxParallelism)
}

⚠️ Potential issue | 🟡 Minor

Unlimited parallelism when maxParallelism is 0.

When maxParallelism is 0, SetLimit is not called, resulting in unbounded parallelism. This could overwhelm the S3 Vectors API with large datasets. Consider setting a reasonable default limit.

 	eg, ctx := errgroup.WithContext(ctx)
-	if c.maxParallelism > 0 {
-		eg.SetLimit(c.maxParallelism)
-	}
+	limit := c.maxParallelism
+	if limit <= 0 {
+		limit = 10 // reasonable default to avoid overwhelming the API
+	}
+	eg.SetLimit(limit)
🤖 Prompt for AI Agents
In connectors/s3vector/conn.go around lines 86 to 89, the code skips eg.SetLimit
when c.maxParallelism == 0 which yields unbounded parallelism; change the logic
to always call eg.SetLimit by computing a limit = c.maxParallelism if > 0 else a
sane default (e.g., 10 or a configured constant), and pass that limit to
eg.SetLimit so parallelism is bounded even when maxParallelism is zero or
negative.

Comment on lines +220 to +223
eg, ctx := errgroup.WithContext(ctx)
if c.maxParallelism > 0 {
eg.SetLimit(c.maxParallelism)
}

⚠️ Potential issue | 🟡 Minor

Same unbounded parallelism concern as WriteData.

Consider applying a default parallelism limit here as well for consistency.

🤖 Prompt for AI Agents
In connectors/s3vector/conn.go around lines 220 to 223, the errgroup is left
unbounded when c.maxParallelism is zero or negative; add a default parallelism
limit (e.g. DefaultParallelism or a constant like 32) and call
eg.SetLimit(default) when c.maxParallelism <= 0 so the group always has a
bounded concurrency; prefer using a shared named constant for the default and
keep existing behavior when c.maxParallelism > 0.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (1)
connectors/s3vector/conn.go (1)

133-199: Validate non-empty vector output (S3 Vectors rejects empty vectors).
You now error on missing vectorKey and unsupported top-level types (good), but you still allow bson.A{} / empty parsed vectors to produce VectorDataMemberFloat32{Value:nil}.

 	switch t := v.(type) {
@@
 	default:
 		return types.PutInputVector{}, fmt.Errorf("unsupported type for vector values: %T", t)
 	}
+	if len(vectorData.Value) == 0 {
+		return types.PutInputVector{}, fmt.Errorf("vector field %q is empty", vectorKey)
+	}
🧹 Nitpick comments (1)
connectors/util/util.go (1)

16-46: Hash input should be delimited (and consider avoiding id slice aliasing).
Right now the hash is computed over raw concatenation of id-part bytes (Line 20-22). Different composite IDs can produce the same byte stream (e.g., ["ab","c"] vs ["a","bc"]), which won’t break correctness (you do an exact compare), but can create avoidable hot buckets and degrade performance. Also, dataIdIndex{update.GetId(), ...} stores the slice as-is (Line 43), which can be risky if any caller reuses/mutates the underlying Update/id objects.

Suggested tweak (delimiter + copy slice header):

 func addToIdIndexMap(m map[uint64][]*dataIdIndex, update *adiomv1.Update) (*dataIdIndex, bool) {
 	hasher := xxhash.New()
 	id := update.GetId()
 	for _, idPart := range id {
+		// delimiter to avoid ambiguous concatenation across parts
+		_, _ = hasher.Write([]byte{0})
 		_, _ = hasher.Write(idPart.GetData())
 	}
 	h := hasher.Sum64()
@@
-	item := &dataIdIndex{update.GetId(), -1}
+	// copy slice header to reduce aliasing surprises
+	idCopy := append([]*adiomv1.BsonValue(nil), id...)
+	item := &dataIdIndex{idCopy, -1}
 	m[h] = append(items, item)
 	return item, true
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eee6e87 and 20b238b.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (5)
  • connectors/s3vector/conn.go (1 hunks)
  • connectors/util/util.go (1 hunks)
  • connectors/util/util_test.go (1 hunks)
  • go.mod (2 hunks)
  • internal/app/options/connectorflags.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • connectors/util/util_test.go
  • go.mod
🧰 Additional context used
🧬 Code graph analysis (2)
internal/app/options/connectorflags.go (1)
connectors/s3vector/conn.go (1)
  • NewConn (262-276)
connectors/s3vector/conn.go (5)
connectors/dynamodb/stream/helper.go (1)
  • Limiter (23-25)
gen/adiom/v1/messages.pb.go (4)
  • DataType_DATA_TYPE_MONGO_BSON (28-28)
  • UpdateType_UPDATE_TYPE_DELETE (79-79)
  • UpdateType_UPDATE_TYPE_INSERT (77-77)
  • UpdateType_UPDATE_TYPE_UPDATE (78-78)
connectors/vector/conn.go (1)
  • Chunk (48-51)
connectors/util/util.go (1)
  • KeepLastUpdate (48-61)
gen/adiom/v1/adiomv1connect/adiom.connect.go (1)
  • ConnectorServiceHandler (214-225)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: build
🔇 Additional comments (1)
connectors/s3vector/conn.go (1)

88-104: Confirm Go/toolchain semantics for slices.Chunk and closure capture; safest is to capture batch.
This batching uses for batch := range slices.Chunk(...) and then closes over batch inside eg.Go. Whether that’s safe depends on (a) your Go version’s range-variable semantics and (b) what slices.Chunk returns in your targeted Go version.

Safe regardless of toolchain:

 	for batch := range slices.Chunk(vectors, c.batchSize) {
+		b := batch
 		eg.Go(func() error {
-			if err := c.limiter.WaitN(ctx, len(batch)); err != nil {
+			if err := c.limiter.WaitN(ctx, len(b)); err != nil {
 				return fmt.Errorf("err in limiter: %w", err)
 			}
 			if _, err := c.client.PutVectors(ctx, &s3vectors.PutVectorsInput{
-				Vectors:          batch,
+				Vectors:          b,
 				IndexName:        indexName,
 				VectorBucketName: c.bucketName,
 			}); err != nil {

Apply the same pattern to both loops in WriteUpdates.

Also applies to: 223-254

Comment on lines +34 to +60
// GeneratePlan implements adiomv1connect.ConnectorServiceHandler.
func (c *conn) GeneratePlan(context.Context, *connect.Request[adiomv1.GeneratePlanRequest]) (*connect.Response[adiomv1.GeneratePlanResponse], error) {
panic("unimplemented")
}

// GetInfo implements adiomv1connect.ConnectorServiceHandler.
func (c *conn) GetInfo(context.Context, *connect.Request[adiomv1.GetInfoRequest]) (*connect.Response[adiomv1.GetInfoResponse], error) {
return connect.NewResponse(&adiomv1.GetInfoResponse{
Id: *c.bucketName,
DbType: "s3vector",
Capabilities: &adiomv1.Capabilities{
Sink: &adiomv1.Capabilities_Sink{
SupportedDataTypes: []adiomv1.DataType{adiomv1.DataType_DATA_TYPE_MONGO_BSON},
},
},
}), nil
}

// GetNamespaceMetadata implements adiomv1connect.ConnectorServiceHandler.
func (c *conn) GetNamespaceMetadata(context.Context, *connect.Request[adiomv1.GetNamespaceMetadataRequest]) (*connect.Response[adiomv1.GetNamespaceMetadataResponse], error) {
panic("unimplemented")
}

// ListData implements adiomv1connect.ConnectorServiceHandler.
func (c *conn) ListData(context.Context, *connect.Request[adiomv1.ListDataRequest]) (*connect.Response[adiomv1.ListDataResponse], error) {
panic("unimplemented")
}

⚠️ Potential issue | 🟠 Major

Don’t panic("unimplemented") in handler methods; return Unimplemented errors.
These are RPC handlers—panics will crash the process if invoked (even accidentally by discovery/introspection).

 func (c *conn) GeneratePlan(context.Context, *connect.Request[adiomv1.GeneratePlanRequest]) (*connect.Response[adiomv1.GeneratePlanResponse], error) {
-	panic("unimplemented")
+	return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("GeneratePlan unimplemented"))
 }
@@
 func (c *conn) GetNamespaceMetadata(context.Context, *connect.Request[adiomv1.GetNamespaceMetadataRequest]) (*connect.Response[adiomv1.GetNamespaceMetadataResponse], error) {
-	panic("unimplemented")
+	return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("GetNamespaceMetadata unimplemented"))
 }
@@
 func (c *conn) ListData(context.Context, *connect.Request[adiomv1.ListDataRequest]) (*connect.Response[adiomv1.ListDataResponse], error) {
-	panic("unimplemented")
+	return nil, connect.NewError(connect.CodeUnimplemented, fmt.Errorf("ListData unimplemented"))
 }
🤖 Prompt for AI Agents
In connectors/s3vector/conn.go around lines 34 to 60, the RPC handlers currently
call panic("unimplemented"); replace those panics with proper Unimplemented RPC
errors: return nil and a connect.NewError(connect.CodeUnimplemented,
fmt.Errorf("GeneratePlan not implemented")) (and similar messages for
GetNamespaceMetadata and ListData). Ensure each handler returns the appropriate
Unimplemented error rather than panicking so the process won't crash if the RPC
is invoked.

Comment on lines +72 to +110
// WriteData implements adiomv1connect.ConnectorServiceHandler.
func (c *conn) WriteData(ctx context.Context, r *connect.Request[adiomv1.WriteDataRequest]) (*connect.Response[adiomv1.WriteDataResponse], error) {
if len(r.Msg.GetData()) == 0 {
return connect.NewResponse(&adiomv1.WriteDataResponse{}), nil
}

indexName := aws.String(r.Msg.GetNamespace())
vectors := make([]types.PutInputVector, len(r.Msg.GetData()))
for i, data := range r.Msg.GetData() {
vector, err := dataToPIV(c.vectorKey, data)
if err != nil {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("err getting input vector: %w", err))
}
vectors[i] = vector
}

eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(c.maxParallelism)
for batch := range slices.Chunk(vectors, c.batchSize) {
eg.Go(func() error {
if err := c.limiter.WaitN(ctx, len(batch)); err != nil {
return fmt.Errorf("err in limiter: %w", err)
}
if _, err := c.client.PutVectors(ctx, &s3vectors.PutVectorsInput{
Vectors: batch,
IndexName: indexName,
VectorBucketName: c.bucketName,
}); err != nil {
return fmt.Errorf("err putting vectors: %w", err)
}
return nil
})
}
if err := eg.Wait(); err != nil {
return nil, connect.NewError(connect.CodeInternal, err)
}

return connect.NewResponse(&adiomv1.WriteDataResponse{}), nil
}

⚠️ Potential issue | 🟠 Major

Validate maxParallelism, batchSize, and limiter in NewConn.

The three methods (WriteData, DeleteData, SearchVectors) use unvalidated constructor parameters:

  • eg.SetLimit(c.maxParallelism) panics if maxParallelism ≤ 0
  • c.limiter.WaitN(...) panics if limiter is nil
  • WaitN(ctx, len(batch)) fails if len(batch) > limiter.Burst()
  • Batch requests exceeding AWS limits (500 vectors) will be rejected

Add validation in NewConn:

 func NewConn(bucketName string, vectorKey string, maxParallelism int, batchSize int, limiter *rate.Limiter) (*conn, error) {
+	if maxParallelism <= 0 {
+		maxParallelism = 1
+	}
+	if batchSize <= 0 {
+		batchSize = 200
+	}
+	if batchSize > 500 {
+		return nil, fmt.Errorf("batchSize %d exceeds PutVectors limit (500)", batchSize)
+	}
+	if limiter == nil {
+		return nil, fmt.Errorf("limiter must not be nil")
+	}
+	if limiter.Burst() < batchSize {
+		return nil, fmt.Errorf("limiter burst %d must be >= batchSize %d", limiter.Burst(), batchSize)
+	}
 	awsConfig, err := config.LoadDefaultConfig(context.Background())

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In connectors/s3vector/conn.go around lines 72 to 110, add constructor
validation in NewConn to ensure maxParallelism > 0 (return an error if not),
batchSize > 0 and <= 500 (AWS limit) and limiter is non-nil; also ensure
limiter.Burst() >= batchSize (or return an error) so WaitN won't panic; after
these checks, use c.maxParallelism safely with eg.SetLimit and rely on validated
batchSize when chunking so individual batches never exceed AWS or limiter
limits.

Comment on lines +48 to +61
func KeepLastUpdate(updates []*adiomv1.Update) []*adiomv1.Update {
hashToDataIdIndex := map[uint64][]*dataIdIndex{}
var res []*adiomv1.Update
for _, update := range updates {
dii, isNew := addToIdIndexMap(hashToDataIdIndex, update)
if isNew {
dii.index = len(res)
res = append(res, update)
} else {
res[dii.index] = update
}
}
return res
}

⚠️ Potential issue | 🟡 Minor

Guard against nil Update entries to avoid panic.
If updates contains a nil element, addToIdIndexMap(..., update) will panic when calling update.GetId().

 func KeepLastUpdate(updates []*adiomv1.Update) []*adiomv1.Update {
 	hashToDataIdIndex := map[uint64][]*dataIdIndex{}
 	var res []*adiomv1.Update
 	for _, update := range updates {
+		if update == nil {
+			continue
+		}
 		dii, isNew := addToIdIndexMap(hashToDataIdIndex, update)
 		if isNew {
 			dii.index = len(res)
 			res = append(res, update)
 		} else {
 			res[dii.index] = update
 		}
 	}
 	return res
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func KeepLastUpdate(updates []*adiomv1.Update) []*adiomv1.Update {
hashToDataIdIndex := map[uint64][]*dataIdIndex{}
var res []*adiomv1.Update
for _, update := range updates {
dii, isNew := addToIdIndexMap(hashToDataIdIndex, update)
if isNew {
dii.index = len(res)
res = append(res, update)
} else {
res[dii.index] = update
}
}
return res
}
func KeepLastUpdate(updates []*adiomv1.Update) []*adiomv1.Update {
hashToDataIdIndex := map[uint64][]*dataIdIndex{}
var res []*adiomv1.Update
for _, update := range updates {
if update == nil {
continue
}
dii, isNew := addToIdIndexMap(hashToDataIdIndex, update)
if isNew {
dii.index = len(res)
res = append(res, update)
} else {
res[dii.index] = update
}
}
return res
}
🤖 Prompt for AI Agents
In connectors/util/util.go around lines 48 to 61, the loop does not guard
against nil entries in the updates slice and will panic when calling
addToIdIndexMap(update) if update is nil; modify the loop to check if update ==
nil and simply continue (skip) such entries before calling addToIdIndexMap so
you never dereference a nil Update, preserving behavior of indexing/res
replacement for non-nil updates.

Comment on lines +20 to 28
"github.com/adiom-data/dsync/connectors/s3vector"
"github.com/adiom-data/dsync/connectors/testconn"
"github.com/adiom-data/dsync/connectors/vector"
"github.com/adiom-data/dsync/gen/adiom/v1/adiomv1connect"
"github.com/urfave/cli/v2"
"github.com/urfave/cli/v2/altsrc"
"golang.org/x/net/http2"
"golang.org/x/time/rate"
)

⚠️ Potential issue | 🟠 Major

CLI should validate batch-size (<=500) and ensure limiter burst >= batch-size.
Right now a user can set --batch-size > 500 (AWS hard limit per your help text) or --batch-size > rate-limit-burst, which will likely fail at runtime (API validation error or WaitN burst error).

 			}, func(c *cli.Context, args []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) {
@@
 				maxParallelism := c.Int("max-parallelism")
 				rateLimit := c.Int("rate-limit")
-				rateLimitBurst := max(c.Int("rate-limit-burst"), rateLimit)
 				batchSize := c.Int("batch-size")
+				if maxParallelism <= 0 {
+					return nil, fmt.Errorf("max-parallelism must be >= 1")
+				}
+				if batchSize <= 0 || batchSize > 500 {
+					return nil, fmt.Errorf("batch-size must be in [1, 500]")
+				}
+				rateLimitBurst := max(c.Int("rate-limit-burst"), rateLimit)
+				rateLimitBurst = max(rateLimitBurst, batchSize)
 				limiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimitBurst)
 				return s3vector.NewConn(bucket, vectorKey, maxParallelism, batchSize, limiter)
 			}),

Also applies to: 349-395

🤖 Prompt for AI Agents
In internal/app/options/connectorflags.go around lines 20-28 (and similarly for
the flag block at lines 349-395), add CLI validation to enforce that the
--batch-size flag is at most 500 and that the configured rate-limit-burst is >=
batch-size; if either check fails return a descriptive error from the CLI
command's Before/Validate step (or flag's Validate function) so the process
fails fast with a clear message. Specifically: after flags are parsed (or in the
flags' Validate), read batch-size and rate-limit-burst, reject values where
batch-size > 500 or rate-limit-burst < batch-size, and surface a helpful error
like "batch-size must be <= 500" or "rate-limit-burst must be >= batch-size" to
prevent runtime API or rate limiter errors.

@adiom-mark adiom-mark enabled auto-merge (squash) December 13, 2025 00:13
@adiom-mark adiom-mark disabled auto-merge December 13, 2025 00:15
@adiom-mark adiom-mark merged commit 2a5d8c9 into main Dec 13, 2025
2 checks passed
@adiom-mark adiom-mark deleted the s3vector branch December 13, 2025 00:15