Walkthrough
Adds namespace field to null connector logs. Updates null connector tests to use a new NewConn signature. Introduces a new v2 random connector with planning, listing, and streaming logic, plus a dedicated test. Registers a CLI “/dev/fakesource” that constructs the v2 random connector. Fixes destination ID in runDsync output. Adjusts test plan request flags.
Sequence Diagram(s)
sequenceDiagram
autonumber
actor CLI as CLI (/dev/fakesource)
participant Flags as ConnectorFlags
participant Conn as random.connV2
participant Svc as ConnectorService (server)
participant Client as Test/Client
CLI->>Flags: parse flags (ns counts, batch sizes, timings, payload)
Flags->>Conn: NewConnV2(ConnV2Input)
note right of Conn: Configures namespaces, partitions,<br/>batching, jitter, stream ticks
Client->>Svc: GetInfo / GetNamespaceMetadata
Svc->>Conn: delegate
Conn-->>Client: capabilities, data types
Client->>Svc: GeneratePlan {InitialSync:true, Updates:true}
Svc->>Conn: GeneratePlan
Conn-->>Client: partitions + update partitions
par Initial sync
loop while more data
Client->>Svc: ListData(partition, cursor, batchSize)
Svc->>Conn: ListData
Conn-->>Client: documents + nextCursor
end
and Streaming
Client->>Svc: StreamUpdates/StreamLSN
Svc->>Conn: streamCommon loop (tick+jitter)
loop each tick
Conn-->>Client: batched updates or LSN
end
end
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/app/app.go (1)
205-216: Fix variable shadowing and potential panic in reverse namespace mapping.
The inner dstNs shadows the outer, so duplicate detection uses the wrong key; the error message also references splitted[1], which can panic when len==1.
- dstNs := splitted[0]
- if len(splitted) == 2 {
-     dstNs := splitted[1]
-     namespaces = append(namespaces, dstNs+":"+splitted[0])
- } else {
-     namespaces = append(namespaces, dstNs)
- }
- if _, ok := seen[dstNs]; ok {
-     return fmt.Errorf("cannot reverse duplicated target namespaces %v", splitted[1])
- }
- seen[dstNs] = struct{}{}
+ dstNs := splitted[0]
+ if len(splitted) == 2 {
+     dstNs = splitted[1]
+     namespaces = append(namespaces, dstNs+":"+splitted[0])
+ } else {
+     namespaces = append(namespaces, dstNs)
+ }
+ if _, ok := seen[dstNs]; ok {
+     return fmt.Errorf("cannot reverse duplicated target namespaces %v", dstNs)
+ }
+ seen[dstNs] = struct{}{}
🧹 Nitpick comments (6)
connectors/null/connector_test.go (1)
22-23: LGTM: tests updated for NewConn signature.
Constructor changes are reflected in tests and keep behavior explicit. Consider a small helper like NewConnDefault() to reduce repetition, but optional.
Also applies to: 29-30
connectors/random/connv2.go (2)
135-139: Guard against zero docs causing log2(0) issues.
After clamping, this should be safe; if you don’t clamp, add an early check to avoid math.Log2(0), which returns -Inf and would produce an invalid width for idFmt.
438-439: Return InvalidArgument for unsupported data type.
This is a client input error, not an internal server error.
- return connect.NewError(connect.CodeInternal, fmt.Errorf("unsupported type %v", r.Msg.GetType()))
+ return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unsupported type %v", r.Msg.GetType()))
pkg/test/connector.go (1)
71-79: Also assert updates partitions are present.
Since the request now sets Updates: true, verify TestGeneratePlan returns at least one UpdatesPartition to ensure streaming tests run.
  suite.Run("TestGeneratePlan", func() {
      suite.Assert().NoError(err)
      suite.Assert().NotEmpty(planRes.Msg.GetPartitions())
      for _, p := range planRes.Msg.GetPartitions() {
          suite.Assert().NotEmpty(p.GetNamespace())
      }
+     suite.Assert().NotNil(planRes.Msg.GetUpdatesPartitions())
+     suite.Assert().True(len(planRes.Msg.GetUpdatesPartitions()) >= 1, "updates partitions should be returned when Updates=true")
  })
connectors/random/connector_test.go (1)
25-41: Stabilize V2 test timing; trim workload to reduce CI flakiness.
1 ms intervals are brittle under load. Recommend slightly larger durations and fewer pages.
Apply:
- UpdateDuration: time.Millisecond,
- StreamTick: time.Millisecond,
+ UpdateDuration: 10 * time.Millisecond,
+ StreamTick: 5 * time.Millisecond,
  ...
