From a66f5ec686af1b59ebaf2ee11a06382da382cdc4 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 09:10:21 -0500 Subject: [PATCH 01/15] Add dep on event-emitter --- bower.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bower.json b/bower.json index be709a7..f5e7853 100644 --- a/bower.json +++ b/bower.json @@ -22,6 +22,7 @@ "purescript-exceptions": "^6.0.0", "purescript-node-buffer": "^8.0.0", "purescript-nullable": "^6.0.0", - "purescript-prelude": "^6.0.0" + "purescript-prelude": "^6.0.0", + "purescript-node-event-emitter": "https://github.com/purescript-node/purescript-node-event-emitter.git#^3.0.0" } } From 0412dd6fa33ccd689bec587077a6b61b7c7db883 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 09:11:27 -0500 Subject: [PATCH 02/15] Format code via purs tidy --- src/Node/Stream.purs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 1f0c470..03eb611 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -95,7 +95,7 @@ onData r cb = where fromEither x = case x of - Left _ -> + Left _ -> throw "Stream encoding should not be set" Right buf -> pure buf @@ -103,13 +103,13 @@ onData r cb = read :: forall w . Readable w - -> Maybe Int - -> Effect (Maybe Buffer) + -> Maybe Int + -> Effect (Maybe Buffer) read r size = do v <- readEither r size case v of - Nothing -> pure Nothing - Just (Left _) -> throw "Stream encoding should not be set" + Nothing -> pure Nothing + Just (Left _) -> throw "Stream encoding should not be set" Just (Right b) -> pure (Just b) readString @@ -121,9 +121,9 @@ readString readString r size enc = do v <- readEither r size case v of - Nothing -> pure Nothing - Just (Left _) -> throw "Stream encoding should not be set" - Just (Right buf) -> Just <$> Buffer.toString enc buf + Nothing -> pure Nothing + Just (Left _) -> throw "Stream encoding should not be set" + Just (Right buf) -> Just <$> Buffer.toString enc buf readEither :: forall w From 3d9720cfa5bfff6af2c99fce7ad8d89cce75e59f Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 09:12:01 -0500 Subject: [PATCH 03/15] Add coercion to event emitter --- src/Node/Stream.purs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 03eb611..a380f57 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -7,6 +7,7 @@ module Node.Stream , Write() , Writable() , Duplex() + , toEventEmitter , onData , onDataString , onDataEither @@ -37,15 +38,17 @@ module Node.Stream import Prelude -import Effect (Effect) -import Effect.Exception (throw, Error) import Data.Either (Either(..)) import Data.Maybe (Maybe(..), fromMaybe) -import Node.Buffer (Buffer) import Data.Nullable as N +import Effect (Effect) +import Effect.Exception (throw, Error) import Effect.Uncurried (EffectFn1, mkEffectFn1) +import Node.Buffer (Buffer) import Node.Buffer as Buffer import Node.Encoding (Encoding) +import Node.EventEmitter (EventEmitter) +import Unsafe.Coerce (unsafeCoerce) -- | A stream. -- | @@ -70,6 +73,9 @@ type Writable r = Stream (write :: Write | r) -- | A duplex (readable _and_ writable stream) type Duplex = Stream (read :: Read, write :: Write) +toEventEmitter :: forall rw. Stream rw -> EventEmitter +toEventEmitter = unsafeCoerce + foreign import undefined :: forall a. a foreign import data Chunk :: Type From 3f5e4776d85c363a52313e2d0f50e5700642bd7c Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 09:14:26 -0500 Subject: [PATCH 04/15] Replace most onX fns with eventH --- src/Node/Stream.js | 32 -------------------------- src/Node/Stream.purs | 55 +++++++++++++++----------------------------- 2 files changed, 18 insertions(+), 69 deletions(-) diff --git a/src/Node/Stream.js b/src/Node/Stream.js index cdd6748..5ed8c89 100644 --- a/src/Node/Stream.js +++ b/src/Node/Stream.js @@ -31,38 +31,6 @@ export function onDataEitherImpl(readChunk) { }; } -export function onEnd(s) { - return f => () => { - s.on("end", f); - }; -} - -export function onFinish(s) { - return f => () => { - s.on("finish", f); - }; -} - -export function onReadable(s) { - return f => () => { - s.on("readable", f); - }; -} - -export function onError(s) { - return f => () => { - s.on("error", e => { - f(e)(); - }); - }; -} - -export function onClose(s) { - return f => () => { - s.on("close", f); - }; -} - export function resume(s) { return () => { s.resume(); diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index a380f57..45933d7 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -12,11 +12,11 @@ module Node.Stream , onDataString , onDataEither , setEncoding - , onReadable - , onEnd - , onFinish - , onClose - , onError + , readableH + , endH + , finishH + , closeH + , errorH , resume , pause , isPaused @@ -42,12 +42,13 @@ import Data.Either (Either(..)) import Data.Maybe (Maybe(..), fromMaybe) import Data.Nullable as N import Effect (Effect) -import Effect.Exception (throw, Error) +import Effect.Exception (Error, throw) import Effect.Uncurried (EffectFn1, mkEffectFn1) import Node.Buffer (Buffer) import Node.Buffer as Buffer import Node.Encoding (Encoding) -import Node.EventEmitter (EventEmitter) +import Node.EventEmitter (EventEmitter, EventHandle(..)) +import Node.EventEmitter.UtilTypes (EventHandle1, EventHandle0) import Unsafe.Coerce (unsafeCoerce) -- | A stream. @@ -194,40 +195,20 @@ setEncoding -> Effect Unit setEncoding r enc = setEncodingImpl r (show enc) --- | Listen for `readable` events. -foreign import onReadable - :: forall w - . Readable w - -> Effect Unit - -> Effect Unit +readableH :: forall w. EventHandle0 (Readable w) +readableH = EventHandle "readable" identity --- | Listen for `end` events. -foreign import onEnd - :: forall w - . Readable w - -> Effect Unit - -> Effect Unit +endH :: forall w. EventHandle0 (Readable w) +endH = EventHandle "end" identity --- | Listen for `finish` events. -foreign import onFinish - :: forall w - . Writable w - -> Effect Unit - -> Effect Unit +finishH :: forall w. EventHandle0 (Readable w) +finishH = EventHandle "finish" identity --- | Listen for `close` events. -foreign import onClose - :: forall w - . Stream w - -> Effect Unit - -> Effect Unit +closeH :: forall w. EventHandle0 (Readable w) +closeH = EventHandle "close" identity --- | Listen for `error` events. -foreign import onError - :: forall w - . Stream w - -> (Error -> Effect Unit) - -> Effect Unit +errorH :: forall w. EventHandle1 (Readable w) Error +errorH = EventHandle "error" mkEffectFn1 -- | Resume reading from the stream. foreign import resume :: forall w. Readable w -> Effect Unit From c59bb63f19578b0b926e59f77c349e96f9d318d6 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 10:34:55 -0500 Subject: [PATCH 05/15] Update exports to modern conventions --- src/Node/Stream.purs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 45933d7..35fcd2e 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -1,12 +1,12 @@ -- | This module provides a low-level wrapper for the [Node Stream API](https://nodejs.org/api/stream.html). module Node.Stream - ( Stream() - , Read() - , Readable() - , Write() - , Writable() - , Duplex() + ( Stream + , Read + , Readable + , Write + , Writable + , Duplex , toEventEmitter , onData , onDataString From 731c4178da68404a4aa37b5e326ffdfaa38d6324 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 10:50:58 -0500 Subject: [PATCH 06/15] Add missing handlers for Writable/Readable --- src/Node/Stream.purs | 44 ++++++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 35fcd2e..5b7e23e 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -12,11 +12,16 @@ module Node.Stream , onDataString , onDataEither , setEncoding - , readableH - , endH - , finishH , closeH , errorH + , drainH + , finishH + , pipeH + , unpipeH + , pauseH + , readableH + , resumeH + , endH , resume , pause , isPaused @@ -195,20 +200,35 @@ setEncoding -> Effect Unit setEncoding r enc = setEncodingImpl r (show enc) -readableH :: forall w. EventHandle0 (Readable w) -readableH = EventHandle "readable" identity +closeH :: forall rw. EventHandle0 (Stream rw) +closeH = EventHandle "close" identity -endH :: forall w. EventHandle0 (Readable w) -endH = EventHandle "end" identity +errorH :: forall rw. EventHandle1 (Stream rw) Error +errorH = EventHandle "error" mkEffectFn1 -finishH :: forall w. EventHandle0 (Readable w) +drainH :: forall r. EventHandle0 (Writable r) +drainH = EventHandle "drain" identity + +finishH :: forall r. EventHandle0 (Writable r) finishH = EventHandle "finish" identity -closeH :: forall w. EventHandle0 (Readable w) -closeH = EventHandle "close" identity +pipeH :: forall r w. EventHandle1 (Writable r) (Readable w) +pipeH = EventHandle "pipe" mkEffectFn1 -errorH :: forall w. EventHandle1 (Readable w) Error -errorH = EventHandle "error" mkEffectFn1 +unpipeH :: forall r w. EventHandle1 (Writable r) (Readable w) +unpipeH = EventHandle "unpipe" mkEffectFn1 + +pauseH :: forall w. EventHandle0 (Readable w) +pauseH = EventHandle "pause" identity + +readableH :: forall w. EventHandle0 (Readable w) +readableH = EventHandle "readable" identity + +resumeH :: forall w. EventHandle0 (Readable w) +resumeH = EventHandle "resume" identity + +endH :: forall w. EventHandle0 (Readable w) +endH = EventHandle "end" identity -- | Resume reading from the stream. foreign import resume :: forall w. Readable w -> Effect Unit From 4246dd220aec65aa156e9412dbbf3fcde32ea9bb Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 11:26:15 -0500 Subject: [PATCH 07/15] Add missing API for readable/writeable --- src/Node/Stream.js | 101 +++++++++-------- src/Node/Stream.purs | 264 ++++++++++++++++++++++++++++++------------- 2 files changed, 236 insertions(+), 129 deletions(-) diff --git a/src/Node/Stream.js b/src/Node/Stream.js index 5ed8c89..2417a96 100644 --- a/src/Node/Stream.js +++ b/src/Node/Stream.js @@ -31,33 +31,29 @@ export function onDataEitherImpl(readChunk) { }; } -export function resume(s) { - return () => { - s.resume(); - }; -} +export const readableImpl = (r) => r.readable; -export function pause(s) { - return () => { - s.pause(); - }; -} +export const readableEndedImpl = (r) => r.readableEnded; -export function isPaused(s) { - return () => s.isPaused(); -} +export const readableFlowingImpl = (r) => r.readableFlowing; -export function pipe(r) { - return w => () => r.pipe(w); -} +export const readableHighWaterMarkImpl = (r) => r.readableHighWaterMark; -export function unpipe(r) { - return w => () => r.unpipe(w); -} +export const readableLengthImpl = (r) => r.readableLength; -export function unpipeAll(r) { - return () => r.unpipe(); -} +export const resumeImpl = (r) => r.resume(); + +export const pauseImpl = (r) => r.pause; + +export const isPausedImpl = (r) => r.isPaused; + +export const pipeImpl = (r, w) => r.pipe(w); + +export const pipeCbImpl = (r, w, cb) => r.pipe(w, cb); + +export const unpipeAllImpl = (r) => r.unpipe(); + +export const unpipeImpl = (r, w) => r.unpipe(w); export function readImpl(readChunk) { return Nothing => Just => r => s => () => { @@ -70,21 +66,17 @@ export function readImpl(readChunk) { }; } -export function writeImpl(w) { - return chunk => done => () => w.write(chunk, null, done); -} +export const writeImpl = (w, buf) => w.write(buf); -export function writeStringImpl(w) { - return enc => s => done => () => w.write(s, enc, done); -} +export const writeCbImpl = (w, buf) => w.write(buf); -export function cork(w) { - return () => w.cork(); -} +export const writeStringImpl = (w, str, enc) => w.write(str, enc); -export function uncork(w) { - return () => w.uncork(); -} +export const writeStringCbImpl = (w, str, enc, cb) => w.write(str, enc, cb); + +export const corkImpl = (w) => w.cork(); + +export const uncorkImpl = (w) => w.uncork(); export function setDefaultEncodingImpl(w) { return enc => () => { @@ -92,20 +84,31 @@ export function setDefaultEncodingImpl(w) { }; } -export function endImpl(w) { - return done => () => { - w.end(null, null, done); - }; -} -export function destroy(strm) { - return () => { - strm.destroy(null); - }; -} +export const endCbImpl = (w, cb) => w.end(cb); -export function destroyWithError(strm) { - return e => () => { - strm.destroy(e); - }; -} +export const endImpl = (w) => w.end(); + +export const writeableImpl = (w) => w.writeable; + +export const writeableEndedImpl = (w) => w.writeableEnded; + +export const writeableCorkedImpl = (w) => w.writeableCorked; + +export const erroredImpl = (w) => w.errored; + +export const writeableFinishedImpl = (w) => w.writeableFinished; + +export const writeableHighWaterMarkImpl = (w) => w.writeableHighWaterMark; + +export const writeableLengthImpl = (w) => w.writeableLength; + +export const writeableNeedDrainImpl = (w) => w.writeableNeedDrain; + +export const destroyImpl = (w) => w.destroy(); + +export const destroyErrorImpl = (w, e) => w.destroy(e); + +export const closedImpl = (w) => w.closed; + +export const destroyedImpl = (w) => w.destroyed; diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 5b7e23e..885ca51 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -22,36 +22,59 @@ module Node.Stream , readableH , resumeH , endH + , readable + , readableEnded + , readableFlowing + , readableHighWaterMark + , readableLength , resume , pause , isPaused , pipe + , pipe' , unpipe , unpipeAll , read , readString , readEither + , writeable + , writeableEnded + , writeableCorked + , errored + , writeableFinished + , writeableHighWaterMark + , writeableLength + , writeableNeedDrain , write + , write_ + , writeCb + , writeCb_ , writeString + , writeString_ + , writeStringCb + , writeStringCb_ , cork , uncork , setDefaultEncoding , end + , end_ , destroy - , destroyWithError + , destroy' + , closed + , destroyed ) where import Prelude import Data.Either (Either(..)) import Data.Maybe (Maybe(..), fromMaybe) -import Data.Nullable as N +import Data.Nullable (Nullable, toMaybe) import Effect (Effect) import Effect.Exception (Error, throw) -import Effect.Uncurried (EffectFn1, mkEffectFn1) +import Effect.Uncurried (EffectFn1, EffectFn2, EffectFn3, EffectFn4, mkEffectFn1, runEffectFn1, runEffectFn2, runEffectFn3, runEffectFn4) import Node.Buffer (Buffer) import Node.Buffer as Buffer -import Node.Encoding (Encoding) +import Node.Encoding (Encoding, encodingToNode) import Node.EventEmitter (EventEmitter, EventHandle(..)) import Node.EventEmitter.UtilTypes (EventHandle1, EventHandle0) import Unsafe.Coerce (unsafeCoerce) @@ -230,74 +253,155 @@ resumeH = EventHandle "resume" identity endH :: forall w. EventHandle0 (Readable w) endH = EventHandle "end" identity +readable :: forall w. Readable w -> Effect Boolean +readable r = runEffectFn1 readableImpl r + +foreign import readableImpl :: forall w. EffectFn1 (Readable w) (Boolean) + +readableEnded :: forall w. Readable w -> Effect Boolean +readableEnded r = runEffectFn1 readableEndedImpl r + +foreign import readableEndedImpl :: forall w. EffectFn1 (Readable w) (Boolean) + +readableFlowing :: forall w. Readable w -> Effect Boolean +readableFlowing r = runEffectFn1 readableFlowingImpl r + +foreign import readableFlowingImpl :: forall w. EffectFn1 (Readable w) (Boolean) + +readableHighWaterMark :: forall w. Readable w -> Effect Boolean +readableHighWaterMark r = runEffectFn1 readableHighWaterMarkImpl r + +foreign import readableHighWaterMarkImpl :: forall w. EffectFn1 (Readable w) (Boolean) + +readableLength :: forall w. Readable w -> Effect Boolean +readableLength r = runEffectFn1 readableLengthImpl r + +foreign import readableLengthImpl :: forall w. EffectFn1 (Readable w) (Boolean) + -- | Resume reading from the stream. -foreign import resume :: forall w. Readable w -> Effect Unit +resume :: forall w. Readable w -> Effect Unit +resume r = runEffectFn1 resumeImpl r + +foreign import resumeImpl :: forall w. EffectFn1 (Readable w) (Unit) -- | Pause reading from the stream. -foreign import pause :: forall w. Readable w -> Effect Unit +pause :: forall w. Readable w -> Effect Unit +pause r = runEffectFn1 pauseImpl r + +foreign import pauseImpl :: forall w. EffectFn1 (Readable w) (Unit) -- | Check whether or not a stream is paused for reading. -foreign import isPaused :: forall w. Readable w -> Effect Boolean +isPaused :: forall w. Readable w -> Effect Boolean +isPaused r = runEffectFn1 isPausedImpl r + +foreign import isPausedImpl :: forall w. EffectFn1 (Readable w) (Boolean) -- | Read chunks from a readable stream and write them to a writable stream. -foreign import pipe - :: forall r w - . Readable w - -> Writable r - -> Effect (Writable r) +pipe :: forall w r. Readable w -> Writable r -> Effect Unit +pipe r w = runEffectFn2 pipeImpl r w + +foreign import pipeImpl :: forall w r. EffectFn2 (Readable w) (Writable r) (Unit) + +pipe' :: forall w r. Readable w -> Writable r -> { end :: Boolean } -> Effect Unit +pipe' r w o = runEffectFn3 pipeCbImpl r w o + +foreign import pipeCbImpl :: forall w r. EffectFn3 (Readable w) (Writable r) ({ end :: Boolean }) (Unit) -- | Detach a Writable stream previously attached using `pipe`. -foreign import unpipe - :: forall r w - . Readable w - -> Writable r - -> Effect Unit +unpipe :: forall w r. Readable w -> Writable r -> Effect Unit +unpipe r w = runEffectFn2 unpipeImpl r w + +foreign import unpipeImpl :: forall w r. EffectFn2 (Readable w) (Writable r) (Unit) -- | Detach all Writable streams previously attached using `pipe`. -foreign import unpipeAll - :: forall w - . Readable w - -> Effect Unit +unpipeAll :: forall w. Readable w -> Effect Unit +unpipeAll r = runEffectFn1 unpipeAllImpl r -foreign import writeImpl - :: forall r - . Writable r - -> Buffer - -> EffectFn1 (N.Nullable Error) Unit - -> Effect Boolean +foreign import unpipeAllImpl :: forall w. EffectFn1 (Readable w) (Unit) --- | Write a Buffer to a writable stream. -write - :: forall r - . Writable r - -> Buffer - -> (Maybe Error -> Effect Unit) - -> Effect Boolean -write w b cb = writeImpl w b $ mkEffectFn1 (cb <<< N.toMaybe) +writeable :: forall r. Writable r -> Effect Boolean +writeable w = runEffectFn1 writeableImpl w -foreign import writeStringImpl - :: forall r - . Writable r - -> String - -> String - -> EffectFn1 (N.Nullable Error) Unit - -> Effect Boolean +foreign import writeableImpl :: forall r. EffectFn1 (Writable r) (Boolean) --- | Write a string in the specified encoding to a writable stream. -writeString - :: forall r - . Writable r - -> Encoding - -> String - -> (Maybe Error -> Effect Unit) - -> Effect Boolean -writeString w enc s cb = writeStringImpl w (show enc) s $ mkEffectFn1 (cb <<< N.toMaybe) +writeableEnded :: forall r. Writable r -> Effect Boolean +writeableEnded w = runEffectFn1 writeableEndedImpl w + +foreign import writeableEndedImpl :: forall r. EffectFn1 (Writable r) (Boolean) + +writeableCorked :: forall r. Writable r -> Effect Boolean +writeableCorked w = runEffectFn1 writeableCorkedImpl w + +foreign import writeableCorkedImpl :: forall r. EffectFn1 (Writable r) (Boolean) + +errored :: forall rw. Stream rw -> Effect Boolean +errored rw = runEffectFn1 erroredImpl rw + +foreign import erroredImpl :: forall rw. EffectFn1 (Stream rw) (Boolean) + +writeableFinished :: forall r. Writable r -> Effect Boolean +writeableFinished w = runEffectFn1 writeableFinishedImpl w + +foreign import writeableFinishedImpl :: forall r. EffectFn1 (Writable r) (Boolean) + +writeableHighWaterMark :: forall r. Writable r -> Effect Number +writeableHighWaterMark w = runEffectFn1 writeableHighWaterMarkImpl w + +foreign import writeableHighWaterMarkImpl :: forall r. EffectFn1 (Writable r) (Number) + +writeableLength :: forall r. Writable r -> Effect Number +writeableLength w = runEffectFn1 writeableLengthImpl w + +foreign import writeableLengthImpl :: forall r. EffectFn1 (Writable r) (Number) + +writeableNeedDrain :: forall r. Writable r -> Effect Boolean +writeableNeedDrain w = runEffectFn1 writeableNeedDrainImpl w + +foreign import writeableNeedDrainImpl :: forall r. EffectFn1 (Writable r) (Boolean) + +write :: forall r. Writable r -> Buffer -> Effect Boolean +write w b = runEffectFn2 writeImpl w b + +write_ :: forall r. Writable r -> Buffer -> Effect Unit +write_ w b = void $ write w b + +foreign import writeImpl :: forall r a. EffectFn2 (Writable r) (Buffer) (a) + +writeCb :: forall r. Writable r -> Buffer -> (Maybe Error -> Effect Unit) -> Effect Boolean +writeCb w b cb = runEffectFn3 writeCbImpl w b $ mkEffectFn1 \err -> cb (toMaybe err) + +writeCb_ :: forall r. Writable r -> Buffer -> (Maybe Error -> Effect Unit) -> Effect Unit +writeCb_ w b cb = void $ writeCb w b cb + +foreign import writeCbImpl :: forall r a. EffectFn3 (Writable r) (Buffer) (EffectFn1 (Nullable Error) Unit) (a) + +writeString :: forall r. Writable r -> Encoding -> String -> Effect Boolean +writeString w enc str = runEffectFn3 writeStringImpl w str (encodingToNode enc) + +writeString_ :: forall r. Writable r -> Encoding -> String -> Effect Unit +writeString_ w enc str = void $ writeString w enc str + +foreign import writeStringImpl :: forall r a. EffectFn3 (Writable r) (String) (String) (a) + +writeStringCb :: forall r. Writable r -> Encoding -> String -> (Maybe Error -> Effect Unit) -> Effect Boolean +writeStringCb w enc str cb = runEffectFn4 writeStringCbImpl w str (encodingToNode enc) $ mkEffectFn1 \err -> cb (toMaybe err) + +writeStringCb_ :: forall r. Writable r -> Encoding -> String -> (Maybe Error -> Effect Unit) -> Effect Unit +writeStringCb_ w enc str cb = void $ writeStringCb w enc str cb + +foreign import writeStringCbImpl :: forall r a. EffectFn4 (Writable r) (String) (String) (EffectFn1 (Nullable Error) Unit) (a) -- | Force buffering of writes. -foreign import cork :: forall r. Writable r -> Effect Unit +cork :: forall r. Writable r -> Effect Unit +cork s = runEffectFn1 corkImpl s + +foreign import corkImpl :: forall r. EffectFn1 (Writable r) (Unit) -- | Flush buffered data. -foreign import uncork :: forall r. Writable r -> Effect Unit +uncork :: forall r. Writable r -> Effect Unit +uncork w = runEffectFn1 uncorkImpl w + +foreign import uncorkImpl :: forall r. EffectFn1 (Writable r) (Unit) foreign import setDefaultEncodingImpl :: forall r @@ -317,33 +421,33 @@ setDefaultEncoding -> Effect Unit setDefaultEncoding r enc = setDefaultEncodingImpl r (show enc) -foreign import endImpl - :: forall r - . Writable r - -> EffectFn1 (N.Nullable Error) Unit - -> Effect Unit - -- | End writing data to the stream. -end - :: forall r - . Writable r - -> (Maybe Error -> Effect Unit) - -> Effect Unit -end w cb = endImpl w $ mkEffectFn1 (cb <<< N.toMaybe) +end :: forall r. Writable r -> (Maybe Error -> Effect Unit) -> Effect Unit +end w cb = runEffectFn2 endCbImpl w $ mkEffectFn1 \err -> cb (toMaybe err) --- | Destroy the stream. It will release any internal resources. --- --- Added in node 8.0. -foreign import destroy - :: forall r - . Stream r - -> Effect Unit +foreign import endCbImpl :: forall r. EffectFn2 (Writable r) (EffectFn1 (Nullable Error) Unit) (Unit) --- | Destroy the stream and emit 'error'. --- --- Added in node 8.0. -foreign import destroyWithError - :: forall r - . Stream r - -> Error - -> Effect Unit +end_ :: forall r. Writable r -> Effect Unit +end_ w = runEffectFn1 endImpl w + +foreign import endImpl :: forall r. EffectFn1 (Writable r) (Unit) + +destroy :: forall r. Stream r -> Effect Unit +destroy w = runEffectFn1 destroyImpl w + +foreign import destroyImpl :: forall r. EffectFn1 (Stream r) (Unit) + +destroy' :: forall r. Stream r -> Error -> Effect Unit +destroy' w e = runEffectFn2 destroyErrorImpl w e + +foreign import destroyErrorImpl :: forall r. EffectFn2 (Stream r) (Error) Unit + +closed :: forall r. Stream r -> Effect Boolean +closed w = runEffectFn1 closedImpl w + +foreign import closedImpl :: forall r. EffectFn1 (Stream r) (Boolean) + +destroyed :: forall r. Stream r -> Effect Boolean +destroyed w = runEffectFn1 destroyedImpl w + +foreign import destroyedImpl :: forall r. EffectFn1 (Stream r) (Boolean) From 23852df240e0ae6664d278468ac0a6de3d75cbca Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 11:28:25 -0500 Subject: [PATCH 08/15] Add other misc. missing API --- src/Node/Stream.js | 10 ++++++++++ src/Node/Stream.purs | 21 +++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/src/Node/Stream.js b/src/Node/Stream.js index 2417a96..56ab605 100644 --- a/src/Node/Stream.js +++ b/src/Node/Stream.js @@ -1,3 +1,5 @@ +import stream from "node:stream"; + const _undefined = undefined; export { _undefined as undefined }; @@ -112,3 +114,11 @@ export const destroyErrorImpl = (w, e) => w.destroy(e); export const closedImpl = (w) => w.closed; export const destroyedImpl = (w) => w.destroyed; + +export const allowHalfOpenImpl = (d) => d.allowHalfOpen; + +export const pipelineImpl = (src, transforms, dst, cb) => stream.pipeline([src, ...transforms, dst], cb); + +export const readableFromImpl = (buf) => stream.ReadableStream.from(buf); + +export const newPassThrough = () => new stream.PassThrough({ objectMode: false }); diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 885ca51..ecfe5e5 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -62,6 +62,10 @@ module Node.Stream , destroy' , closed , destroyed + , allowHalfOpen + , pipeline + , fromBuffer + , newPassThrough ) where import Prelude @@ -451,3 +455,20 @@ destroyed :: forall r. Stream r -> Effect Boolean destroyed w = runEffectFn1 destroyedImpl w foreign import destroyedImpl :: forall r. EffectFn1 (Stream r) (Boolean) + +allowHalfOpen :: Duplex -> Effect Boolean +allowHalfOpen d = runEffectFn1 allowHalfOpenImpl d + +foreign import allowHalfOpenImpl :: EffectFn1 (Duplex) (Boolean) + +pipeline :: forall w r. Readable w -> Array Duplex -> Writable r -> (Error -> Effect Unit) -> Effect Unit +pipeline src transforms dest cb = runEffectFn4 pipelineImpl src transforms dest cb + +foreign import pipelineImpl :: forall w r. EffectFn4 (Readable w) (Array Duplex) (Writable r) ((Error -> Effect Unit)) (Unit) + +fromBuffer :: Buffer -> Effect (Readable ()) +fromBuffer buf = runEffectFn1 readableFromImpl buf + +foreign import readableFromImpl :: EffectFn1 (Buffer) (Readable ()) + +foreign import newPassThrough :: Effect Duplex From 211ae56883b728ad48062d4f21409cab2f6420ed Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 11:31:31 -0500 Subject: [PATCH 09/15] Convert encoding FFI to uncurried --- src/Node/Stream.js | 13 ++----------- src/Node/Stream.purs | 20 ++++++-------------- 2 files changed, 8 insertions(+), 25 deletions(-) diff --git a/src/Node/Stream.js b/src/Node/Stream.js index 56ab605..d61085c 100644 --- a/src/Node/Stream.js +++ b/src/Node/Stream.js @@ -3,11 +3,7 @@ import stream from "node:stream"; const _undefined = undefined; export { _undefined as undefined }; -export function setEncodingImpl(s) { - return enc => () => { - s.setEncoding(enc); - }; -} +export const setEncodingImpl = (s, enc) => s.setEncoding(enc); export function readChunkImpl(Left) { return Right => chunk => { @@ -80,12 +76,7 @@ export const corkImpl = (w) => w.cork(); export const uncorkImpl = (w) => w.uncork(); -export function setDefaultEncodingImpl(w) { - return enc => () => { - w.setDefaultEncoding(enc); - }; -} - +export const setDefaultEncodingImpl = (w, enc) => w.setDefaultEncoding(enc); export const endCbImpl = (w, cb) => w.end(cb); diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index ecfe5e5..753c928 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -208,12 +208,6 @@ foreign import onDataEitherImpl -> (Either String Buffer -> Effect Unit) -> Effect Unit -foreign import setEncodingImpl - :: forall w - . Readable w - -> String - -> Effect Unit - -- | Set the encoding used to read chunks as strings from the stream. This -- | function may be useful when you are passing a readable stream to some other -- | JavaScript library, which already expects an encoding to be set. @@ -225,7 +219,9 @@ setEncoding . Readable w -> Encoding -> Effect Unit -setEncoding r enc = setEncodingImpl r (show enc) +setEncoding r enc = runEffectFn2 setEncodingImpl r (show enc) + +foreign import setEncodingImpl :: forall w. EffectFn2 (Readable w) String Unit closeH :: forall rw. EventHandle0 (Stream rw) closeH = EventHandle "close" identity @@ -407,12 +403,6 @@ uncork w = runEffectFn1 uncorkImpl w foreign import uncorkImpl :: forall r. EffectFn1 (Writable r) (Unit) -foreign import setDefaultEncodingImpl - :: forall r - . Writable r - -> String - -> Effect Unit - -- | Set the default encoding used to write strings to the stream. This function -- | is useful when you are passing a writable stream to some other JavaScript -- | library, which already expects a default encoding to be set. It has no @@ -423,7 +413,9 @@ setDefaultEncoding . Writable r -> Encoding -> Effect Unit -setDefaultEncoding r enc = setDefaultEncodingImpl r (show enc) +setDefaultEncoding r enc = runEffectFn2 setDefaultEncodingImpl r (show enc) + +foreign import setDefaultEncodingImpl :: forall r. EffectFn2 (Writable r) String Unit -- | End writing data to the stream. end :: forall r. Writable r -> (Maybe Error -> Effect Unit) -> Effect Unit From f1f4502b61c36c0fffb37aa4b4c447faa33ad63e Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 12:02:24 -0500 Subject: [PATCH 10/15] Update read functions / define dataH* handlers --- src/Node/Stream.js | 52 ++++-------- src/Node/Stream.purs | 193 +++++++++++++++++++++++-------------------- 2 files changed, 121 insertions(+), 124 deletions(-) diff --git a/src/Node/Stream.js b/src/Node/Stream.js index d61085c..c5e27cf 100644 --- a/src/Node/Stream.js +++ b/src/Node/Stream.js @@ -1,33 +1,20 @@ import stream from "node:stream"; -const _undefined = undefined; -export { _undefined as undefined }; - export const setEncodingImpl = (s, enc) => s.setEncoding(enc); -export function readChunkImpl(Left) { - return Right => chunk => { - if (chunk instanceof Buffer) { - return Right(chunk); - } else if (typeof chunk === "string") { - return Left(chunk); - } else { - throw new Error( - "Node.Stream.readChunkImpl: Unrecognised " + - "chunk type; expected String or Buffer, got: " + - chunk - ); - } - }; -} - -export function onDataEitherImpl(readChunk) { - return r => f => () => { - r.on("data", data => { - f(readChunk(data))(); - }); - }; -} +export const readChunkImpl = (useBuffer, useString, chunk) => { + if (chunk instanceof Buffer) { + return useBuffer(chunk); + } else if (typeof chunk === "string") { + return useString(chunk); + } else { + throw new Error( + "Node.Stream.readChunkImpl: Unrecognised " + + "chunk type; expected String or Buffer, got: " + + chunk + ); + } +}; export const readableImpl = (r) => r.readable; @@ -53,16 +40,9 @@ export const unpipeAllImpl = (r) => r.unpipe(); export const unpipeImpl = (r, w) => r.unpipe(w); -export function readImpl(readChunk) { - return Nothing => Just => r => s => () => { - const v = r.read(s); - if (v === null) { - return Nothing; - } else { - return Just(readChunk(v)); - } - }; -} +export const readImpl = (r) => r.read(); + +export const readSizeImpl = (r, size) => r.read(size); export const writeImpl = (w, buf) => w.write(buf); diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 753c928..870e7e4 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -8,9 +8,6 @@ module Node.Stream , Writable , Duplex , toEventEmitter - , onData - , onDataString - , onDataEither , setEncoding , closeH , errorH @@ -18,6 +15,10 @@ module Node.Stream , finishH , pipeH , unpipeH + , Chunk + , dataH + , dataHStr + , dataHEither , pauseH , readableH , resumeH @@ -35,8 +36,9 @@ module Node.Stream , unpipe , unpipeAll , read + , read' , readString - , readEither + , readString' , writeable , writeableEnded , writeableCorked @@ -71,7 +73,7 @@ module Node.Stream import Prelude import Data.Either (Either(..)) -import Data.Maybe (Maybe(..), fromMaybe) +import Data.Maybe (Maybe(..)) import Data.Nullable (Nullable, toMaybe) import Effect (Effect) import Effect.Exception (Error, throw) @@ -109,104 +111,119 @@ type Duplex = Stream (read :: Read, write :: Write) toEventEmitter :: forall rw. Stream rw -> EventEmitter toEventEmitter = unsafeCoerce -foreign import undefined :: forall a. a - +-- | Internal type. This should not be used by end-users. foreign import data Chunk :: Type foreign import readChunkImpl - :: (forall l r. l -> Either l r) - -> (forall l r. r -> Either l r) - -> Chunk - -> Either String Buffer - -readChunk :: Chunk -> Either String Buffer -readChunk = readChunkImpl Left Right + :: forall r + . EffectFn3 + (EffectFn1 Buffer r) + (EffectFn1 String r) + Chunk + r -- | Listen for `data` events, returning data in a Buffer. Note that this will fail -- | if `setEncoding` has been called on the stream. -onData - :: forall w - . Readable w - -> (Buffer -> Effect Unit) - -> Effect Unit -onData r cb = - onDataEither r (cb <=< fromEither) - where - fromEither x = - case x of - Left _ -> - throw "Stream encoding should not be set" - Right buf -> - pure buf - -read - :: forall w - . Readable w - -> Maybe Int - -> Effect (Maybe Buffer) -read r size = do - v <- readEither r size - case v of - Nothing -> pure Nothing - Just (Left _) -> throw "Stream encoding should not be set" - Just (Right b) -> pure (Just b) - +-- | +-- | This is likely the handler you want to use for converting a `Stream` into a `String`: +-- | ``` +-- | let useStringCb = ... +-- | ref <- Ref.new +-- | stream # on dataH \buf -> +-- | Ref.modify_ (\ref' -> Array.snoc ref' buf) ref +-- | stream # on endH do +-- | bufs <- Ref.read ref +-- | useStringCb $ Buffer.toString UTF8 $ Buffer.concat bufs +-- | ``` +dataH :: forall w. EventHandle (Readable w) (Buffer -> Effect Unit) (EffectFn1 Chunk Unit) +dataH = EventHandle "data" \cb -> + mkEffectFn1 \chunk -> do + runEffectFn3 + readChunkImpl + (mkEffectFn1 cb) + (mkEffectFn1 \_ -> throw "Got a String, not a Buffer. Stream encoding should not be set") + chunk + +-- | Listen for `data` events, returning data as a String. Note that this will fail +-- | if `setEncoding` has NOT been called on the stream. +dataHStr :: forall w. EventHandle (Readable w) (String -> Effect Unit) (EffectFn1 Chunk Unit) +dataHStr = EventHandle "data" \cb -> + mkEffectFn1 \chunk -> do + runEffectFn3 + readChunkImpl + (mkEffectFn1 \_ -> throw "Got a Buffer, not String. Stream encoding must be set to get a String.") + (mkEffectFn1 cb) + chunk + +-- | Listen for `data` events, returning data in a Buffer or String. This will work +-- | regardless of whether `setEncoding` has been called or not. +dataHEither :: forall w. EventHandle (Readable w) (Either Buffer String -> Effect Unit) (EffectFn1 Chunk Unit) +dataHEither = EventHandle "data" \cb -> + mkEffectFn1 \chunk -> do + runEffectFn3 + readChunkImpl + (mkEffectFn1 (cb <<< Left)) + (mkEffectFn1 (cb <<< Right)) + chunk + +-- | Note: this will fail if `setEncoding` has been called on the stream. +read :: forall w. Readable w -> Effect (Maybe Buffer) +read r = do + chunk <- runEffectFn1 readImpl r + case toMaybe chunk of + Nothing -> + pure Nothing + Just c -> + runEffectFn3 readChunkImpl + (mkEffectFn1 \buf -> pure $ Just buf) + (mkEffectFn1 \_ -> throw "Stream encoding should not be set") + c + +foreign import readImpl :: forall w. EffectFn1 (Readable w) (Nullable Chunk) + +-- | Note: this will fail if `setEncoding` has been called on the stream. +read' :: forall w. Readable w -> Int -> Effect (Maybe Buffer) +read' r size = do + chunk <- runEffectFn2 readSizeImpl r size + case toMaybe chunk of + Nothing -> + pure Nothing + Just c -> + runEffectFn3 readChunkImpl + (mkEffectFn1 \buf -> pure $ Just buf) + (mkEffectFn1 \_ -> throw "Stream encoding should not be set") + c + +foreign import readSizeImpl :: forall w. EffectFn2 (Readable w) (Int) (Nullable Chunk) + +-- | Note: this will fail if `setEncoding` has been called on the stream. readString :: forall w . Readable w - -> Maybe Int -> Encoding -> Effect (Maybe String) -readString r size enc = do - v <- readEither r size - case v of - Nothing -> pure Nothing - Just (Left _) -> throw "Stream encoding should not be set" - Just (Right buf) -> Just <$> Buffer.toString enc buf - -readEither +readString r enc = do + mbBuf <- read r + case mbBuf of + Nothing -> + pure Nothing + Just buf -> do + Just <$> Buffer.toString enc buf + +-- | Note: this will fail if `setEncoding` has been called on the stream. +readString' :: forall w . Readable w - -> Maybe Int - -> Effect (Maybe (Either String Buffer)) -readEither r size = readImpl readChunk Nothing Just r (fromMaybe undefined size) - -foreign import readImpl - :: forall r - . (Chunk -> Either String Buffer) - -> (forall a. Maybe a) - -> (forall a. a -> Maybe a) - -> Readable r -> Int - -> Effect (Maybe (Either String Buffer)) - --- | Listen for `data` events, returning data in a String, which will be --- | decoded using the given encoding. Note that this will fail if `setEncoding` --- | has been called on the stream. -onDataString - :: forall w - . Readable w -> Encoding - -> (String -> Effect Unit) - -> Effect Unit -onDataString r enc cb = onData r (cb <=< Buffer.toString enc) - --- | Listen for `data` events, returning data in an `Either String Buffer`. This --- | function is provided for the (hopefully rare) case that `setEncoding` has --- | been called on the stream. -onDataEither - :: forall r - . Readable r - -> (Either String Buffer -> Effect Unit) - -> Effect Unit -onDataEither r cb = onDataEitherImpl readChunk r cb - -foreign import onDataEitherImpl - :: forall r - . (Chunk -> Either String Buffer) - -> Readable r - -> (Either String Buffer -> Effect Unit) - -> Effect Unit + -> Effect (Maybe String) +readString' r size enc = do + mbBuf <- read' r size + case mbBuf of + Nothing -> + pure Nothing + Just buf -> do + Just <$> Buffer.toString enc buf -- | Set the encoding used to read chunks as strings from the stream. This -- | function may be useful when you are passing a readable stream to some other From 8771cd36f8e5fa2c2b482b935905258165636373 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 21:31:13 -0500 Subject: [PATCH 11/15] Use consistent naming scheme --- src/Node/Stream.purs | 58 ++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 870e7e4..2b837d5 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -1,14 +1,13 @@ -- | This module provides a low-level wrapper for the [Node Stream API](https://nodejs.org/api/stream.html). module Node.Stream - ( Stream - , Read - , Readable + ( Read , Write + , Stream + , Readable , Writable , Duplex , toEventEmitter - , setEncoding , closeH , errorH , drainH @@ -49,17 +48,18 @@ module Node.Stream , writeableNeedDrain , write , write_ - , writeCb - , writeCb_ + , write' + , write'_ , writeString , writeString_ - , writeStringCb - , writeStringCb_ + , writeString' + , writeString'_ , cork , uncork + , setEncoding , setDefaultEncoding , end - , end_ + , end' , destroy , destroy' , closed @@ -122,8 +122,8 @@ foreign import readChunkImpl Chunk r --- | Listen for `data` events, returning data in a Buffer. Note that this will fail --- | if `setEncoding` has been called on the stream. +-- | Listen for `data` events on a stream where `setEncoding` has NOT been called, returning data as a Buffer. +-- | If `setEncoding` HAS been called on the stream, this will throw an error. -- | -- | This is likely the handler you want to use for converting a `Stream` into a `String`: -- | ``` @@ -144,8 +144,8 @@ dataH = EventHandle "data" \cb -> (mkEffectFn1 \_ -> throw "Got a String, not a Buffer. Stream encoding should not be set") chunk --- | Listen for `data` events, returning data as a String. Note that this will fail --- | if `setEncoding` has NOT been called on the stream. +-- | Listen for `data` events on a stream where `setEncoding` has been called, returning data as a String. +-- | If `setEncoding` has NOT been called on the stream, this will throw an error. dataHStr :: forall w. EventHandle (Readable w) (String -> Effect Unit) (EffectFn1 Chunk Unit) dataHStr = EventHandle "data" \cb -> mkEffectFn1 \chunk -> do @@ -157,13 +157,13 @@ dataHStr = EventHandle "data" \cb -> -- | Listen for `data` events, returning data in a Buffer or String. This will work -- | regardless of whether `setEncoding` has been called or not. -dataHEither :: forall w. EventHandle (Readable w) (Either Buffer String -> Effect Unit) (EffectFn1 Chunk Unit) +dataHEither :: forall w. EventHandle (Readable w) (Either String Buffer -> Effect Unit) (EffectFn1 Chunk Unit) dataHEither = EventHandle "data" \cb -> mkEffectFn1 \chunk -> do runEffectFn3 readChunkImpl - (mkEffectFn1 (cb <<< Left)) (mkEffectFn1 (cb <<< Right)) + (mkEffectFn1 (cb <<< Left)) chunk -- | Note: this will fail if `setEncoding` has been called on the stream. @@ -384,11 +384,11 @@ write_ w b = void $ write w b foreign import writeImpl :: forall r a. EffectFn2 (Writable r) (Buffer) (a) -writeCb :: forall r. Writable r -> Buffer -> (Maybe Error -> Effect Unit) -> Effect Boolean -writeCb w b cb = runEffectFn3 writeCbImpl w b $ mkEffectFn1 \err -> cb (toMaybe err) +write' :: forall r. Writable r -> Buffer -> (Maybe Error -> Effect Unit) -> Effect Boolean +write' w b cb = runEffectFn3 writeCbImpl w b $ mkEffectFn1 \err -> cb (toMaybe err) -writeCb_ :: forall r. Writable r -> Buffer -> (Maybe Error -> Effect Unit) -> Effect Unit -writeCb_ w b cb = void $ writeCb w b cb +write'_ :: forall r. Writable r -> Buffer -> (Maybe Error -> Effect Unit) -> Effect Unit +write'_ w b cb = void $ write' w b cb foreign import writeCbImpl :: forall r a. EffectFn3 (Writable r) (Buffer) (EffectFn1 (Nullable Error) Unit) (a) @@ -400,11 +400,11 @@ writeString_ w enc str = void $ writeString w enc str foreign import writeStringImpl :: forall r a. EffectFn3 (Writable r) (String) (String) (a) -writeStringCb :: forall r. Writable r -> Encoding -> String -> (Maybe Error -> Effect Unit) -> Effect Boolean -writeStringCb w enc str cb = runEffectFn4 writeStringCbImpl w str (encodingToNode enc) $ mkEffectFn1 \err -> cb (toMaybe err) +writeString' :: forall r. Writable r -> Encoding -> String -> (Maybe Error -> Effect Unit) -> Effect Boolean +writeString' w enc str cb = runEffectFn4 writeStringCbImpl w str (encodingToNode enc) $ mkEffectFn1 \err -> cb (toMaybe err) -writeStringCb_ :: forall r. Writable r -> Encoding -> String -> (Maybe Error -> Effect Unit) -> Effect Unit -writeStringCb_ w enc str cb = void $ writeStringCb w enc str cb +writeString'_ :: forall r. Writable r -> Encoding -> String -> (Maybe Error -> Effect Unit) -> Effect Unit +writeString'_ w enc str cb = void $ writeString' w enc str cb foreign import writeStringCbImpl :: forall r a. EffectFn4 (Writable r) (String) (String) (EffectFn1 (Nullable Error) Unit) (a) @@ -435,15 +435,15 @@ setDefaultEncoding r enc = runEffectFn2 setDefaultEncodingImpl r (show enc) foreign import setDefaultEncodingImpl :: forall r. EffectFn2 (Writable r) String Unit -- | End writing data to the stream. -end :: forall r. Writable r -> (Maybe Error -> Effect Unit) -> Effect Unit -end w cb = runEffectFn2 endCbImpl w $ mkEffectFn1 \err -> cb (toMaybe err) +end :: forall r. Writable r -> Effect Unit +end w = runEffectFn1 endImpl w -foreign import endCbImpl :: forall r. EffectFn2 (Writable r) (EffectFn1 (Nullable Error) Unit) (Unit) +foreign import endImpl :: forall r. EffectFn1 (Writable r) (Unit) -end_ :: forall r. Writable r -> Effect Unit -end_ w = runEffectFn1 endImpl w +end' :: forall r. Writable r -> (Maybe Error -> Effect Unit) -> Effect Unit +end' w cb = runEffectFn2 endCbImpl w $ mkEffectFn1 \err -> cb (toMaybe err) -foreign import endImpl :: forall r. EffectFn1 (Writable r) (Unit) +foreign import endCbImpl :: forall r. EffectFn2 (Writable r) (EffectFn1 (Nullable Error) Unit) (Unit) destroy :: forall r. Stream r -> Effect Unit destroy w = runEffectFn1 destroyImpl w From e2455c13310e33954fa36ace7d9022ad3fa6d113 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 21:31:54 -0500 Subject: [PATCH 12/15] Update tests --- test/Main.js | 25 -------- test/Main.purs | 161 ++++++++++++++++++++++--------------------------- 2 files changed, 72 insertions(+), 114 deletions(-) diff --git a/test/Main.js b/test/Main.js index d623c00..ed4e635 100644 --- a/test/Main.js +++ b/test/Main.js @@ -1,26 +1 @@ -import { WritableStreamBuffer, ReadableStreamBuffer } from "stream-buffers"; -import { PassThrough } from "stream"; - -export function writableStreamBuffer() { - return new WritableStreamBuffer; -} - -export function getContentsAsString(w) { - return () => w.getContentsAsString("utf8"); -} - -export function readableStreamBuffer() { - return new ReadableStreamBuffer; -} - -export function putImpl(str) { - return enc => r => () => { - r.put(str, enc); - }; -} - export { createGzip, createGunzip } from "zlib"; - -export function passThrough() { - return new PassThrough; -} diff --git a/test/Main.purs b/test/Main.purs index 9d9a520..e577820 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -2,14 +2,14 @@ module Test.Main where import Prelude -import Data.Either (Either(..)) -import Data.Maybe (Maybe(..), fromJust, isJust, isNothing) +import Data.Maybe (fromJust, isJust, isNothing) import Effect (Effect) import Effect.Console (log) import Effect.Exception (error) import Node.Buffer as Buffer import Node.Encoding (Encoding(..)) -import Node.Stream (Duplex, Readable, Writable, destroyWithError, end, onData, onDataEither, onDataString, onError, onReadable, pipe, read, readString, setDefaultEncoding, setEncoding, writeString) +import Node.EventEmitter (on_) +import Node.Stream (Duplex, dataH, dataHStr, destroy', end, end', errorH, newPassThrough, pipe, read, readString, readableH, setDefaultEncoding, setEncoding, writeString'_, writeString_) import Partial.Unsafe (unsafePartial) import Test.Assert (assert, assert') @@ -17,28 +17,16 @@ assertEqual :: forall a. Show a => Eq a => a -> a -> Effect Unit assertEqual x y = assert' (show x <> " did not equal " <> show y) (x == y) - -foreign import writableStreamBuffer :: Effect (Writable ()) - -foreign import getContentsAsString :: forall r. Writable r -> Effect String - -foreign import readableStreamBuffer :: Effect (Readable ()) - -foreign import putImpl :: forall r. String -> String -> Readable r -> Effect Unit - -put :: forall r. String -> Encoding -> Readable r -> Effect Unit -put str enc = putImpl str (show enc) - -main :: Effect Boolean +main :: Effect Unit main = do log "setDefaultEncoding should not affect writing" - _ <- testSetDefaultEncoding + testSetDefaultEncoding log "setEncoding should not affect reading" testSetEncoding log "test pipe" - _ <- testPipe + testPipe log "test write" testWrite @@ -52,55 +40,54 @@ main = do testString :: String testString = "üöß💡" -testReads :: Effect Boolean +testReads :: Effect Unit testReads = do - _ <- testReadString + testReadString testReadBuf where - testReadString = do - sIn <- passThrough - v <- readString sIn Nothing UTF8 - assert (isNothing v) - - onReadable sIn do - str <- readString sIn Nothing UTF8 - assert (isJust str) - assertEqual (unsafePartial (fromJust str)) testString - pure unit - - writeString sIn UTF8 testString \_ -> do - pure unit - - testReadBuf = do - sIn <- passThrough - v <- read sIn Nothing - assert (isNothing v) - - onReadable sIn do - buf <- read sIn Nothing - assert (isJust buf) - _ <- assertEqual <$> (Buffer.toString UTF8 (unsafePartial (fromJust buf))) - <*> pure testString - pure unit - - writeString sIn UTF8 testString \_ -> do - pure unit - -testSetDefaultEncoding :: Effect Boolean + testReadString = do + sIn <- newPassThrough + v <- readString sIn UTF8 + assert (isNothing v) + + sIn # on_ readableH do + str <- readString sIn UTF8 + assert (isJust str) + assertEqual (unsafePartial (fromJust str)) testString + pure unit + + writeString_ sIn UTF8 testString + + testReadBuf = do + sIn <- newPassThrough + v <- read sIn + assert (isNothing v) + + sIn # on_ readableH do + buf <- read sIn + assert (isJust buf) + _ <- assertEqual <$> (Buffer.toString UTF8 (unsafePartial (fromJust buf))) + <*> pure testString + pure unit + + writeString_ sIn UTF8 testString + +testSetDefaultEncoding :: Effect Unit testSetDefaultEncoding = do - w1 <- writableStreamBuffer - _ <- check w1 + w1 <- newPassThrough + check w1 - w2 <- writableStreamBuffer + w2 <- newPassThrough setDefaultEncoding w2 UCS2 check w2 where check w = do - writeString w UTF8 testString \_ -> do - c <- getContentsAsString w - assertEqual testString c + w # on_ dataH \buf -> do + str <- Buffer.toString UTF8 buf + assertEqual testString str + writeString_ w UTF8 testString testSetEncoding :: Effect Unit testSetEncoding = do @@ -109,23 +96,23 @@ testSetEncoding = do check UCS2 where check enc = do - r1 <- readableStreamBuffer - put testString enc r1 + r1 <- newPassThrough + writeString_ r1 enc testString - r2 <- readableStreamBuffer - put testString enc r2 + r2 <- newPassThrough + writeString_ r1 enc testString setEncoding r2 enc - onData r1 \buf -> unsafePartial do - onDataEither r2 \(Left str) -> do - _ <- assertEqual <$> Buffer.toString enc buf <*> pure testString + r1 # on_ dataH \buf -> do + r2 # on_ dataHStr \str -> do + join $ assertEqual <$> Buffer.toString enc buf <*> pure testString assertEqual str testString -testPipe :: Effect Boolean +testPipe :: Effect Unit testPipe = do - sIn <- passThrough - sOut <- passThrough - zip <- createGzip + sIn <- newPassThrough + sOut <- newPassThrough + zip <- createGzip unzip <- createGunzip log "pipe 1" @@ -135,36 +122,32 @@ testPipe = do log "pipe 3" _ <- unzip `pipe` sOut - writeString sIn UTF8 testString \_ -> do - end sIn \_ -> do - onDataString sOut UTF8 \str -> do + writeString'_ sIn UTF8 testString \_ -> do + end' sIn \_ -> do + sOut # on_ dataH \buf -> do + str <- Buffer.toString UTF8 buf assertEqual str testString - foreign import createGzip :: Effect Duplex foreign import createGunzip :: Effect Duplex - --- | Create a PassThrough stream, which simply writes its input to its output. -foreign import passThrough :: Effect Duplex - testWrite :: Effect Unit testWrite = do hasError noError where hasError = do - w1 <- writableStreamBuffer - _ <- onError w1 (const $ pure unit) - void $ end w1 $ const $ pure unit - void $ writeString w1 UTF8 "msg" \err -> do + w1 <- newPassThrough + w1 # on_ errorH (const $ pure unit) + end w1 + writeString'_ w1 UTF8 "msg" \err -> do assert' "writeString - should have error" $ isJust err noError = do - w1 <- writableStreamBuffer - void $ writeString w1 UTF8 "msg1" \err -> do + w1 <- newPassThrough + writeString'_ w1 UTF8 "msg1" \err -> do assert' "writeString - should have no error" $ isNothing err - void $ end w1 (const $ pure unit) + end w1 testEnd :: Effect Unit testEnd = do @@ -172,14 +155,14 @@ testEnd = do noError where hasError = do - w1 <- writableStreamBuffer - _ <- onError w1 (const $ pure unit) - void $ writeString w1 UTF8 "msg" \_ -> do - _ <- destroyWithError w1 $ error "Problem" - end w1 \err -> do + w1 <- newPassThrough + w1 # on_ errorH (const $ pure unit) + writeString'_ w1 UTF8 "msg" \_ -> do + _ <- destroy' w1 $ error "Problem" + end' w1 \err -> do assert' "end - should have error" $ isJust err noError = do - w1 <- writableStreamBuffer - end w1 \err -> do + w1 <- newPassThrough + end' w1 \err -> do assert' "end - should have no error" $ isNothing err From d5235ea20e8da3fb8dcaeebd6bceb09a3a284d11 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 21:34:22 -0500 Subject: [PATCH 13/15] Update link to v16 docs --- src/Node/Stream.purs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 2b837d5..5cb3d14 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -1,4 +1,4 @@ --- | This module provides a low-level wrapper for the [Node Stream API](https://nodejs.org/api/stream.html). +-- | This module provides a low-level wrapper for the [Node Stream API (v16 LTS)](https://nodejs.org/docs/latest-v16.x/api/stream.html). module Node.Stream ( Read From fc5b1f53e4049edc060f7f33612f1747c4847585 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 21:40:20 -0500 Subject: [PATCH 14/15] Drop incorrect encouragement towards onDataString --- src/Node/Stream.purs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 5cb3d14..7e9e11e 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -228,9 +228,6 @@ readString' r size enc = do -- | Set the encoding used to read chunks as strings from the stream. This -- | function may be useful when you are passing a readable stream to some other -- | JavaScript library, which already expects an encoding to be set. --- | --- | Where possible, you should try to use `onDataString` instead of this --- | function. setEncoding :: forall w . Readable w From d920fc8993f714e94a5b29316cdc864361bbfdcc Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 24 Jun 2023 21:49:26 -0500 Subject: [PATCH 15/15] Add support for Readable.from --- src/Node/Stream.js | 4 +++- src/Node/Stream.purs | 10 ++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/Node/Stream.js b/src/Node/Stream.js index c5e27cf..f4b80ef 100644 --- a/src/Node/Stream.js +++ b/src/Node/Stream.js @@ -90,6 +90,8 @@ export const allowHalfOpenImpl = (d) => d.allowHalfOpen; export const pipelineImpl = (src, transforms, dst, cb) => stream.pipeline([src, ...transforms, dst], cb); -export const readableFromImpl = (buf) => stream.ReadableStream.from(buf); +export const readableFromStrImpl = (str) => stream.Readable.from(str, { objectMode: false }); + +export const readableFromBufImpl = (buf) => stream.Readable.from(buf, { objectMode: false }); export const newPassThrough = () => new stream.PassThrough({ objectMode: false }); diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 7e9e11e..22b7a8a 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -66,6 +66,7 @@ module Node.Stream , destroyed , allowHalfOpen , pipeline + , fromString , fromBuffer , newPassThrough ) where @@ -472,9 +473,14 @@ pipeline src transforms dest cb = runEffectFn4 pipelineImpl src transforms dest foreign import pipelineImpl :: forall w r. EffectFn4 (Readable w) (Array Duplex) (Writable r) ((Error -> Effect Unit)) (Unit) +fromString :: String -> Effect (Readable ()) +fromString str = runEffectFn1 readableFromStrImpl str + +foreign import readableFromStrImpl :: EffectFn1 (String) (Readable ()) + fromBuffer :: Buffer -> Effect (Readable ()) -fromBuffer buf = runEffectFn1 readableFromImpl buf +fromBuffer buf = runEffectFn1 readableFromBufImpl buf -foreign import readableFromImpl :: EffectFn1 (Buffer) (Readable ()) +foreign import readableFromBufImpl :: EffectFn1 (Buffer) (Readable ()) foreign import newPassThrough :: Effect Duplex