[client] Fix/grpc retry#5750

Open
pappz wants to merge 28 commits into main from fix/grpc-retry

Conversation

@pappz
Contributor

@pappz pappz commented Mar 31, 2026

Describe your changes

  • Fix Receive stuck after Close: Use backoff.Permanent for codes.Canceled errors so Receive() returns immediately when the connection is closed, instead of retrying for up to 3 months
  • Add connection recreation on corrupt stream: Store gRPC dial options on the client and recreate the connection on codes.Internal errors (RST_STREAM/PROTOCOL_ERROR), with logging for visibility
  • Remove WaitForReady: Let the backoff loop handle retry timing instead of blocking inside gRPC
  • Unify mutex: Replace streamMu with a single mu that protects clientConn, realClient, and stream
  • Fix backoff reset: Reset the backoff after a successful connection. Without this, the backoff's elapsed-time budget is consumed over time and expires after 3 months, after which retries stop permanently

Issue ticket number and link

Stack

Checklist

  • Is it a bug fix
  • Is a typo/documentation fix
  • Is a feature enhancement
  • It is a refactor
  • Created tests that fail without the change (if possible)

By submitting this pull request, you confirm that you have read and agree to the terms of the Contributor License Agreement.

Documentation

Select exactly one:

  • I added/updated documentation for this change
  • Documentation is not needed for this change (explain why)

Docs PR URL (required if "docs added" is checked)

Paste the PR link from https://github.com/netbirdio/docs here:

https://github.com/netbirdio/docs/pull/__

Summary by CodeRabbit

  • New Features

    • Added a public Send API, preserved connection options for automatic reconnection, and introduced an explicit client-closed error.
  • Bug Fixes

    • Close now reliably cancels in-flight work and detaches/closes connections without deadlock.
    • Prevent concurrent receives; receive/setup fail fast on initiation/header errors and treat cancellations as permanent.
    • Improved reconnection and backoff behavior (health-window reset, reduced jitter) to avoid spurious retries.
  • Tests

    • Added concurrency, shutdown-during-receive, and protocol-level stream-reset reconnection tests.

pappz added 4 commits March 31, 2026 14:50
Use backoff.Permanent for canceled gRPC errors so Receive returns
immediately instead of retrying until context deadline when the
connection is already closed. Add TestNewClient_PermanentClose to
verify the behavior.

The connectivity.Shutdown check was meaningless: when the connection is shut down, c.realClient.Events(ctx, grpc.WaitForReady(true)) on the next line already fails with codes.Canceled — which is now handled as a permanent error. The explicit state check merely duplicated what gRPC already reports through its normal error path.
grpc.WaitForReady(true) parks the RPC call internally until the
connection reaches READY, only unblocking on ctx cancellation.
This means the external backoff.Retry loop in Receive() never gets
control back during a connection outage — it cannot tick, log, or
apply its retry intervals while WaitForReady is blocking.

Removing it restores fail-fast behaviour: Events() returns immediately
with codes.Unavailable when the connection is not ready, which is
exactly what the backoff loop expects. The backoff becomes the single
authority over retry timing and cadence, as originally intended.
…ling

Store gRPC dial options on the client to enable connection recreation
on Internal errors (RST_STREAM/PROTOCOL_ERROR). Treat Unauthenticated,
PermissionDenied, and Unimplemented as permanent failures. Unify mutex
usage and add reconnection logging for better observability.
@coderabbitai
Contributor

coderabbitai bot commented Mar 31, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 33bf9836-249a-4bef-8d8e-b41e1768a3bd

📥 Commits

Reviewing files that changed from the base of the PR and between dfbc0ef and 7d5decf.

📒 Files selected for processing (1)
  • flow/client/client.go

📝 Walkthrough

Walkthrough

Reworked client lifecycle to use a single mutex, added exported ErrClientClosed and Send, persisted dial target/opts for reconnection, refactored Receive to establish streams and centralize retry/recreate logic, changed backoff/jitter semantics, and added tests including protocol-error reconnection and shutdown scenarios.

Changes

Cohort / File(s) Summary
Client core
flow/client/client.go
Replaced prior per-stream locking with a single mu guarding lifecycle state (clientConn, realClient, stream, closed, receiving); stored dial target/opts; added exported ErrClientClosed and Send(event *proto.FlowEvent) error; refactored receive flow into establishStream()/receive() and centralized retry via handleRetryableError; implemented recreateConnection() to rebuild gRPC connection on failures; adjusted backoff/jitter and added minHealthyDuration; Close() detaches and closes conn safely.
Tests / harness
flow/client/client_test.go
Added connTrackListener to record connections and inject raw HTTP/2 RST_STREAM(PROTOCOL_ERROR) frames; extended test server with stream lifecycle channels; updated Events() handler to signal start/stop and return codes.Internal on forced close; replaced map ack tracking with atomics and synchronized handler start; added tests for shutdown behavior and protocol-error-driven reconnect and connection-count verification.

Sequence Diagram(s)

sequenceDiagram
  participant Caller
  participant Client
  participant Conn as "grpc.ClientConn / realClient"
  participant Stream

  Caller->>Client: Receive(ctx)
  Client->>Client: lock mu, snapshot conn/opts
  Client->>Conn: Establish Events stream
  Client->>Stream: Send initiator FlowEvent
  alt stream established
    Client->>Client: lock mu, set c.stream
    loop receive loop
      Stream-->>Client: FlowEvent
      Client-->>Caller: deliver event
    end
  else establishment/send failed
    Client-->>Caller: return error
  end

  alt codes.Internal during receive
    Client->>Client: handleRetryableError -> recreateConnection()
    Client->>Conn: Dial new conn with stored target/opts
    Client->>Client: swap clientConn/realClient, clear stream (under mu)
    Client->>Conn: close old conn (outside mu)
    Client-->>Caller: return error to trigger re-establish
  end

  alt context canceled or deadline
    Client-->>Caller: treat as permanent -> return cancellation error
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • mlsmaycon
  • lixmal

Poem

🐰 I tightened one mutex and stored the road I roam,
I close the old connection, then dial a new one home,
If streams go bump and break, I stitch another thread,
I send my tiny pings and keep the ack-count fed,
Hoppity reconnect — the flow keeps on its thread.

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

  • Docstring Coverage — ⚠️ Warning: Docstring coverage is 0.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.
  • Title check — ❓ Inconclusive: The title 'Fix/grpc retry' is vague and does not convey the key changes in the changeset. Resolution: use a more specific title, such as 'Fix Receive hanging after Close and add gRPC connection recreation on stream errors'.

✅ Passed checks (1 passed)

  • Description check — ✅ Passed: The description covers key changes, is marked as both a bug fix and refactor with a clear checklist, and indicates documentation is not needed; however, it lacks the issue ticket number/link required by the template.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
flow/client/client_test.go (1)

162-180: ⚠️ Potential issue | 🟠 Major

This test never cancels the context.

Line 180 calls Receive() with a live ctx, so the test blocks inside the receive loop and never reaches the assertions below. Cancel before the call, or run Receive() in a goroutine and cancel from the test.

✅ Deterministic fix
 	ctx, cancel := context.WithCancel(context.Background())
 	t.Cleanup(cancel)
