diff --git a/stream.go b/stream.go index fd13415..5fad76e 100644 --- a/stream.go +++ b/stream.go @@ -18,6 +18,9 @@ var ( ) const ( + // SSETypeDefault is the default type of SSEEvent. + SSETypeDefault = "message" + // SSETypeDone is the type of SSEEvent that indicates the prediction is done. The Data field will contain an empty JSON object. SSETypeDone = "done" @@ -39,7 +42,7 @@ type SSEEvent struct { } func (e *SSEEvent) decode(b []byte) error { - data := [][]byte{} + chunks := [][]byte{} for _, line := range bytes.Split(b, []byte("\n")) { // Parse field and value from line parts := bytes.SplitN(line, []byte{':'}, 2) @@ -62,17 +65,17 @@ func (e *SSEEvent) decode(b []byte) error { case "event": e.Type = string(value) case "data": - data = append(data, value) + chunks = append(chunks, value) default: // ignore } } - if !utf8.Valid(bytes.Join(data, []byte("\n"))) { + data := bytes.Join(chunks, []byte("\n")) + if !utf8.Valid(data) { return ErrInvalidUTF8Data } - - e.Data = string(bytes.Join(data, []byte("\n"))) + e.Data = string(data) return nil } @@ -202,9 +205,11 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l b := buf.Bytes() buf.Reset() - event := SSEEvent{} + event := SSEEvent{Type: SSETypeDefault} if err := event.decode(b); err != nil { errChan <- err + close(done) + return } sseChan <- event