- }, nil, nil, 50, 500)
+ }, nil, nil, 10, 500)
internal/app/options/connectorflags.go (1)
233-236: Fix flag help text for max-updates-per-tick.
This describes docs-per-update; it should describe updates-per-tick.
- &cli.IntFlag{
-     Name: "max-updates-per-tick",
-     Usage: "Number of docs per update",
-     Value: 300,
- },
+ &cli.IntFlag{
+     Name: "max-updates-per-tick",
+     Usage: "Max number of update batches emitted per tick",
+     Value: 300,
+ },
📜 Review details
📒 Files selected for processing (7)
connectors/null/connector.go (3 hunks)
connectors/null/connector_test.go (1 hunks)
connectors/random/connector_test.go (2 hunks)
connectors/random/connv2.go (1 hunks)
internal/app/app.go (1 hunks)
internal/app/options/connectorflags.go (1 hunks)
pkg/test/connector.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
pkg/test/connector.go (2)
  gen/adiom/v1/messages.pb.go (3)
    GeneratePlanRequest(585-593)
    GeneratePlanRequest(606-606)
    GeneratePlanRequest(621-623)
  java/src/main/java/adiom/v1/Messages.java (1)
    GeneratePlanRequest(7859-8587)
connectors/null/connector_test.go (4)
  connectors/common/base.go (1)
    NewLocalConnector(863-875)
  connectors/null/connector.go (1)
    NewConn(146-148)
  pkg/test/connector.go (2)
    NewConnectorTestSuite(39-41)
    ClientFromHandler(30-37)
  protocol/test/connector_testsuite.go (1)
    NewConnectorTestSuite(35-40)
internal/app/options/connectorflags.go (2)
  gen/adiom/v1/adiomv1connect/adiom.connect.go (1)
    ConnectorServiceHandler(214-225)
  connectors/random/connv2.go (2)
    NewConnV2(114-139)
    ConnV2Input(98-112)
connectors/random/connv2.go (2)
  gen/adiom/v1/messages.pb.go (3)
    DataType_DATA_TYPE_JSON_ID(29-29)
    DataType_DATA_TYPE_MONGO_BSON(28-28)
    UpdateType_UPDATE_TYPE_UPDATE(78-78)
  gen/adiom/v1/adiomv1connect/adiom.connect.go (1)
    ConnectorServiceHandler(214-225)
connectors/random/connector_test.go (3)
  pkg/test/connector.go (2)
    NewConnectorTestSuite(39-41)
    ClientFromHandler(30-37)
  gen/adiom/v1/adiomv1connect/adiom.connect.go (1)
    ConnectorServiceClient(87-98)
  connectors/random/connv2.go (2)
    NewConnV2(114-139)
    ConnV2Input(98-112)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: build