+	cancel()

 	client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
 	require.NoError(t, err)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client_test.go` around lines 162 - 180, The test creates a live
context (ctx from context.WithCancel) and then calls client.Receive(ctx, ...)
which blocks indefinitely because the context is never canceled; update the test
to either cancel the context before calling Receive or, better, run Receive in a
goroutine and cancel ctx from the test to unblock the loop: start Receive(...)
in a new goroutine (using the msgHandler defined) and then call cancel() (or use
t.Cleanup to call cancel) so the receive loop exits and the test can proceed to
assert handlerCalled and close the client (client.Close) cleanly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@flow/client/client.go`:
- Around line 80-89: The Close() method currently only clears stream and leaves
the client able to be resurrected; add a terminal closed flag (e.g., c.closed
bool) protected by c.mu and set it to true inside GRPCClient.Close while also
clearing c.stream and c.clientConn under the same lock; update
recreateConnection() and establishStreamAndReceive() (and any code path that
creates or installs a new c.clientConn or c.stream, such as Receive()) to check
c.closed under c.mu at start and return/bail immediately if set so no new
connections/streams are installed after Close() is called.
- Around line 119-127: The codes.Internal branch in the error handling currently
returns nil after successfully calling c.recreateConnection(), which incorrectly
signals success to backoff.Retry; change the flow in that branch (in the
function handling stream errors—see the switch on gRPC codes and the call to
c.recreateConnection()) to return a retryable error instead of nil so the retry
loop re-attempts establishStreamAndReceive on the newly created connection
(alternatively, re-enter establishStreamAndReceive before returning), ensuring
the stream is re-established after recreateConnection() succeeds.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: afce4db9-6021-4a87-abb5-05de2882dd01

📥 Commits

Reviewing files that changed from the base of the PR and between 405c3f4 and a178ec7.

📒 Files selected for processing (2)
  • flow/client/client.go
  • flow/client/client_test.go

Contributor

@coderabbitai coderabbitai bot left a comment


Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
flow/client/client.go (1)

111-134: ⚠️ Potential issue | 🔴 Critical

Critical: Inverted error condition breaks retry logic entirely.

Line 111 checks err == nil but then attempts to handle error cases inside that block. This inverts the logic:

  • On success (err == nil): enters the block, no error checks match, falls through to return fmt.Errorf("receive: %w", err) which wraps nil — incorrectly signals failure.
  • On failure (err != nil): skips the block entirely, returns nil — incorrectly signals success to backoff.Retry.

This means successful stream operations are retried while actual errors terminate the retry loop as "success."

🐛 Proposed fix
 	operation := func() error {
-		if err := c.establishStreamAndReceive(ctx, msgHandler); err == nil {
+		if err := c.establishStreamAndReceive(ctx, msgHandler); err != nil {
 			if errors.Is(err, context.Canceled) {
 				return backoff.Permanent(err)
 			}
 			if s, ok := status.FromError(err); ok {
 				switch s.Code() {
 				case codes.Canceled:
 					return backoff.Permanent(err)
 				case codes.Internal:
 					log.Warnf("connection corrupt, attempting reconnection: %v", err)
 					// RST_STREAM/PROTOCOL_ERROR — connection is corrupt, recreate immediately
 					if err := c.recreateConnection(); err != nil {
 						log.Errorf("recreate connection: %v", err)
 						return err
 					}
 					log.Infof("connection recreated successfully")
 					return fmt.Errorf("connection recreated, re-establishing stream")
 				}
 			}

 			log.Errorf("receive failed: %v", err)
 			return fmt.Errorf("receive: %w", err)
 		}
 		return nil
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client.go` around lines 111 - 134, The conditional around
establishStreamAndReceive is inverted: change the check from "if err == nil" to
"if err != nil" so error handling runs only on failures; inside that block,
handle context.Canceled via errors.Is(err, context.Canceled) and map gRPC status
codes (use status.FromError(err) and switch on s.Code() to treat codes.Canceled
as permanent and codes.Internal to log, call c.recreateConnection(), and return
an informative error to trigger retry; otherwise log the receive failure and
return fmt.Errorf("receive: %w", err). Ensure the function returns nil only when
establishStreamAndReceive succeeded (i.e., err == nil) outside the
error-handling branch.
♻️ Duplicate comments (1)
flow/client/client.go (1)

80-90: ⚠️ Potential issue | 🟡 Minor

Close() still lacks terminal state protection.

The previous review correctly identified that without a closed flag, recreateConnection() can install a new connection after Close() has been called, leaking the replacement connection. This concern remains unaddressed.

Additionally, line 86 should guard against a nil conn for defensive safety:

 	c.mu.Unlock()

+	if conn == nil {
+		return nil
+	}
 	if err := conn.Close(); err != nil && !errors.Is(err, context.Canceled) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client.go` around lines 80 - 90, Add terminal-state protection to
GRPCClient by introducing a closed boolean field and using it in Close() and
recreateConnection(); in Close() acquire c.mu, return immediately if c.closed is
true, set c.closed = true and clear c.stream/clientConn while still holding the
lock, then unlock and only attempt to close the saved conn if it is non-nil
(defensive nil check) and handle context.Canceled as before; update
recreateConnection() to check c.closed under c.mu and refuse to create or
install a new connection when closed to avoid leaking replacements.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: fedafa84-5108-422b-a6e1-243b6ef947f1

📥 Commits

Reviewing files that changed from the base of the PR and between a178ec7 and 8ba4183.

📒 Files selected for processing (1)
  • flow/client/client.go

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
flow/client/client.go (1)

81-85: ⚠️ Potential issue | 🟠 Major

Make Close() terminal.

Close() still only clears stream, so a Receive() that races in after shutdown starts can keep running, and both recreateConnection() and establishStreamAndReceive() can install a fresh connection/stream after the caller has already closed the client. That leaves the client live again and can leak the replacement connection.

🔒 Minimal fix shape
 type GRPCClient struct {
 	realClient proto.FlowServiceClient
 	clientConn *grpc.ClientConn
 	stream     proto.FlowService_EventsClient
 	opts       []grpc.DialOption
 	mu         sync.Mutex
 	wg         sync.WaitGroup
+	closed     bool
 }

 func (c *GRPCClient) Close() error {
 	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		c.wg.Wait()
+		return nil
+	}
+	c.closed = true
 	c.stream = nil
 	conn := c.clientConn
+	c.clientConn = nil
+	c.realClient = nil
 	c.mu.Unlock()
 	...
 }

 func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHandler func(msg *proto.FlowEventAck) error) error {
-	c.wg.Add(1)
+	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		return context.Canceled
+	}
+	c.wg.Add(1)
+	c.mu.Unlock()
 	defer c.wg.Done()
 	...
 }

recreateConnection() and establishStreamAndReceive() need the same closed guard before they read or swap clientConn, realClient, or stream.

Also applies to: 112-114, 151-169, 172-175

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client.go` around lines 81 - 85, Close() must be made terminal:
set a boolean closed flag under c.mu, close/cleanup the underlying clientConn
and stream while holding the lock, and return; then add checks in
recreateConnection(), establishStreamAndReceive(), and Receive() to bail out
early if c.closed is true before they read or swap c.clientConn, c.realClient,
or c.stream (i.e., acquire c.mu, test c.closed, and if set avoid creating or
installing a new connection/stream). Ensure all places that assign c.clientConn,
c.realClient, or c.stream perform the closed-guard check under the same mutex so
no new resources are installed after Close() runs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@flow/client/client.go`:
- Around line 87-93: The early return on the conn.Close() error path skips
c.wg.Wait(), breaking the "wait for receive loop" guarantee; change the Close
logic in the client (the method calling conn.Close() and c.wg.Wait()) to capture
the close error (e.g., closeErr := conn.Close() / or errFromClose), do not
return immediately, always call c.wg.Wait(), then if closeErr is non-nil and not
context.Canceled return fmt.Errorf("close client connection: %w", closeErr),
otherwise return nil.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 26e6364b-2f43-411c-bcc2-8f6dab58d8cd

