From 103197f49ea77e600f9e4a0b77e2c9341f5b456d Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Tue, 17 Sep 2024 13:57:54 -0700 Subject: [PATCH 1/4] Move up deferred closing of response body --- stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream.go b/stream.go index fd13415..52f847c 100644 --- a/stream.go +++ b/stream.go @@ -166,6 +166,7 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l g.Go(func() error { defer close(lineChan) + defer resp.Body.Close() for { select { @@ -176,7 +177,6 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l default: line, err := reader.ReadBytes('\n') if err != nil { - defer resp.Body.Close() return err } lineChan <- line From e42a30e5ee01283924651218946ce63ab0f75d48 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Tue, 17 Sep 2024 13:58:07 -0700 Subject: [PATCH 2/4] Return context error instead of nil --- stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream.go b/stream.go index 52f847c..4e75df6 100644 --- a/stream.go +++ b/stream.go @@ -171,7 +171,7 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l for { select { case <-ctx.Done(): - return nil + return ctx.Err() case <-done: return nil default: From 8a826b891acdb8ec5ad4989f04d26005ebdbfee1 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Tue, 17 Sep 2024 13:59:28 -0700 Subject: [PATCH 3/4] Send to line channel in select with context check --- stream.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/stream.go b/stream.go index 4e75df6..6fafc14 100644 --- a/stream.go +++ b/stream.go @@ -179,7 +179,11 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l if err != nil { return err } - lineChan <- line + select { + case lineChan <- line: + case <-ctx.Done(): + return ctx.Err() + } } } }) From 2e369997762889e884b0df0d7b9a70e3bd4ddf08 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Tue, 17 Sep 2024 14:08:10 -0700 Subject: [PATCH 4/4] Refactor final goroutine in streamPrediction --- stream.go | 33 +++++++++++++++------------------ 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/stream.go b/stream.go index 6fafc14..9883c23 100644 --- a/stream.go +++ b/stream.go @@ -222,30 +222,27 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l }() go func() { + err := g.Wait() + defer close(sseChan) defer close(errChan) - for { - select { - case <-ctx.Done(): + if err != nil { + if errors.Is(err, io.EOF) { + // Attempt to reconnect if the connection was closed before the stream was done + r.streamPrediction(ctx, prediction, lastEvent, sseChan, errChan) return - case <-done: - return - default: - err := g.Wait() - if err != nil { - if err == io.EOF { - // Attempt to reconnect if the connection was closed before the stream was done - r.streamPrediction(ctx, prediction, lastEvent, sseChan, errChan) - continue - } + } - if errors.Is(err, context.Canceled) { - return - } + if errors.Is(err, context.Canceled) { + // Context was canceled, simply return + return + } - errChan <- err - } + select { + case errChan <- err: + default: + // errChan is full or closed } } }()