From ea1136585af414cc3f48a53fdd2787d4f1d07851 Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 6 Feb 2024 14:16:39 -0800 Subject: [PATCH 1/4] fix --- lib/stream.js | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/stream.js b/lib/stream.js index 012d6d03..02376e27 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -1,13 +1,14 @@ -// Attempt to use readable-stream if available, attempt to use the built-in stream module. +// Attempt to use the native ReadbleStream if available, otherwise check for the `readable-stream` package or the built-in Node.js `stream` module. let Readable; -try { - Readable = require("readable-stream").Readable; -} catch (e) { + +if (typeof ReadableStream === 'undefined') { try { - Readable = require("stream").Readable; + Readable = require("readable-stream").Readable || require("stream").Readable; } catch (e) { Readable = null; } +} else { + Readable = ReadableStream; } /** From b623f37c28fe0b33b19f982abd45bd27a9a876fa Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 6 Feb 2024 14:43:24 -0800 Subject: [PATCH 2/4] Switch to ReadableStream (by gpt) --- lib/stream.js | 184 ++++++++++++++++++++------------------------------ 1 file changed, 74 insertions(+), 110 deletions(-) diff --git a/lib/stream.js b/lib/stream.js index 02376e27..85ef9bf5 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -1,28 +1,4 @@ -// Attempt to use the native ReadbleStream if available, otherwise check for the `readable-stream` package or the built-in Node.js `stream` module. -let Readable; - -if (typeof ReadableStream === 'undefined') { - try { - Readable = require("readable-stream").Readable || require("stream").Readable; - } catch (e) { - Readable = null; - } -} else { - Readable = ReadableStream; -} - -/** - * A server-sent event. - */ class ServerSentEvent { - /** - * Create a new server-sent event. - * - * @param {string} event The event name. - * @param {string} data The event data. - * @param {string} id The event ID. - * @param {number} retry The retry time. - */ constructor(event, data, id, retry) { this.event = event; this.data = data; @@ -30,107 +6,95 @@ class ServerSentEvent { this.retry = retry; } - /** - * Convert the event to a string. - */ toString() { - if (this.event === "output") { - return this.data; + let result = ''; + if (this.id) { + result += `id: ${this.id}\n`; } - - return ""; + if (this.event) { + result += `event: ${this.event}\n`; + } + if (this.retry) { + result += `retry: ${this.retry}\n`; + } + if (this.data) { + result += `data: ${this.data}\n`; + } + result += '\n'; + return result; } } -/** - * A stream of server-sent events. - */ -class Stream extends Readable { - /** - * Create a new stream of server-sent events. - * - * @param {string} url The URL to connect to. - * @param {object} options The fetch options. - */ +class Stream { constructor(url, options) { - if (!Readable) { - throw new Error( - "Readable streams are not supported. Please use Node.js 18 or later, or install the readable-stream package." - ); - } - - super(); this.url = url; this.options = options; + this.readableStream = new ReadableStream({ + start: async (controller) => { + const response = await fetch(this.url, { + ...this.options, + headers: { + Accept: 'text/event-stream', + }, + }); + const reader = response.body.getReader(); + let decoder = new TextDecoder(); + let eventBuffer = ''; + + const processChunk = (chunk) => { + eventBuffer += decoder.decode(chunk, {stream: true}); + let eolIndex; + while ((eolIndex = eventBuffer.indexOf('\n')) >= 0) { + const line = eventBuffer.slice(0, eolIndex).trim(); + eventBuffer = eventBuffer.slice(eolIndex + 1); + if (line === '') { + // End of an event + const event = this.parseEvent(eventBuffer); + controller.enqueue(event); + eventBuffer = ''; + } else { + // Accumulate data + eventBuffer += line + '\n'; + } + } + }; - this.event = null; - this.data = []; - this.lastEventId = null; - this.retry = null; - } + const push = async () => { + const {done, value} = await reader.read(); + if (done) { + controller.close(); + return; + } + processChunk(value); + push(); + }; - decode(line) { - if (!line) { - if (!this.event && !this.data.length && !this.lastEventId) { - return null; + push(); } - - const sse = new ServerSentEvent( - this.event, - this.data.join("\n"), - this.lastEventId - ); - - this.event = null; - this.data = []; - this.retry = null; - - return sse; - } - - if (line.startsWith(":")) { - return null; - } - - const [field, value] = line.split(": "); - if (field === "event") { - this.event = value; - } else if (field === "data") { - this.data.push(value); - } else if (field === "id") { - this.lastEventId = value; - } - - return null; - } - - async *[Symbol.asyncIterator]() { - const response = await fetch(this.url, { - ...this.options, - headers: { - Accept: "text/event-stream", - }, }); + } - for await (const chunk of response.body) { - const decoder = new TextDecoder("utf-8"); - const text = decoder.decode(chunk); - const lines = text.split("\n"); - for (const line of lines) { - const sse = this.decode(line); - if (sse) { - if (sse.event === "error") { - throw new Error(sse.data); - } - - yield sse; - - if (sse.event === "done") { - return; - } - } + parseEvent(rawData) { + const lines = rawData.trim().split('\n'); + let event = 'message', data = '', id = null, retry = null; + for (const line of lines) { + const [fieldName, value] = line.split(/:(.*)/, 2); + switch (fieldName) { + case 'event': + event = value.trim(); + break; + case 'data': + data += value.trim() + '\n'; + break; + case 'id': + id = value.trim(); + break; + case 'retry': + retry = parseInt(value.trim(), 10); + break; } } + return new ServerSentEvent(event, data.trim(), id, retry); } } From daa6376facbde7ea5c1ab71b970662ea7d6daf20 Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 6 Feb 2024 14:46:34 -0800 Subject: [PATCH 3/4] undo sse change --- lib/stream.js | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/lib/stream.js b/lib/stream.js index 85ef9bf5..6bbea622 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -1,4 +1,16 @@ + +/** + * A server-sent event. + */ class ServerSentEvent { + /** + * Create a new server-sent event. + * + * @param {string} event The event name. + * @param {string} data The event data. + * @param {string} id The event ID. + * @param {number} retry The retry time. + */ constructor(event, data, id, retry) { this.event = event; this.data = data; @@ -6,22 +18,15 @@ class ServerSentEvent { this.retry = retry; } + /** + * Convert the event to a string. + */ toString() { - let result = ''; - if (this.id) { - result += `id: ${this.id}\n`; - } - if (this.event) { - result += `event: ${this.event}\n`; - } - if (this.retry) { - result += `retry: ${this.retry}\n`; + if (this.event === "output") { + return this.data; } - if (this.data) { - result += `data: ${this.data}\n`; - } - result += '\n'; - return result; + + return ""; } } From 5a6cea3fbc9244eb86db2c9ea04eb813794a6008 Mon Sep 17 00:00:00 2001 From: Max Leiter Date: Tue, 6 Feb 2024 19:10:31 -0800 Subject: [PATCH 4/4] lint fixes --- lib/stream.js | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/lib/stream.js b/lib/stream.js index 6bbea622..e2e564f5 100644 --- a/lib/stream.js +++ b/lib/stream.js @@ -43,13 +43,13 @@ class Stream { }, }); const reader = response.body.getReader(); - let decoder = new TextDecoder(); + const decoder = new TextDecoder(); let eventBuffer = ''; const processChunk = (chunk) => { eventBuffer += decoder.decode(chunk, {stream: true}); - let eolIndex; - while ((eolIndex = eventBuffer.indexOf('\n')) >= 0) { + let eolIndex = eventBuffer.indexOf('\n'); + while (eolIndex >= 0) { const line = eventBuffer.slice(0, eolIndex).trim(); eventBuffer = eventBuffer.slice(eolIndex + 1); if (line === '') { @@ -59,8 +59,10 @@ class Stream { eventBuffer = ''; } else { // Accumulate data - eventBuffer += line + '\n'; + eventBuffer += `${line}\n` } + + eolIndex = eventBuffer.indexOf('\n'); } }; @@ -81,7 +83,11 @@ class Stream { parseEvent(rawData) { const lines = rawData.trim().split('\n'); - let event = 'message', data = '', id = null, retry = null; + let event = 'message'; + let data = ''; + let id = null; + let retry = null; + for (const line of lines) { const [fieldName, value] = line.split(/:(.*)/, 2); switch (fieldName) { @@ -89,7 +95,7 @@ class Stream { event = value.trim(); break; case 'data': - data += value.trim() + '\n'; + data += `${value.trim()}\n`; break; case 'id': id = value.trim();