📥 Commits

Reviewing files that changed from the base of the PR and between 8ba4183 and 2ddeacb.

📒 Files selected for processing (2)
  • flow/client/client.go
  • flow/client/client_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • flow/client/client_test.go

… during Receive

Prevent reconnection attempts after client closure by tracking a `closed` flag. Use `backoff.Permanent` for errors caused by operations on a closed client. Add a test to ensure `Close` does not block when `Receive` is actively running.
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
flow/client/client.go (1)

84-93: ⚠️ Potential issue | 🔴 Critical

Make Close() idempotent to avoid a nil-pointer panic.

On repeated Close() calls, conn can be nil and Line 91 will panic (conn.Close()). Guard closed state and nil connection before closing.

🔧 Suggested fix
func (c *GRPCClient) Close() error {
 	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		return nil
+	}
 	c.closed = true
 	c.stream = nil
 	conn := c.clientConn
 	c.clientConn = nil
+	c.realClient = nil
 	c.mu.Unlock()

+	if conn == nil {
+		return nil
+	}
 	if err := conn.Close(); err != nil && !errors.Is(err, context.Canceled) {
 		return fmt.Errorf("close client connection: %w", err)
 	}

 	return nil
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client.go` around lines 84 - 93, The Close method currently
assumes c.clientConn (conn) is non-nil and calls conn.Close(), which can panic
on repeated Close() calls; make Close idempotent by checking the closed state
and/or nil connection under the mutex before attempting to Close: acquire c.mu,
if c.closed is true return nil, otherwise set c.closed=true, set c.stream=nil,
copy c.clientConn to a local variable, set c.clientConn=nil, release the mutex,
then only if the local conn is non-nil call conn.Close() and handle the error
(ignoring context.Canceled) — reference symbols: Close (method), c.mu, c.closed,
c.stream, c.clientConn.
🧹 Nitpick comments (1)
flow/client/client.go (1)

98-105: Return ErrClientClosed from Send() when the client is closed.

Line 103 currently returns "stream not initialized" for both “not yet connected” and “already closed”. Using ErrClientClosed here would make caller behavior more deterministic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client.go` around lines 98 - 105, The Send method on GRPCClient
currently returns a generic "stream not initialized" error when stream == nil;
change it to return ErrClientClosed when the client has been closed and keep the
existing error for the "not yet connected" case — inspect GRPCClient's
closed/close indicator (e.g., a boolean or error field like ErrClientClosed or
c.closed) after grabbing c.stream under the mutex and return ErrClientClosed if
that indicator shows the client is closed; otherwise return the original "stream
not initialized" error. Ensure you reference GRPCClient.Send, c.stream, c.mu and
the ErrClientClosed symbol when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@flow/client/client.go`:
- Around line 157-165: The current defer closes the old connection immediately
(defer func(conn *grpc.ClientConn){ _ = conn.Close()}(c.clientConn)) even if
grpc.NewClient(c.clientConn.Target(), c.opts...) fails, which may leave
c.clientConn pointing to a closed conn; change the flow so you first call
grpc.NewClient(...) and only if it succeeds swap c.clientConn under c.mu (e.g.,
assign new conn to a local variable, lock, replace c.clientConn, unlock) and
then close the previous connection (call Close on the old connection after the
swap), ensuring conn.Close is not deferred on the original c.clientConn before a
successful creation.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4dda6fc0-e5db-4e9c-af20-fb953dec314f

📥 Commits

Reviewing files that changed from the base of the PR and between 2ddeacb and 11e9c05.

📒 Files selected for processing (2)
  • flow/client/client.go
  • flow/client/client_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • flow/client/client_test.go

Close the old `gRPC.ClientConn` after successfully swapping to a new connection during reconnection.
Contributor

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
flow/client/client.go (1)

83-96: ⚠️ Potential issue | 🟠 Major

Missing nil check on conn before Close() — double-close will panic.

If Close() is called twice, the second call will capture conn = nil (since c.clientConn was set to nil on the first call), then panic on line 91 when calling conn.Close().

🛡️ Proposed fix
 func (c *GRPCClient) Close() error {
 	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		return nil
+	}
 	c.closed = true
 	c.stream = nil
 	conn := c.clientConn
 	c.clientConn = nil
 	c.mu.Unlock()

+	if conn == nil {
+		return nil
+	}
 	if err := conn.Close(); err != nil && !errors.Is(err, context.Canceled) {
 		return fmt.Errorf("close client connection: %w", err)
 	}

 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client.go` around lines 83 - 96, In GRPCClient.Close ensure you
don't call Close on a nil connection: after moving c.clientConn into the local
conn (inside GRPCClient.Close) check if conn == nil and return nil before
calling conn.Close(); preserve the mutex locking/unlocking around reading and
clearing c.clientConn and only attempt to Close the saved non-nil conn, so
double-close won't panic (references: GRPCClient.Close, conn, c.clientConn,
c.mu).
🧹 Nitpick comments (1)
flow/client/client.go (1)

150-172: Holding mutex during grpc.NewClient() can cause lock contention.

