From 81da4c03366bec181bc62a7135f7a54c1b6243f1 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Tue, 17 Sep 2024 13:42:00 -0700 Subject: [PATCH 1/4] Define and use default SSE type --- stream.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/stream.go b/stream.go index fd13415..ced2de7 100644 --- a/stream.go +++ b/stream.go @@ -18,6 +18,9 @@ var ( ) const ( + // SSETypeDefault is the default type of SSEEvent. + SSETypeDefault = "message" + // SSETypeDone is the type of SSEEvent that indicates the prediction is done. The Data field will contain an empty JSON object. SSETypeDone = "done" @@ -202,7 +205,7 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l b := buf.Bytes() buf.Reset() - event := SSEEvent{} + event := SSEEvent{Type: SSETypeDefault} if err := event.decode(b); err != nil { errChan <- err } From 3676ad650e950a4c8174319146e37ddbf80eeb7d Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Thu, 19 Sep 2024 04:46:04 -0700 Subject: [PATCH 2/4] Cache intermediate data when decoding event --- stream.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/stream.go b/stream.go index ced2de7..26dfe03 100644 --- a/stream.go +++ b/stream.go @@ -42,7 +42,7 @@ type SSEEvent struct { } func (e *SSEEvent) decode(b []byte) error { - data := [][]byte{} + chunks := [][]byte{} for _, line := range bytes.Split(b, []byte("\n")) { // Parse field and value from line parts := bytes.SplitN(line, []byte{':'}, 2) @@ -65,17 +65,17 @@ func (e *SSEEvent) decode(b []byte) error { case "event": e.Type = string(value) case "data": - data = append(data, value) + chunks = append(chunks, value) default: // ignore } } - if !utf8.Valid(bytes.Join(data, []byte("\n"))) { + data := bytes.Join(chunks, []byte("\n")) + if !utf8.Valid(data) { return ErrInvalidUTF8Data } - - e.Data = string(bytes.Join(data, []byte("\n"))) + e.Data = string(data) return nil } From a200711f27b497ccbd410427e5eec45e7a90b331 Mon Sep 17 00:00:00 2001 From: Mattt Date: Thu, 19 Sep 2024 04:50:19 -0700 Subject: [PATCH 3/4] Return an error to close the stream if decoding an event fails Co-authored-by: Philip Potter --- stream.go | 1 + 1 file changed, 1 insertion(+) diff --git a/stream.go b/stream.go index 26dfe03..48ea040 100644 --- a/stream.go +++ b/stream.go @@ -208,6 +208,7 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l event := SSEEvent{Type: SSETypeDefault} if err := event.decode(b); err != nil { errChan <- err + return err } sseChan <- event From a77e09dd9694dea570bd17f74275cf11f5c40c84 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Thu, 19 Sep 2024 04:52:35 -0700 Subject: [PATCH 4/4] Close done instead of returning error --- stream.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/stream.go b/stream.go index 48ea040..5fad76e 100644 --- a/stream.go +++ b/stream.go @@ -208,7 +208,8 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l event := SSEEvent{Type: SSETypeDefault} if err := event.decode(b); err != nil { errChan <- err - return err + close(done) + return } sseChan <- event