🔇 Additional comments (5)
connectors/null/connector.go (1)
96-96: LGTM: namespace added to logs.
The added "namespace" field in write-data and write-updates logs looks good and keeps logs properly scoped.
Also applies to: 123-123, 140-140
connectors/random/connv2.go (1)
1-21: Confirm CI Go toolchain version
go.mod specifies Go 1.24 (>=1.22), satisfying the math/rand/v2 import. Please verify your CI workflows also target Go >=1.22.
internal/app/app.go (1)
195-195: LGTM: destination description uses destination ID.
Correct fix; avoids mixing source/destination identifiers in the UI.
connectors/random/connector_test.go (1)
10-10: LGTM: time import used by V2 test.
internal/app/options/connectorflags.go (1)
214-216: Confirmed: num-update-partitions-per-namespace=0 produces a single update stream. The code’s zero-check branch appends exactly one UpdatesPartition and skips per-partition loops.
func NewConnV2(input ConnV2Input) *connV2 {
    source := rand.NewPCG(uint64(time.Now().UnixNano()), rand.Uint64())
    initialSource, _ := source.MarshalBinary()

    res := &connV2{
        namespacePrefix: input.NamespacePrefix,
        initialSource: initialSource,
        numNamespaces: input.NumNamespaces,
🛠️ Refactor suggestion
Validate inputs and handle RNG seed encoding errors.
Clamp invalid inputs to safe minimums and avoid ignoring MarshalBinary errors so the cursor always contains a valid RNG state.
func NewConnV2(input ConnV2Input) *connV2 {
- source := rand.NewPCG(uint64(time.Now().UnixNano()), rand.Uint64())
- initialSource, _ := source.MarshalBinary()
+ // Clamp to safe defaults
+ if input.NumNamespaces <= 0 { input.NumNamespaces = 1 }
+ if input.NumPartitionsPerNamespace <= 0 { input.NumPartitionsPerNamespace = 1 }
+ if input.NumDocsPerPartition <= 0 { input.NumDocsPerPartition = 1 }
+ if input.BatchSize <= 0 { input.BatchSize = 1 }
+ if input.UpdateBatchSize < 0 { input.UpdateBatchSize = 0 }
+ if input.MaxUpdatesPerTick < 0 { input.MaxUpdatesPerTick = 0 }
+ if input.Jitter < 0 { input.Jitter = 0 }
+ if input.Sleep < 0 { input.Sleep = 0 }
+ if input.UpdateDuration < 0 { input.UpdateDuration = 0 }
+ if input.StreamTick <= 0 { input.StreamTick = 100 * time.Millisecond }
+
+ source := rand.NewPCG(uint64(time.Now().UnixNano()), rand.Uint64())
+ initialSource, err := source.MarshalBinary()
+ if err != nil {
+ // Fallback to a deterministic seed to ensure a non-empty source
+ source = rand.NewPCG(0xC0FFEE, 0xFACEFEED)
+ initialSource, _ = source.MarshalBinary()
+ }
rawCursor := r.Msg.GetPartition().GetCursor()
if r.Msg.GetCursor() != nil {
    rawCursor = r.Msg.GetCursor()
}
cursor, err := DecodePartitionCursor(rawCursor)
if err != nil {
    return nil, connect.NewError(connect.CodeInternal, err)
}
Fix potential nil dereference when Partition is absent.
ListData calls GetPartition().GetCursor() before checking r.Msg.GetCursor(). If Partition is nil, this will panic. Prefer using Cursor first, then fallback to Partition with a nil check.
- rawCursor := r.Msg.GetPartition().GetCursor()
- if r.Msg.GetCursor() != nil {
- rawCursor = r.Msg.GetCursor()
- }
+ rawCursor := r.Msg.GetCursor()
+ if rawCursor == nil {
+ if r.Msg.GetPartition() == nil {
+ return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("either cursor or partition must be provided"))
+ }
+ rawCursor = r.Msg.GetPartition().GetCursor()
+ }
🤖 Prompt for AI Agents
In connectors/random/connv2.go around lines 300 to 307, the code currently calls
r.Msg.GetPartition().GetCursor() before checking r.Msg.GetCursor(), which will
panic if Partition is nil; change the logic to obtain rawCursor by first
checking r.Msg.GetCursor() and using it if non-nil, otherwise check if
r.Msg.GetPartition() is non-nil and then call GetCursor() on it, ensuring you
never call GetCursor() on a nil Partition; then pass the resulting rawCursor
(which may still be nil) to DecodePartitionCursor and handle the error as
before.
if count > c.maxUpdatesPerTick {
    break
}
var ns string
if len(namespaces) == 0 {
    ns = fmt.Sprintf("%v%v", c.namespacePrefix, g.IntN(c.numNamespaces))
} else {
    ns = namespaces[g.IntN(len(namespaces))]
}
var updates []T
for range c.updateBatchSize {
    partition := g.IntN(c.numPartitionsPerNamespace)
    doc := g.Int64N(adjustedNumDocsPerPartition)
    if c.numUpdatePartitionsPerNamespace > 1 {
        doc = (doc * int64(c.numUpdatePartitionsPerNamespace)) + int64(cursor.PartitionIndex)
    }
    id := fmt.Sprintf(c.idFmt, partition, doc)
    update, err := itemCallback(id)
    if err != nil {
        return err
    }
    updates = append(updates, update)
}
count += c.updateBatchSize
cursor.LastTime = cursor.LastTime.Add(cursor.BatchDuration)
🛠️ Refactor suggestion
Respect maxUpdatesPerTick without overshooting.
Current check uses > and can exceed the cap with larger batches. Use >= and consider truncating the last batch if needed.
- if count > c.maxUpdatesPerTick {
+ if count >= c.maxUpdatesPerTick {
break
}
- var updates []T
- for range c.updateBatchSize {
+ var updates []T
+ limit := c.updateBatchSize
+ if remaining := c.maxUpdatesPerTick - count; remaining < limit {
+ limit = remaining
+ }
+ for range limit {
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In connectors/random/connv2.go around lines 363 to 387 the loop can overshoot
c.maxUpdatesPerTick because it checks "if count > c.maxUpdatesPerTick" and
always uses the full c.updateBatchSize; change the pre-check to stop when count
>= c.maxUpdatesPerTick and compute remaining := c.maxUpdatesPerTick - count,
then set batchSize := min(c.updateBatchSize, remaining) (and skip if remaining
<= 0) and iterate exactly batchSize times when building updates; finally
increment count by batchSize (not c.updateBatchSize) and update cursor.LastTime
as before.
// WriteData implements adiomv1connect.ConnectorServiceHandler.
func (c *connV2) WriteData(context.Context, *connect.Request[adiomv1.WriteDataRequest]) (*connect.Response[adiomv1.WriteDataResponse], error) {
    panic("unimplemented")
}

// WriteUpdates implements adiomv1connect.ConnectorServiceHandler.
func (c *connV2) WriteUpdates(context.Context, *connect.Request[adiomv1.WriteUpdatesRequest]) (*connect.Response[adiomv1.WriteUpdatesResponse], error) {
    panic("unimplemented")
}
🛠️ Refactor suggestion
Don’t panic on unimplemented sink methods. Return CodeUnimplemented.
Panic will crash the process if these endpoints are hit. Prefer an explicit unimplemented error.
-import (
+import (
+    "errors"
     "bytes"
     "context"

  func (c *connV2) WriteData(context.Context, *connect.Request[adiomv1.WriteDataRequest]) (*connect.Response[adiomv1.WriteDataResponse], error) {
-     panic("unimplemented")
+     return nil, connect.NewError(connect.CodeUnimplemented, errors.ErrUnsupported)
  }
  func (c *connV2) WriteUpdates(context.Context, *connect.Request[adiomv1.WriteUpdatesRequest]) (*connect.Response[adiomv1.WriteUpdatesResponse], error) {
-     panic("unimplemented")
+     return nil, connect.NewError(connect.CodeUnimplemented, errors.ErrUnsupported)
  }
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In connectors/random/connv2.go around lines 473 to 481, the WriteData and
WriteUpdates handlers currently call panic("unimplemented"); replace those
panics with explicit unimplemented gRPC/Connect errors returned to the caller:
construct and return an error with the CodeUnimplemented (Connect/GRPC
equivalent) and a short descriptive message (e.g., "WriteData not implemented
for random connector" and "WriteUpdates not implemented for random connector")
so the process does not crash and clients receive a proper unimplemented
response.
}, func(c *cli.Context, _ []string, _ AdditionalSettings) (adiomv1connect.ConnectorServiceHandler, error) {
    var m map[string]any
    payload := c.StringSlice("payload")
    if len(payload) > 0 {
        m = map[string]any{}
    }
    for _, p := range payload {
        k, v, _ := strings.Cut(p, ":")
        m[k] = v
    }
🛠️ Refactor suggestion
Validate --payload entries; guard against malformed input.
Currently ignores split failure and writes empty values; return a clear error instead.
- var m map[string]any
- payload := c.StringSlice("payload")
- if len(payload) > 0 {
- m = map[string]any{}
- }
- for _, p := range payload {
- k, v, _ := strings.Cut(p, ":")
- m[k] = v
- }
+ m := map[string]any{}
+ payload := c.StringSlice("payload")
+ for _, p := range payload {
+ k, v, ok := strings.Cut(p, ":")
+ if !ok || k == "" {
+ return nil, fmt.Errorf("invalid --payload entry %q (expected key:value)", p)
+ }
+ m[k] = v
+ }
+ if len(m) == 0 {
+ m = nil
+ }
🤖 Prompt for AI Agents
In internal/app/options/connectorflags.go around lines 246 to 255, the loop that
parses --payload entries uses strings.Cut but ignores the cut failure and
silently writes empty values; update the code to validate each payload entry:
check the boolean result from strings.Cut (or verify presence of ':'), return a
clear error (e.g., nil, fmt.Errorf("invalid --payload entry %q: expected
key:value", p)) when an entry is malformed, and only initialize and populate the
map when entries are valid; ensure the error message includes the offending
entry so callers can correct their input.
Adds a /dev/fakesource connector.
Summary by CodeRabbit
New Features
Bug Fixes