grpc.NewClient() at line 157 is called while holding c.mu. Although grpc.NewClient itself is typically fast (it doesn't establish the connection immediately with the new client API), this pattern could still cause issues if any internal operations delay. Consider capturing target and opts under the lock, releasing it, then calling NewClient.

♻️ Suggested refactor
 func (c *GRPCClient) recreateConnection() error {
 	c.mu.Lock()
 	if c.closed {
 		c.mu.Unlock()
 		return backoff.Permanent(ErrClientClosed)
 	}
+	target := c.clientConn.Target()
+	opts := c.opts
+	c.mu.Unlock()

-	conn, err := grpc.NewClient(c.clientConn.Target(), c.opts...)
+	conn, err := grpc.NewClient(target, opts...)
 	if err != nil {
-		c.mu.Unlock()
 		return fmt.Errorf("create new connection: %w", err)
 	}

+	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		_ = conn.Close()
+		return backoff.Permanent(ErrClientClosed)
+	}
 	old := c.clientConn
 	c.clientConn = conn
 	c.realClient = proto.NewFlowServiceClient(conn)
 	c.stream = nil
 	c.mu.Unlock()

 	_ = old.Close()

 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client.go` around lines 150 - 172, recreateConnection currently
calls grpc.NewClient while holding c.mu which can cause unnecessary contention;
instead, inside recreateConnection grab c.mu, check c.closed, copy target :=
c.clientConn.Target() and opts := append([]grpc.DialOption(nil), c.opts...) (or
otherwise copy the options), then unlock; call grpc.NewClient(target, opts...)
outside the lock; after successful creation re-lock, check c.closed again and if
closed close the newly created conn and return
backoff.Permanent(ErrClientClosed), otherwise swap in the new connection by
setting c.clientConn, c.realClient and nil-ing c.stream, capture old into a
local variable, unlock and finally close old; preserve original error wrapping
behavior on NewClient failures.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@flow/client/client.go`:
- Around line 83-96: In GRPCClient.Close ensure you don't call Close on a nil
connection: after moving c.clientConn into the local conn (inside
GRPCClient.Close) check if conn == nil and return nil before calling
conn.Close(); preserve the mutex locking/unlocking around reading and clearing
c.clientConn and only attempt to Close the saved non-nil conn, so double-close
won't panic (references: GRPCClient.Close, conn, c.clientConn, c.mu).

---

Nitpick comments:
In `@flow/client/client.go`:
- Around line 150-172: recreateConnection currently calls grpc.NewClient while
holding c.mu which can cause unnecessary contention; instead, inside
recreateConnection grab c.mu, check c.closed, copy target :=
c.clientConn.Target() and opts := append([]grpc.DialOption(nil), c.opts...) (or
otherwise copy the options), then unlock; call grpc.NewClient(target, opts...)
outside the lock; after successful creation re-lock, check c.closed again and if
closed close the newly created conn and return
backoff.Permanent(ErrClientClosed), otherwise swap in the new connection by
setting c.clientConn, c.realClient and nil-ing c.stream, capture old into a
local variable, unlock and finally close old; preserve original error wrapping
behavior on NewClient failures.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a7b9cdbe-582c-41d8-b841-4785836ffb7e

📥 Commits

Reviewing files that changed from the base of the PR and between 11e9c05 and e9007f7.

📒 Files selected for processing (1)
  • flow/client/client.go

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
flow/client/client.go (1)

114-151: ⚠️ Potential issue | 🟠 Major

Add a client-owned close signal to interrupt the backoff retry loop when Close() is called.

The backoff timer on line 115 is bound only to the caller's ctx through backoff.WithContext(). When backoff.Retry() is sleeping between retry attempts, calling Close() (which only sets c.closed = true and closes the connection) cannot interrupt the sleep. In client/internal/netflow/manager.go:89-103, the old client is closed before the receive context is canceled, creating a window where Receive() remains blocked until the backoff delay expires instead of returning immediately.

Add a context cancel function to the GRPCClient struct and invoke it from Close() so that the retry loop wakes up immediately without waiting for the caller's context cancellation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client.go` around lines 114 - 151, The Receive backoff is only
bound to the caller ctx so Close() cannot interrupt backoff.Retry; add a
client-owned cancel function to GRPCClient (e.g., store a context.CancelFunc on
the struct initialized when the client is created) and use that client-owned
context (or a derived context that combines the caller ctx and the client ctx)
when constructing backOff via defaultBackoff/ backoff.WithContext so the retry
uses a context you control; then call that CancelFunc from Close() (and keep
Close idempotent with c.closed) to wake up backoff.Retry immediately and allow
Receive() to return.
♻️ Duplicate comments (1)
flow/client/client.go (1)

83-95: ⚠️ Potential issue | 🔴 Critical

Make Close() nil-safe and idempotent.

Line 91 still calls conn.Close() after clientConn has been detached, so a second or concurrent Close() can hit a nil conn and panic instead of becoming a no-op. Return early when c.closed is already set and guard the detached conn before closing it.

🔒 Minimal fix
 func (c *GRPCClient) Close() error {
 	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		return nil
+	}
 	c.closed = true
 	c.stream = nil
 	conn := c.clientConn
 	c.clientConn = nil
 	c.mu.Unlock()
 
+	if conn == nil {
+		return nil
+	}
 	if err := conn.Close(); err != nil && !errors.Is(err, context.Canceled) {
 		return fmt.Errorf("close client connection: %w", err)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client.go` around lines 83 - 95, The GRPCClient.Close method must
be made nil-safe and idempotent: at the start of Close() check c.closed under
c.mu and return early if already set, and when detaching clientConn capture it
into a local variable (e.g., conn := c.clientConn), set c.clientConn = nil while
still under the lock, then after unlocking check that conn != nil before calling
conn.Close(); also ensure any error handling still ignores context.Canceled.
Update the Close method (GRPCClient.Close, use c.mu and c.closed and clientConn)
to follow this guarded, idempotent pattern.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@flow/client/client.go`:
- Around line 191-210: The new event stream returned by cl.Events(ctx) must be
closed on any early return to avoid leaking streams; wrap or explicitly call the
stream close method (e.g., stream.CloseSend()) whenever you return after
stream.Send(...) or checkHeader(...) fails and also before returning when
c.closed is true, and only assign c.stream after all checks succeed; update the
error paths around cl.Events, stream.Send, checkHeader, and the c.closed branch
to ensure the opened stream is closed on every failure path.

---

Outside diff comments:
In `@flow/client/client.go`:
- Around line 114-151: The Receive backoff is only bound to the caller ctx so
Close() cannot interrupt backoff.Retry; add a client-owned cancel function to
GRPCClient (e.g., store a context.CancelFunc on the struct initialized when the
client is created) and use that client-owned context (or a derived context that
combines the caller ctx and the client ctx) when constructing backOff via
defaultBackoff/ backoff.WithContext so the retry uses a context you control;
then call that CancelFunc from Close() (and keep Close idempotent with c.closed)
to wake up backoff.Retry immediately and allow Receive() to return.

---

Duplicate comments:
In `@flow/client/client.go`:
- Around line 83-95: The GRPCClient.Close method must be made nil-safe and
idempotent: at the start of Close() check c.closed under c.mu and return early
if already set, and when detaching clientConn capture it into a local variable
(e.g., conn := c.clientConn), set c.clientConn = nil while still under the lock,
then after unlocking check that conn != nil before calling conn.Close(); also
ensure any error handling still ignores context.Canceled. Update the Close
method (GRPCClient.Close, use c.mu and c.closed and clientConn) to follow this
guarded, idempotent pattern.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2bc36480-545d-4bbf-9136-5bd930195a17

📥 Commits

Reviewing files that changed from the base of the PR and between 11e9c05 and 03f023b.

📒 Files selected for processing (1)
  • flow/client/client.go

Contributor

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
flow/client/client.go (1)

83-96: ⚠️ Potential issue | 🟠 Major

Close() panics on double-call due to missing guard.

If Close() is called twice, the second call will capture conn = nil (since c.clientConn was set to nil on the first call) and then conn.Close() will panic with a nil pointer dereference.

Add an early return when the client is already closed to make Close() idempotent:

🔒 Proposed fix for idempotent close
 func (c *GRPCClient) Close() error {
 	c.mu.Lock()
+	if c.closed {
+		c.mu.Unlock()
+		return nil
+	}
 	c.closed = true
 	c.stream = nil
 	conn := c.clientConn
 	c.clientConn = nil
+	c.realClient = nil
 	c.mu.Unlock()

+	if conn == nil {
+		return nil
+	}
 	if err := conn.Close(); err != nil && !errors.Is(err, context.Canceled) {
 		return fmt.Errorf("close client connection: %w", err)
 	}

 	return nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client.go` around lines 83 - 96, The Close method on GRPCClient
can panic when called twice because conn is set to nil on the first call; make
Close idempotent by checking the closed flag or nil clientConn under the mutex
and returning early if already closed (e.g., in GRPCClient.Close acquire c.mu,
if c.closed || c.clientConn == nil return nil, otherwise set c.closed = true and
nil out c.clientConn and c.stream, then release the lock and call conn.Close()).
Ensure you reference and update c.mu, c.closed, c.clientConn and c.stream in
that guarded critical section so the second caller returns without attempting to
call conn.Close().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@flow/client/client.go`:
- Around line 83-96: The Close method on GRPCClient can panic when called twice
because conn is set to nil on the first call; make Close idempotent by checking
the closed flag or nil clientConn under the mutex and returning early if already
closed (e.g., in GRPCClient.Close acquire c.mu, if c.closed || c.clientConn ==
nil return nil, otherwise set c.closed = true and nil out c.clientConn and
c.stream, then release the lock and call conn.Close()). Ensure you reference and
update c.mu, c.closed, c.clientConn and c.stream in that guarded critical
section so the second caller returns without attempting to call conn.Close().

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 806d684d-58ab-4b7e-9662-f7039508e320

📥 Commits

Reviewing files that changed from the base of the PR and between 03f023b and 62a6dc9.

📒 Files selected for processing (1)
  • flow/client/client.go

…ction

Introduce `TestReceive_ServerClosesStream` to verify the client's ability to recover and process acknowledgments after the server closes the stream. Enhance test server with a controlled stream closure mechanism.
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@flow/client/client_test.go`:
- Around line 322-327: The timeout select's fatal message is misleading: it
waits on closeDone (the Close() completion signal) but reports "Receive did not
return"; update the t.Fatal call in the test surrounding closeDone to mention
Close blocked instead (e.g., "Close did not return — blocked in retry loop") so
it accurately reflects that Close() didn't complete; modify the test's select
handling of closeDone/timeout accordingly.
- Line 333: The test currently ignores the error returned by NewClient (call
site: client, _ := flow.NewClient("http://"+server.addr, "test-payload",
"test-signature", 1*time.Second)), which can lead to nil/invalid client usage;
change the call to capture the error (client, err := flow.NewClient(...)) and
fail the test immediately on error (e.g., t.Fatalf or require.NoError) so the
test stops with a clear message instead of proceeding with a bad client.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: b8b2982f-fc1a-4a9b-8de5-b6cc20675431

📥 Commits

Reviewing files that changed from the base of the PR and between 62a6dc9 and aff1428.

📒 Files selected for processing (1)
  • flow/client/client_test.go

pappz added 2 commits March 31, 2026 23:09
Introduce `connTrackListener` to simulate HTTP/2 RST_STREAM with PROTOCOL_ERROR for testing. Refactor and rename `TestReceive_ServerClosesStream` to `TestReceive_ProtocolErrorStreamReconnect` to verify client recovery on protocol errors.
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
flow/client/client_test.go (1)

390-390: ⚠️ Potential issue | 🟡 Minor

Handle the NewClient error before starting the test body.

If construction fails here, the rest of the test can panic or fail with a misleading timeout instead of the real setup error.

🛠️ Minimal fix
-	client, _ := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
+	client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
+	require.NoError(t, err)

Based on learnings: methods returning (T, error) should propagate the error and return early once err != nil.

#!/bin/bash
# Confirm that `NewClient` returns an error and this test currently discards it.
rg -n 'func NewClient\(' flow/client/client.go
rg -n 'client,\s*_\s*:=\s*flow\.NewClient\(' flow/client/client_test.go
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client_test.go` at line 390, The test currently ignores the error
returned by flow.NewClient which can mask setup failures; update the test to
capture the error (client, err := flow.NewClient(...)) and immediately fail the
test if err != nil (e.g., t.Fatalf or t.Fatal with the error) before proceeding
with the rest of the test body so setup errors surface instead of causing
misleading timeouts or panics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@flow/client/client_test.go`:
- Around line 467-480: The test currently only verifies stream recovery but not
that the client recreated the TCP connection; after calling
server.listener.sendRSTStream(1) update the test to wait for connTrackListener
to report a second accepted connection (e.g., poll or wait on
connTrackListener.AcceptedConnections/count or its accept event channel with a
timeout) before sending the second ack (server.acks <-
&proto.FlowEventAck{...}), and assert that connTrackListener shows at least 2
accepted connections (or the accept channel received a second event) to prove a
new connection was created; keep the existing receivedAfterReconnect/ackCount
checks but add this explicit connection-count wait/assert using
connTrackListener.
- Around line 343-345: The test currently only checks for a non-nil error when
reading from done (case err := <-done) which is too weak for the scenario where
the client was closed before Receive() started; replace the generic
require.Error(t, err) with a precise assertion that the error is the
permanent-close sentinel (use require.ErrorIs(t, err, flow.ErrClientClosed) or
an equivalent errors.Is check) so the test explicitly verifies that Receive()
returns flow.ErrClientClosed in the pre-closed case (referencing the done
channel receive and the Receive() call).

---

Duplicate comments:
In `@flow/client/client_test.go`:
- Line 390: The test currently ignores the error returned by flow.NewClient
which can mask setup failures; update the test to capture the error (client, err
:= flow.NewClient(...)) and immediately fail the test if err != nil (e.g.,
t.Fatalf or t.Fatal with the error) before proceeding with the rest of the test
body so setup errors surface instead of causing misleading timeouts or panics.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 9be27528-4e12-4ea1-bd48-a6d79631cb6f

📥 Commits

Reviewing files that changed from the base of the PR and between aff1428 and b4194f8.

📒 Files selected for processing (1)
  • flow/client/client_test.go

Contributor

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (2)
flow/client/client_test.go (2)

342-347: ⚠️ Potential issue | 🟡 Minor

Assert flow.ErrClientClosed in the pre-closed case.

The client is closed before Receive() starts, so require.Error(t, err) is too weak — an unrelated dial/transport failure would also pass. This test should verify the permanent-close contract by asserting the specific error.

Proposed fix
 	select {
 	case err := <-done:
-		require.Error(t, err)
+		require.ErrorIs(t, err, flow.ErrClientClosed)
 	case <-time.After(2 * time.Second):
 		t.Fatal("Receive did not return after Close — stuck in retry loop")
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client_test.go` around lines 342 - 347, The test currently only
checks require.Error(t, err) when Receive() returns from the done channel;
change this to assert the specific permanent-close error by using
require.ErrorIs(t, err, flow.ErrClientClosed) (or require.True(t, errors.Is(err,
flow.ErrClientClosed))) so the test verifies Receive() returns ErrClientClosed
when the client was closed before Receive started; locate the assertion in the
select branch handling err := <-done and replace the weak require.Error check
with the ErrorIs/errors.Is check against flow.ErrClientClosed.

468-480: ⚠️ Potential issue | 🟡 Minor

The reconnect test never proves a new connection was created.

After sendRSTStream(1), the client can still pass by opening a fresh stream on the existing connection. Since this PR specifically adds connection recreation on codes.Internal, the test should assert that connTrackListener observed a second accepted TCP connection before sending the second ack.

Proposed fix — poll for second TCP connection
 	server.listener.sendRSTStream(1)

-	// Wait for client to reconnect, then send second ack
-	time.Sleep(2 * time.Second)
+	// Wait for client to recreate TCP connection
+	deadline := time.After(3 * time.Second)
+	for {
+		server.listener.mu.Lock()
+		recreated := len(server.listener.conns) >= 2
+		server.listener.mu.Unlock()
+		if recreated {
+			break
+		}
+		select {
+		case <-deadline:
+			t.Fatal("timeout waiting for client to recreate the TCP connection")
+		case <-time.After(10 * time.Millisecond):
+		}
+	}
+
+	// Send second ack after new connection is established
 	server.acks <- &proto.FlowEventAck{EventId: []byte("after-close")}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client_test.go` around lines 468 - 480, The test fails to prove a
new TCP connection was created after server.listener.sendRSTStream(1); update
the test to poll connTrackListener (the connection-tracking listener used in the
test) and wait until it reports a second accepted connection before sending the
second ack (server.acks <- &proto.FlowEventAck{EventId: []byte("after-close")});
specifically, after calling sendRSTStream(1) loop/poll with a timeout until
connTrackListener shows a new accept (i.e., an accepted connection count > 1)
and only then send the ack and wait on receivedAfterReconnect so the test
verifies connection recreation rather than just a new stream on the same
connection.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@flow/client/client_test.go`:
- Around line 342-347: The test currently only checks require.Error(t, err) when
Receive() returns from the done channel; change this to assert the specific
permanent-close error by using require.ErrorIs(t, err, flow.ErrClientClosed) (or
require.True(t, errors.Is(err, flow.ErrClientClosed))) so the test verifies
Receive() returns ErrClientClosed when the client was closed before Receive
started; locate the assertion in the select branch handling err := <-done and
replace the weak require.Error check with the ErrorIs/errors.Is check against
flow.ErrClientClosed.
- Around line 468-480: The test fails to prove a new TCP connection was created
after server.listener.sendRSTStream(1); update the test to poll
connTrackListener (the connection-tracking listener used in the test) and wait
until it reports a second accepted connection before sending the second ack
(server.acks <- &proto.FlowEventAck{EventId: []byte("after-close")});
specifically, after calling sendRSTStream(1) loop/poll with a timeout until
connTrackListener shows a new accept (i.e., an accepted connection count > 1)
and only then send the ack and wait on receivedAfterReconnect so the test
verifies connection recreation rather than just a new stream on the same
connection.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c1775ea8-0e1a-4233-befd-7547146ac052

📥 Commits

Reviewing files that changed from the base of the PR and between b4194f8 and 441a85f.

📒 Files selected for processing (1)
  • flow/client/client_test.go

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@flow/client/client_test.go`:
- Around line 474-480: The test reads the connection count after calling
server.listener.sendRSTStream, causing a race where the client may reconnect
before connsBefore is captured; change the sequence so you call
server.listener.connCount() and store it in connsBefore before invoking
server.listener.sendRSTStream(1), then wait with require.Eventually comparing
the new connCount() to that saved connsBefore to reliably detect a new TCP
connection; update references around server.listener.sendRSTStream,
server.listener.connCount, and the connsBefore variable accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: be6b05f6-67d1-4a23-a258-f3209f91df6a

📥 Commits

Reviewing files that changed from the base of the PR and between 441a85f and 910e666.

📒 Files selected for processing (1)
  • flow/client/client_test.go

@coderabbitai
Contributor

coderabbitai bot commented Apr 1, 2026

Caution

Review failed

An error occurred during the review process. Please try again later.

📝 Walkthrough

Walkthrough

Consolidates GRPC client stream/connection lifecycle: single mutex + closed flag, persisted dial options, exported ErrClientClosed and Send; refactors Close/Receive to snapshot stream under lock, recreates connection on gRPC codes.Internal, and extends tests for concurrent shutdown and transport-level stream resets.

Changes

Cohort / File(s) Summary
Client core
flow/client/client.go
Replaced per-stream locking with single mu and closed flag; added exported ErrClientClosed and Send(event *proto.FlowEvent) error; persist opts []grpc.DialOption; changed Close() to mark closed, nil stream, detach+close conn outside lock; added recreateConnection() and isCancellation(); refactored Receive to establishStream() + receive() and adjusted retry logic (recreate conn on codes.Internal).
Tests / harness
flow/client/client_test.go
Added connTrackListener and server closeStream signal to inject HTTP/2 RST_STREAM/codes.Internal; updated test server to use tracked listener; added concurrency and lifecycle tests (TestNewClient_PermanentClose, TestNewClient_CloseVerify, TestClose_WhileReceiving, TestReceive_ProtocolErrorStreamReconnect).

Sequence Diagram(s)

sequenceDiagram
  participant Caller
  participant Client
  participant Conn as "grpc.ClientConn / realClient"
  participant Stream

  Caller->>Client: Receive(ctx)
  Client->>Client: lock mu, snapshot clientConn/opts
  Client->>Conn: create Events stream
  Client->>Stream: send initiator FlowEvent
  alt initiator send & header ok
    Client->>Client: lock mu, set c.stream (if not closed)
    Client->>Stream: receive messages (loop)
  else send or header fail
    Client->>Caller: return error
  end

  alt codes.Internal during receive
    Client->>Client: recreateConnection()
    Client->>Conn: dial new conn with stored opts
    Client->>Client: swap clientConn/realClient, nil out stream (under mu)
    Client->>Conn: close old conn (outside mu)
    Client->>Caller: return error to trigger re-establish
  end

  alt context canceled or codes.Canceled
    Client->>Client: treat as permanent -> stop retries
    Client->>Caller: return cancellation error
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers

  • lixmal
  • mlsmaycon

Poem

🐰 I nudged the stream and tightened one small lock,
I closed the old pathway, then dialed a brand new rock,
I send a little ping, I listen for reply,
If gremlins break the wire, I stitch a fresh supply,
Hoppity reconnect — the flow hops on nearby.

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
Title check ❓ Inconclusive The title '[client] Fix/grpc retry' is vague and uses generic phrasing ('Fix/grpc retry') that doesn't clearly convey the specific changes or primary intent from the changeset. Consider a more specific title that highlights the main change, such as '[client] Fix Receive stuck after Close and handle stream reconnection' or '[client] Improve gRPC retry and connection handling'.
✅ Passed checks (1 passed)
Check name Status Explanation
Description check ✅ Passed The PR description covers the main changes and includes most required template sections, though the issue ticket field is missing and the 'Created tests' checkbox is unchecked despite new tests being added.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.


Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
flow/client/client_test.go (1)

366-376: Wait for Receive() to become live before calling Close().

TestNewClient_CloseVerify starts Close() immediately, and TestClose_WhileReceiving uses a fixed 100 ms sleep. On a slow runner both tests can pass with Close() winning before the stream is established, so they stop exercising the active-receive shutdown path this PR is meant to protect. Gate Close() on a deterministic ready signal such as the first initiator ack.

One way to make the tests deterministic
+ ready := make(chan struct{})
  go func() {
      done <- client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
+         if msg.IsInitiator {
+             select {
+             case <-ready:
+             default:
+                 close(ready)
+             }
+         }
          return nil
      })
  }()
+
+ select {
+ case <-ready:
+ case <-time.After(2 * time.Second):
+     t.Fatal("Receive never established the stream")
+ }

Apply the same pattern in TestClose_WhileReceiving instead of the fixed sleep.

Also applies to: 399-406

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client_test.go` around lines 366 - 376, The tests
(TestNewClient_CloseVerify and TestClose_WhileReceiving) race Close() against
Receive(), so change them to wait deterministically for Receive to be live by
gating Close() on a "ready" signal emitted from the Receive callback (for
example when the first initiator ack / proto.FlowEventAck is received). Modify
the goroutine that calls client.Receive(ctx, ...) to send a value on a ready
channel when the first ack arrives, and in the goroutine that calls
client.Close() wait on that ready channel (instead of sleeping or closing
immediately) before invoking client.Close(); apply the same pattern for both
test locations so Close() always runs after Receive has established the stream.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@flow/client/client_test.go`:
- Around line 26-27: The shared server-wide ack channel s.acks can deliver an
injected ack to a stale Events() handler (which still attempts stream.Send()),
causing the ack to be dropped and the reconnect test to flake; fix by routing
injected acks only to the current Events() invocation or by ensuring the
previous handler has fully exited before enqueueing the second ack—e.g., change
Events() to create and use a per-handler ack channel (or register a
handler-specific ack receiver) instead of using the global s.acks, or add a
synchronization signal from Events() (handler exit notification) and wait on it
before pushing the test ack; ensure all references to s.acks consumption (the
draining loop and stream.Send() calls) read from the handler-local channel so
acks go to the fresh stream only.

---

Nitpick comments:
In `@flow/client/client_test.go`:
- Around line 366-376: The tests (TestNewClient_CloseVerify and
TestClose_WhileReceiving) race Close() against Receive(), so change them to wait
deterministically for Receive to be live by gating Close() on a "ready" signal
emitted from the Receive callback (for example when the first initiator ack /
proto.FlowEventAck is received). Modify the goroutine that calls
client.Receive(ctx, ...) to send a value on a ready channel when the first ack
arrives, and in the goroutine that calls client.Close() wait on that ready
channel (instead of sleeping or closing immediately) before invoking
client.Close(); apply the same pattern for both test locations so Close() always
runs after Receive has established the stream.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 597db74d-acf7-43cd-897a-4e868e7a69e5

📥 Commits

Reviewing files that changed from the base of the PR and between 441a85f and 0b8704f.

📒 Files selected for processing (1)
  • flow/client/client_test.go

pappz added 6 commits April 1, 2026 11:30
Ensure the old `Events` handler exits fully before proceeding in the reconnection test to avoid dropped acknowledgments on a broken stream. Add a `handlerDone` channel to synchronize handler exits.
Introduce `target` field to store the gRPC connection target directly, simplifying reconnections and ensuring consistent connection reuse logic.
…ng for `DeadlineExceeded`

Refactor error handling to include `DeadlineExceeded` scenarios alongside `Canceled`. Update related condition checks for consistency.
…tions

Introduce `connGen` to track connection generations and ensure that stale `recreateConnection` calls do not override newer connections. Update stream establishment and reconnection logic to incorporate generation validation.
Refine backoff reset logic to ensure it only occurs for sufficiently long-lived stream connections, avoiding interference with `MaxElapsedTime`.
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@flow/client/client_test.go`:
- Around line 408-427: Replace the flaky time.Sleep-based synchronization by
adding a concrete readiness and completion signal: have the goroutine that calls
client.Receive(ctx, ...) signal when the stream is established (e.g., send on a
receiveReady channel) and again when Receive returns (e.g., close or send on
receiveDone); in the test, wait for the receiveReady signal before invoking
client.Close(), then after Close() returns wait for receiveDone to ensure the
background Receive goroutine has exited; reference the existing client.Receive
and client.Close calls and the done channel to implement these two new
synchronization channels so Close is gated on real readiness and the test waits
for Receive to finish.

In `@flow/client/client.go`:
- Around line 136-149: When receive() returns an error, ensure the cached stream
is dropped immediately: set c.stream = nil right after "if err :=
c.receive(stream, msgHandler); err != nil {" and before any branching
(isCancellation or status.Internal handling) so concurrent Send() calls won't
target a dead stream; keep existing cancellation handling (isCancellation ->
backoff.Permanent) and the Internal path that calls recreateConnection(), but
operate on a cleared c.stream to fail fast and avoid writes to the broken stream
(refer to methods/fields: c.receive, c.stream, isCancellation,
c.recreateConnection, backoff.Permanent).
- Around line 72-73: The dial target construction uses parsedURL.Hostname() and
parsedURL.Port() which strips IPv6 brackets and can produce trailing colons;
instead use parsedURL.Host directly when building the target stored in target
(the value passed to grpc.NewClient and cached for reconnects). Update the code
that sets target to use parsedURL.Host (preserving bracketed IPv6 and omitting
an extra colon when no port is present) so grpc.NewClient(target, opts...)
receives a valid host[:port] string and reconnect logic uses the same original
host formatting.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3ff690f5-30aa-4f5e-b7df-1f987ab16264

📥 Commits

Reviewing files that changed from the base of the PR and between 0b8704f and d5078d0.

📒 Files selected for processing (2)
  • flow/client/client.go
  • flow/client/client_test.go

pappz added 3 commits April 1, 2026 12:12
Add `minHealthyDuration` constant to ensure stream retries only reset the backoff timer if the stream survives beyond a minimum duration. Prevents unhealthy, short-lived streams from interfering with `MaxElapsedTime`.
parsedURL.Hostname() strips IPv6 brackets. For http://[::1]:443, this turns it into ::1:443, which is not a valid host:port target for gRPC. Additionally, fmt.Sprintf("%s:%s", hostname, port) produces a trailing colon when the URL has no explicit port—http://example.com becomes example.com:. Both cases break the initial dial and reconnect paths. Use parsedURL.Host directly instead.
…ment in tests

Introduce `handlerStarted` channel in the test server to signal when the server-side handler begins, ensuring robust synchronization between client and server during stream establishment. Update relevant test cases to wait for this signal before proceeding.
Contributor

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (2)
flow/client/client_test.go (1)

488-496: Consider using handlerStarted for deterministic timing.

Line 489 uses time.Sleep(500ms) to wait for stream establishment, while TestClose_WhileReceiving uses server.handlerStarted for the same purpose. Using the channel-based approach would be more deterministic:

♻️ Suggested change
-	// Wait for stream to be established, then send first ack
-	time.Sleep(500 * time.Millisecond)
+	// Wait for stream to be established, then send first ack
+	select {
+	case <-server.handlerStarted:
+	case <-time.After(3 * time.Second):
+		t.Fatal("timeout waiting for stream establishment")
+	}
 	server.acks <- &proto.FlowEventAck{EventId: []byte("before-close")}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client_test.go` around lines 488 - 496, Replace the
nondeterministic Sleep in the test with the channel-based synchronization used
elsewhere: wait on server.handlerStarted (like in TestClose_WhileReceiving) to
ensure the stream handler is ready before sending the first ack to server.acks
and proceeding to the select that awaits receivedFirst, so the test uses
deterministic channel signaling instead of time.Sleep.
flow/client/client.go (1)

127-174: Consider extracting error handling to reduce cognitive complexity.

SonarCloud flags cognitive complexity at 22 (threshold 20). The nested error handling after c.receive() could be extracted into a helper method:

♻️ Suggested refactor
+func (c *GRPCClient) handleReceiveError(err error, gen uint64, streamStart time.Time, backOff backoff.BackOff) error {
+	if isContextDone(err) {
+		return backoff.Permanent(err)
+	}
+
+	if time.Since(streamStart) >= minHealthyDuration {
+		backOff.Reset()
+	}
+
+	if s, ok := status.FromError(err); ok && s.Code() == codes.Internal {
+		log.Warnf("connection corrupt, attempting reconnection: %v", err)
+		if err := c.recreateConnection(gen); err != nil {
+			log.Errorf("recreate connection: %v", err)
+			return err
+		}
+		log.Infof("connection recreated successfully")
+		return fmt.Errorf("connection recreated, re-establishing stream")
+	}
+
+	log.Errorf("receive failed: %v", err)
+	return fmt.Errorf("receive: %w", err)
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client.go` around lines 127 - 174, The nested error handling
block inside GRPCClient.Receive (the code immediately after calling c.receive in
the operation closure) should be extracted into a helper function (e.g.,
handleReceiveError(err, streamStart time.Time, backOff backoff.BackOff, gen
connectionGen) error) to reduce cognitive complexity; move the checks currently
performed there—isContextDone(err) → return backoff.Permanent(err), the
minHealthyDuration check and backOff.Reset(), the status.FromError(err) check
for codes.Internal that calls c.recreateConnection(gen) and logs/returns the
special "connection recreated" error, and the final log.Errorf/return
fmt.Errorf("receive: %w", err)—into that helper and have operation call it,
preserving logging and error-wrapping behavior. Ensure the helper is pure to the
operation closure (accepts needed params like streamStart, backOff, gen and
returns the same error semantics) so control flow and backoff.Retry logic remain
unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@flow/client/client_test.go`:
- Around line 488-496: Replace the nondeterministic Sleep in the test with the
channel-based synchronization used elsewhere: wait on server.handlerStarted
(like in TestClose_WhileReceiving) to ensure the stream handler is ready before
sending the first ack to server.acks and proceeding to the select that awaits
receivedFirst, so the test uses deterministic channel signaling instead of
time.Sleep.

In `@flow/client/client.go`:
- Around line 127-174: The nested error handling block inside GRPCClient.Receive
(the code immediately after calling c.receive in the operation closure) should
be extracted into a helper function (e.g., handleReceiveError(err, streamStart
time.Time, backOff backoff.BackOff, gen connectionGen) error) to reduce
cognitive complexity; move the checks currently performed
there—isContextDone(err) → return backoff.Permanent(err), the minHealthyDuration
check and backOff.Reset(), the status.FromError(err) check for codes.Internal
that calls c.recreateConnection(gen) and logs/returns the special "connection
recreated" error, and the final log.Errorf/return fmt.Errorf("receive: %w",
err)—into that helper and have operation call it, preserving logging and
error-wrapping behavior. Ensure the helper is pure to the operation closure
(accepts needed params like streamStart, backOff, gen and returns the same error
semantics) so control flow and backoff.Retry logic remain unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: afb4e197-5b51-455b-a8a6-8420dda955ec

📥 Commits

Reviewing files that changed from the base of the PR and between d5078d0 and bb7406a.

📒 Files selected for processing (2)
  • flow/client/client.go
  • flow/client/client_test.go

pappz added 2 commits April 1, 2026 12:30
…tream establishment sync in tests

Refactor acknowledgment tracking in tests to use an `atomic.Int32` counter instead of a map. Replace fixed sleep with robust synchronization by waiting on `handlerStarted` signal for stream establishment.
Refactor error handling in `receive` to a dedicated `handleReceiveError` method. Streamlines the main logic and isolates error recovery, including backoff reset and connection recreation.
Contributor

@coderabbitai coderabbitai bot left a comment

♻️ Duplicate comments (1)
flow/client/client.go (1)

140-142: ⚠️ Potential issue | 🟡 Minor

Stale stream reference persists during backoff window.

When receive() fails with a non-Internal error, c.stream is not cleared. This leaves Send() targeting a dead stream until the next establishStream() call. Consider clearing c.stream at the start of handleReceiveError to fail Send() fast.

🛠️ Proposed fix
 func (c *GRPCClient) handleReceiveError(err error, gen uint64, streamStart time.Time, backOff backoff.BackOff) error {
+	c.mu.Lock()
+	c.stream = nil
+	c.mu.Unlock()
+
 	if isContextDone(err) {
 		return backoff.Permanent(err)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client.go` around lines 140 - 142, handleReceiveError currently
leaves c.stream set when receive() fails, so Send() keeps targeting a dead
stream; modify handleReceiveError to clear the client's stream reference at the
start (set c.stream = nil) under the same mutex/lock you use to read/write
c.stream (e.g., c.mu) so concurrent Send() will fail fast, then proceed with the
existing backoff/reconnect logic and any cleanup.
🧹 Nitpick comments (3)
flow/client/client_test.go (3)

396-397: Consider asserting the specific error type.

require.Error(t, err) is weak — consider require.ErrorIs(t, err, flow.ErrClientClosed) for consistency with TestNewClient_PermanentClose.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client_test.go` around lines 396 - 397, The test currently
asserts only that an error was returned from the done channel (case err :=
<-done: require.Error(t, err)); update this to assert the specific error type by
replacing that call with require.ErrorIs(t, err, flow.ErrClientClosed) so it
matches the pattern used in TestNewClient_PermanentClose and verifies the client
closed error is returned.

260-263: Consider using handlerStarted for deterministic timing.

The 100ms sleep before cancel could be flaky on slow CI. Using server.handlerStarted would ensure the stream is established before cancellation.

♻️ More deterministic approach
+	started := make(chan struct{})
 	go func() {
-		time.Sleep(100 * time.Millisecond)
+		<-started
 		cancel()
 	}()
+
+	go func() {
+		select {
+		case <-server.handlerStarted:
+		case <-time.After(time.Second):
+		}
+		close(started)
+	}()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client_test.go` around lines 260 - 263, Replace the flaky
time.Sleep-based cancellation by waiting for the server to signal the stream is
established via server.handlerStarted (or the equivalent handlerStarted
channel/condition) before calling cancel(); specifically, in the goroutine that
currently sleeps then calls cancel(), wait on server.handlerStarted (or select
on it with a timeout if desired) to ensure the stream is started, then call
cancel() so cancellation is deterministic and robust on slow CI.

307-307: Consider using handlerStarted for reliable stream establishment.

Similar to other tests, using server.handlerStarted would be more deterministic than a 500ms sleep.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@flow/client/client_test.go` at line 307, Replace the nondeterministic
time.Sleep(500 * time.Millisecond) with a deterministic wait on the test
server's handlerStarted synchronization primitive: use the server.handlerStarted
channel (or its exposed method) to block until the stream handler is ready, or
select on that channel with a short timeout to fail fast; update the test around
time.Sleep in client_test.go to wait for server.handlerStarted before proceeding
so the stream is reliably established.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@flow/client/client.go`:
- Around line 140-142: handleReceiveError currently leaves c.stream set when
receive() fails, so Send() keeps targeting a dead stream; modify
handleReceiveError to clear the client's stream reference at the start (set
c.stream = nil) under the same mutex/lock you use to read/write c.stream (e.g.,
c.mu) so concurrent Send() will fail fast, then proceed with the existing
backoff/reconnect logic and any cleanup.

---

Nitpick comments:
In `@flow/client/client_test.go`:
- Around line 396-397: The test currently asserts only that an error was
returned from the done channel (case err := <-done: require.Error(t, err));
update this to assert the specific error type by replacing that call with
require.ErrorIs(t, err, flow.ErrClientClosed) so it matches the pattern used in
TestNewClient_PermanentClose and verifies the client closed error is returned.
- Around line 260-263: Replace the flaky time.Sleep-based cancellation by
waiting for the server to signal the stream is established via
server.handlerStarted (or the equivalent handlerStarted channel/condition)
before calling cancel(); specifically, in the goroutine that currently sleeps
then calls cancel(), wait on server.handlerStarted (or select on it with a
timeout if desired) to ensure the stream is started, then call cancel() so
cancellation is deterministic and robust on slow CI.
- Line 307: Replace the nondeterministic time.Sleep(500 * time.Millisecond) with
a deterministic wait on the test server's handlerStarted synchronization
primitive: use the server.handlerStarted channel (or its exposed method) to
block until the stream handler is ready, or select on that channel with a short
timeout to fail fast; update the test around time.Sleep in client_test.go to
wait for server.handlerStarted before proceeding so the stream is reliably
established.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4bd9753b-2eb5-4d93-a7d6-a10f22a7a7ff

📥 Commits

Reviewing files that changed from the base of the PR and between bb7406a and dfbc0ef.

📒 Files selected for processing (2)
  • flow/client/client.go
  • flow/client/client_test.go

pappz added 2 commits April 2, 2026 09:57
The flow client had two competing retry loops: our custom exponential
backoff and gRPC's internal subchannel reconnection. When establishStream
failed, the same ClientConn was reused, allowing gRPC's internal backoff
state to accumulate and control dial timing independently.

Changes:
- Consolidate error handling into handleRetryableError, which now
 handles context cancellation, permanent errors, backoff reset,
 and connection recreation in a single path
- Call recreateConnection on every retryable error so each retry
 gets a fresh ClientConn with no internal backoff state
- Remove connGen tracking since Receive is sequential and protected
 by a new receiving guard against concurrent calls
- Reduce RandomizationFactor from 1 to 0.5 to avoid near-zero
 backoff intervals
@sonarqubecloud

sonarqubecloud bot commented Apr 2, 2026
