diff --git a/bower.json b/bower.json index be709a7..f5e7853 100644 --- a/bower.json +++ b/bower.json @@ -22,6 +22,7 @@ "purescript-exceptions": "^6.0.0", "purescript-node-buffer": "^8.0.0", "purescript-nullable": "^6.0.0", - "purescript-prelude": "^6.0.0" + "purescript-prelude": "^6.0.0", + "purescript-node-event-emitter": "https://github.com/purescript-node/purescript-node-event-emitter.git#^3.0.0" } } diff --git a/src/Node/Stream.js b/src/Node/Stream.js index cdd6748..f4b80ef 100644 --- a/src/Node/Stream.js +++ b/src/Node/Stream.js @@ -1,143 +1,97 @@ -const _undefined = undefined; -export { _undefined as undefined }; - -export function setEncodingImpl(s) { - return enc => () => { - s.setEncoding(enc); - }; -} - -export function readChunkImpl(Left) { - return Right => chunk => { - if (chunk instanceof Buffer) { - return Right(chunk); - } else if (typeof chunk === "string") { - return Left(chunk); - } else { - throw new Error( - "Node.Stream.readChunkImpl: Unrecognised " + - "chunk type; expected String or Buffer, got: " + - chunk - ); - } - }; -} - -export function onDataEitherImpl(readChunk) { - return r => f => () => { - r.on("data", data => { - f(readChunk(data))(); - }); - }; -} - -export function onEnd(s) { - return f => () => { - s.on("end", f); - }; -} - -export function onFinish(s) { - return f => () => { - s.on("finish", f); - }; -} - -export function onReadable(s) { - return f => () => { - s.on("readable", f); - }; -} - -export function onError(s) { - return f => () => { - s.on("error", e => { - f(e)(); - }); - }; -} - -export function onClose(s) { - return f => () => { - s.on("close", f); - }; -} - -export function resume(s) { - return () => { - s.resume(); - }; -} - -export function pause(s) { - return () => { - s.pause(); - }; -} - -export function isPaused(s) { - return () => s.isPaused(); -} - -export function pipe(r) { - return w => () => r.pipe(w); -} - -export function unpipe(r) { - return w => () => r.unpipe(w); -} - -export function unpipeAll(r) { - return () => r.unpipe(); -} - -export function readImpl(readChunk) { - return Nothing => Just => r => s => () => { - const v = r.read(s); - if (v === null) { - return Nothing; - } else { - return Just(readChunk(v)); - } - }; -} - -export function writeImpl(w) { - return chunk => done => () => w.write(chunk, null, done); -} - -export function writeStringImpl(w) { - return enc => s => done => () => w.write(s, enc, done); -} - -export function cork(w) { - return () => w.cork(); -} - -export function uncork(w) { - return () => w.uncork(); -} - -export function setDefaultEncodingImpl(w) { - return enc => () => { - w.setDefaultEncoding(enc); - }; -} - -export function endImpl(w) { - return done => () => { - w.end(null, null, done); - }; -} - -export function destroy(strm) { - return () => { - strm.destroy(null); - }; -} - -export function destroyWithError(strm) { - return e => () => { - strm.destroy(e); - }; -} +import stream from "node:stream"; + +export const setEncodingImpl = (s, enc) => s.setEncoding(enc); + +export const readChunkImpl = (useBuffer, useString, chunk) => { + if (chunk instanceof Buffer) { + return useBuffer(chunk); + } else if (typeof chunk === "string") { + return useString(chunk); + } else { + throw new Error( + "Node.Stream.readChunkImpl: Unrecognised " + + "chunk type; expected String or Buffer, got: " + + chunk + ); + } +}; + +export const readableImpl = (r) => r.readable; + +export const readableEndedImpl = (r) => r.readableEnded; + +export const readableFlowingImpl = (r) => r.readableFlowing; + +export const readableHighWaterMarkImpl = (r) => r.readableHighWaterMark; + +export const readableLengthImpl = (r) => r.readableLength; + +export const resumeImpl = (r) => r.resume(); + +export const pauseImpl = (r) => r.pause; + +export const isPausedImpl = (r) => r.isPaused; + +export const pipeImpl = (r, w) => r.pipe(w); + +export const pipeCbImpl = (r, w, cb) => r.pipe(w, cb); + +export const unpipeAllImpl = (r) => r.unpipe(); + +export const unpipeImpl = (r, w) => r.unpipe(w); + +export const readImpl = (r) => r.read(); + +export const readSizeImpl = (r, size) => r.read(size); + +export const writeImpl = (w, buf) => w.write(buf); + +export const writeCbImpl = (w, buf) => w.write(buf); + +export const writeStringImpl = (w, str, enc) => w.write(str, enc); + +export const writeStringCbImpl = (w, str, enc, cb) => w.write(str, enc, cb); + +export const corkImpl = (w) => w.cork(); + +export const uncorkImpl = (w) => w.uncork(); + +export const setDefaultEncodingImpl = (w, enc) => w.setDefaultEncoding(enc); + +export const endCbImpl = (w, cb) => w.end(cb); + +export const endImpl = (w) => w.end(); + +export const writeableImpl = (w) => w.writeable; + +export const writeableEndedImpl = (w) => w.writeableEnded; + +export const writeableCorkedImpl = (w) => w.writeableCorked; + +export const erroredImpl = (w) => w.errored; + +export const writeableFinishedImpl = (w) => w.writeableFinished; + +export const writeableHighWaterMarkImpl = (w) => w.writeableHighWaterMark; + +export const writeableLengthImpl = (w) => w.writeableLength; + +export const writeableNeedDrainImpl = (w) => w.writeableNeedDrain; + +export const destroyImpl = (w) => w.destroy(); + +export const destroyErrorImpl = (w, e) => w.destroy(e); + +export const closedImpl = (w) => w.closed; + +export const destroyedImpl = (w) => w.destroyed; + +export const allowHalfOpenImpl = (d) => d.allowHalfOpen; + +export const pipelineImpl = (src, transforms, dst, cb) => stream.pipeline([src, ...transforms, dst], cb); + +export const readableFromStrImpl = (str) => stream.Readable.from(str, { objectMode: false }); + +export const readableFromBufImpl = (buf) => stream.Readable.from(buf, { objectMode: false }); + +export const newPassThrough = () => new stream.PassThrough({ objectMode: false }); diff --git a/src/Node/Stream.purs b/src/Node/Stream.purs index 1f0c470..22b7a8a 100644 --- a/src/Node/Stream.purs +++ b/src/Node/Stream.purs @@ -1,51 +1,90 @@ --- | This module provides a low-level wrapper for the [Node Stream API](https://nodejs.org/api/stream.html). +-- | This module provides a low-level wrapper for the [Node Stream API (v16 LTS)](https://nodejs.org/docs/latest-v16.x/api/stream.html). module Node.Stream - ( Stream() - , Read() - , Readable() - , Write() - , Writable() - , Duplex() - , onData - , onDataString - , onDataEither - , setEncoding - , onReadable - , onEnd - , onFinish - , onClose - , onError + ( Read + , Write + , Stream + , Readable + , Writable + , Duplex + , toEventEmitter + , closeH + , errorH + , drainH + , finishH + , pipeH + , unpipeH + , Chunk + , dataH + , dataHStr + , dataHEither + , pauseH + , readableH + , resumeH + , endH + , readable + , readableEnded + , readableFlowing + , readableHighWaterMark + , readableLength , resume , pause , isPaused , pipe + , pipe' , unpipe , unpipeAll , read + , read' , readString - , readEither + , readString' + , writeable + , writeableEnded + , writeableCorked + , errored + , writeableFinished + , writeableHighWaterMark + , writeableLength + , writeableNeedDrain , write + , write_ + , write' + , write'_ , writeString + , writeString_ + , writeString' + , writeString'_ , cork , uncork + , setEncoding , setDefaultEncoding , end + , end' , destroy - , destroyWithError + , destroy' + , closed + , destroyed + , allowHalfOpen + , pipeline + , fromString + , fromBuffer + , newPassThrough ) where import Prelude -import Effect (Effect) -import Effect.Exception (throw, Error) import Data.Either (Either(..)) -import Data.Maybe (Maybe(..), fromMaybe) +import Data.Maybe (Maybe(..)) +import Data.Nullable (Nullable, toMaybe) +import Effect (Effect) +import Effect.Exception (Error, throw) +import Effect.Uncurried (EffectFn1, EffectFn2, EffectFn3, EffectFn4, mkEffectFn1, runEffectFn1, runEffectFn2, runEffectFn3, runEffectFn4) import Node.Buffer (Buffer) -import Data.Nullable as N -import Effect.Uncurried (EffectFn1, mkEffectFn1) import Node.Buffer as Buffer -import Node.Encoding (Encoding) +import Node.Encoding (Encoding, encodingToNode) +import Node.EventEmitter (EventEmitter, EventHandle(..)) +import Node.EventEmitter.UtilTypes (EventHandle1, EventHandle0) +import Unsafe.Coerce (unsafeCoerce) -- | A stream. -- | @@ -70,233 +109,314 @@ type Writable r = Stream (write :: Write | r) -- | A duplex (readable _and_ writable stream) type Duplex = Stream (read :: Read, write :: Write) -foreign import undefined :: forall a. a +toEventEmitter :: forall rw. Stream rw -> EventEmitter +toEventEmitter = unsafeCoerce +-- | Internal type. This should not be used by end-users. foreign import data Chunk :: Type foreign import readChunkImpl - :: (forall l r. l -> Either l r) - -> (forall l r. r -> Either l r) - -> Chunk - -> Either String Buffer - -readChunk :: Chunk -> Either String Buffer -readChunk = readChunkImpl Left Right - --- | Listen for `data` events, returning data in a Buffer. Note that this will fail --- | if `setEncoding` has been called on the stream. -onData - :: forall w - . Readable w - -> (Buffer -> Effect Unit) - -> Effect Unit -onData r cb = - onDataEither r (cb <=< fromEither) - where - fromEither x = - case x of - Left _ -> - throw "Stream encoding should not be set" - Right buf -> - pure buf - -read - :: forall w - . Readable w - -> Maybe Int - -> Effect (Maybe Buffer) -read r size = do - v <- readEither r size - case v of - Nothing -> pure Nothing - Just (Left _) -> throw "Stream encoding should not be set" - Just (Right b) -> pure (Just b) - + :: forall r + . EffectFn3 + (EffectFn1 Buffer r) + (EffectFn1 String r) + Chunk + r + +-- | Listen for `data` events on a stream where `setEncoding` has NOT been called, returning data as a Buffer. +-- | If `setEncoding` HAS been called on the stream, this will throw an error. +-- | +-- | This is likely the handler you want to use for converting a `Stream` into a `String`: +-- | ``` +-- | let useStringCb = ... +-- | ref <- Ref.new +-- | stream # on dataH \buf -> +-- | Ref.modify_ (\ref' -> Array.snoc ref' buf) ref +-- | stream # on endH do +-- | bufs <- Ref.read ref +-- | useStringCb $ Buffer.toString UTF8 $ Buffer.concat bufs +-- | ``` +dataH :: forall w. EventHandle (Readable w) (Buffer -> Effect Unit) (EffectFn1 Chunk Unit) +dataH = EventHandle "data" \cb -> + mkEffectFn1 \chunk -> do + runEffectFn3 + readChunkImpl + (mkEffectFn1 cb) + (mkEffectFn1 \_ -> throw "Got a String, not a Buffer. Stream encoding should not be set") + chunk + +-- | Listen for `data` events on a stream where `setEncoding` has been called, returning data as a String. +-- | If `setEncoding` has NOT been called on the stream, this will throw an error. +dataHStr :: forall w. EventHandle (Readable w) (String -> Effect Unit) (EffectFn1 Chunk Unit) +dataHStr = EventHandle "data" \cb -> + mkEffectFn1 \chunk -> do + runEffectFn3 + readChunkImpl + (mkEffectFn1 \_ -> throw "Got a Buffer, not String. Stream encoding must be set to get a String.") + (mkEffectFn1 cb) + chunk + +-- | Listen for `data` events, returning data in a Buffer or String. This will work +-- | regardless of whether `setEncoding` has been called or not. +dataHEither :: forall w. EventHandle (Readable w) (Either String Buffer -> Effect Unit) (EffectFn1 Chunk Unit) +dataHEither = EventHandle "data" \cb -> + mkEffectFn1 \chunk -> do + runEffectFn3 + readChunkImpl + (mkEffectFn1 (cb <<< Right)) + (mkEffectFn1 (cb <<< Left)) + chunk + +-- | Note: this will fail if `setEncoding` has been called on the stream. +read :: forall w. Readable w -> Effect (Maybe Buffer) +read r = do + chunk <- runEffectFn1 readImpl r + case toMaybe chunk of + Nothing -> + pure Nothing + Just c -> + runEffectFn3 readChunkImpl + (mkEffectFn1 \buf -> pure $ Just buf) + (mkEffectFn1 \_ -> throw "Stream encoding should not be set") + c + +foreign import readImpl :: forall w. EffectFn1 (Readable w) (Nullable Chunk) + +-- | Note: this will fail if `setEncoding` has been called on the stream. +read' :: forall w. Readable w -> Int -> Effect (Maybe Buffer) +read' r size = do + chunk <- runEffectFn2 readSizeImpl r size + case toMaybe chunk of + Nothing -> + pure Nothing + Just c -> + runEffectFn3 readChunkImpl + (mkEffectFn1 \buf -> pure $ Just buf) + (mkEffectFn1 \_ -> throw "Stream encoding should not be set") + c + +foreign import readSizeImpl :: forall w. EffectFn2 (Readable w) (Int) (Nullable Chunk) + +-- | Note: this will fail if `setEncoding` has been called on the stream. readString :: forall w . Readable w - -> Maybe Int -> Encoding -> Effect (Maybe String) -readString r size enc = do - v <- readEither r size - case v of - Nothing -> pure Nothing - Just (Left _) -> throw "Stream encoding should not be set" - Just (Right buf) -> Just <$> Buffer.toString enc buf - -readEither +readString r enc = do + mbBuf <- read r + case mbBuf of + Nothing -> + pure Nothing + Just buf -> do + Just <$> Buffer.toString enc buf + +-- | Note: this will fail if `setEncoding` has been called on the stream. +readString' :: forall w . Readable w - -> Maybe Int - -> Effect (Maybe (Either String Buffer)) -readEither r size = readImpl readChunk Nothing Just r (fromMaybe undefined size) - -foreign import readImpl - :: forall r - . (Chunk -> Either String Buffer) - -> (forall a. Maybe a) - -> (forall a. a -> Maybe a) - -> Readable r -> Int - -> Effect (Maybe (Either String Buffer)) - --- | Listen for `data` events, returning data in a String, which will be --- | decoded using the given encoding. Note that this will fail if `setEncoding` --- | has been called on the stream. -onDataString - :: forall w - . Readable w -> Encoding - -> (String -> Effect Unit) - -> Effect Unit -onDataString r enc cb = onData r (cb <=< Buffer.toString enc) - --- | Listen for `data` events, returning data in an `Either String Buffer`. This --- | function is provided for the (hopefully rare) case that `setEncoding` has --- | been called on the stream. -onDataEither - :: forall r - . Readable r - -> (Either String Buffer -> Effect Unit) - -> Effect Unit -onDataEither r cb = onDataEitherImpl readChunk r cb - -foreign import onDataEitherImpl - :: forall r - . (Chunk -> Either String Buffer) - -> Readable r - -> (Either String Buffer -> Effect Unit) - -> Effect Unit - -foreign import setEncodingImpl - :: forall w - . Readable w - -> String - -> Effect Unit + -> Effect (Maybe String) +readString' r size enc = do + mbBuf <- read' r size + case mbBuf of + Nothing -> + pure Nothing + Just buf -> do + Just <$> Buffer.toString enc buf -- | Set the encoding used to read chunks as strings from the stream. This -- | function may be useful when you are passing a readable stream to some other -- | JavaScript library, which already expects an encoding to be set. --- | --- | Where possible, you should try to use `onDataString` instead of this --- | function. setEncoding :: forall w . Readable w -> Encoding -> Effect Unit -setEncoding r enc = setEncodingImpl r (show enc) +setEncoding r enc = runEffectFn2 setEncodingImpl r (show enc) --- | Listen for `readable` events. -foreign import onReadable - :: forall w - . Readable w - -> Effect Unit - -> Effect Unit +foreign import setEncodingImpl :: forall w. EffectFn2 (Readable w) String Unit --- | Listen for `end` events. -foreign import onEnd - :: forall w - . Readable w - -> Effect Unit - -> Effect Unit +closeH :: forall rw. EventHandle0 (Stream rw) +closeH = EventHandle "close" identity --- | Listen for `finish` events. -foreign import onFinish - :: forall w - . Writable w - -> Effect Unit - -> Effect Unit +errorH :: forall rw. EventHandle1 (Stream rw) Error +errorH = EventHandle "error" mkEffectFn1 --- | Listen for `close` events. -foreign import onClose - :: forall w - . Stream w - -> Effect Unit - -> Effect Unit +drainH :: forall r. EventHandle0 (Writable r) +drainH = EventHandle "drain" identity --- | Listen for `error` events. -foreign import onError - :: forall w - . Stream w - -> (Error -> Effect Unit) - -> Effect Unit +finishH :: forall r. EventHandle0 (Writable r) +finishH = EventHandle "finish" identity + +pipeH :: forall r w. EventHandle1 (Writable r) (Readable w) +pipeH = EventHandle "pipe" mkEffectFn1 + +unpipeH :: forall r w. EventHandle1 (Writable r) (Readable w) +unpipeH = EventHandle "unpipe" mkEffectFn1 + +pauseH :: forall w. EventHandle0 (Readable w) +pauseH = EventHandle "pause" identity + +readableH :: forall w. EventHandle0 (Readable w) +readableH = EventHandle "readable" identity + +resumeH :: forall w. EventHandle0 (Readable w) +resumeH = EventHandle "resume" identity + +endH :: forall w. EventHandle0 (Readable w) +endH = EventHandle "end" identity + +readable :: forall w. Readable w -> Effect Boolean +readable r = runEffectFn1 readableImpl r + +foreign import readableImpl :: forall w. EffectFn1 (Readable w) (Boolean) + +readableEnded :: forall w. Readable w -> Effect Boolean +readableEnded r = runEffectFn1 readableEndedImpl r + +foreign import readableEndedImpl :: forall w. EffectFn1 (Readable w) (Boolean) + +readableFlowing :: forall w. Readable w -> Effect Boolean +readableFlowing r = runEffectFn1 readableFlowingImpl r + +foreign import readableFlowingImpl :: forall w. EffectFn1 (Readable w) (Boolean) + +readableHighWaterMark :: forall w. Readable w -> Effect Boolean +readableHighWaterMark r = runEffectFn1 readableHighWaterMarkImpl r + +foreign import readableHighWaterMarkImpl :: forall w. EffectFn1 (Readable w) (Boolean) + +readableLength :: forall w. Readable w -> Effect Boolean +readableLength r = runEffectFn1 readableLengthImpl r + +foreign import readableLengthImpl :: forall w. EffectFn1 (Readable w) (Boolean) -- | Resume reading from the stream. -foreign import resume :: forall w. Readable w -> Effect Unit +resume :: forall w. Readable w -> Effect Unit +resume r = runEffectFn1 resumeImpl r + +foreign import resumeImpl :: forall w. EffectFn1 (Readable w) (Unit) -- | Pause reading from the stream. -foreign import pause :: forall w. Readable w -> Effect Unit +pause :: forall w. Readable w -> Effect Unit +pause r = runEffectFn1 pauseImpl r + +foreign import pauseImpl :: forall w. EffectFn1 (Readable w) (Unit) -- | Check whether or not a stream is paused for reading. -foreign import isPaused :: forall w. Readable w -> Effect Boolean +isPaused :: forall w. Readable w -> Effect Boolean +isPaused r = runEffectFn1 isPausedImpl r + +foreign import isPausedImpl :: forall w. EffectFn1 (Readable w) (Boolean) -- | Read chunks from a readable stream and write them to a writable stream. -foreign import pipe - :: forall r w - . Readable w - -> Writable r - -> Effect (Writable r) +pipe :: forall w r. Readable w -> Writable r -> Effect Unit +pipe r w = runEffectFn2 pipeImpl r w + +foreign import pipeImpl :: forall w r. EffectFn2 (Readable w) (Writable r) (Unit) + +pipe' :: forall w r. Readable w -> Writable r -> { end :: Boolean } -> Effect Unit +pipe' r w o = runEffectFn3 pipeCbImpl r w o + +foreign import pipeCbImpl :: forall w r. EffectFn3 (Readable w) (Writable r) ({ end :: Boolean }) (Unit) -- | Detach a Writable stream previously attached using `pipe`. -foreign import unpipe - :: forall r w - . Readable w - -> Writable r - -> Effect Unit +unpipe :: forall w r. Readable w -> Writable r -> Effect Unit +unpipe r w = runEffectFn2 unpipeImpl r w + +foreign import unpipeImpl :: forall w r. EffectFn2 (Readable w) (Writable r) (Unit) -- | Detach all Writable streams previously attached using `pipe`. -foreign import unpipeAll - :: forall w - . Readable w - -> Effect Unit +unpipeAll :: forall w. Readable w -> Effect Unit +unpipeAll r = runEffectFn1 unpipeAllImpl r -foreign import writeImpl - :: forall r - . Writable r - -> Buffer - -> EffectFn1 (N.Nullable Error) Unit - -> Effect Boolean +foreign import unpipeAllImpl :: forall w. EffectFn1 (Readable w) (Unit) --- | Write a Buffer to a writable stream. -write - :: forall r - . Writable r - -> Buffer - -> (Maybe Error -> Effect Unit) - -> Effect Boolean -write w b cb = writeImpl w b $ mkEffectFn1 (cb <<< N.toMaybe) +writeable :: forall r. Writable r -> Effect Boolean +writeable w = runEffectFn1 writeableImpl w -foreign import writeStringImpl - :: forall r - . Writable r - -> String - -> String - -> EffectFn1 (N.Nullable Error) Unit - -> Effect Boolean +foreign import writeableImpl :: forall r. EffectFn1 (Writable r) (Boolean) --- | Write a string in the specified encoding to a writable stream. -writeString - :: forall r - . Writable r - -> Encoding - -> String - -> (Maybe Error -> Effect Unit) - -> Effect Boolean -writeString w enc s cb = writeStringImpl w (show enc) s $ mkEffectFn1 (cb <<< N.toMaybe) +writeableEnded :: forall r. Writable r -> Effect Boolean +writeableEnded w = runEffectFn1 writeableEndedImpl w + +foreign import writeableEndedImpl :: forall r. EffectFn1 (Writable r) (Boolean) + +writeableCorked :: forall r. Writable r -> Effect Boolean +writeableCorked w = runEffectFn1 writeableCorkedImpl w + +foreign import writeableCorkedImpl :: forall r. EffectFn1 (Writable r) (Boolean) + +errored :: forall rw. Stream rw -> Effect Boolean +errored rw = runEffectFn1 erroredImpl rw + +foreign import erroredImpl :: forall rw. EffectFn1 (Stream rw) (Boolean) + +writeableFinished :: forall r. Writable r -> Effect Boolean +writeableFinished w = runEffectFn1 writeableFinishedImpl w + +foreign import writeableFinishedImpl :: forall r. EffectFn1 (Writable r) (Boolean) + +writeableHighWaterMark :: forall r. Writable r -> Effect Number +writeableHighWaterMark w = runEffectFn1 writeableHighWaterMarkImpl w + +foreign import writeableHighWaterMarkImpl :: forall r. EffectFn1 (Writable r) (Number) + +writeableLength :: forall r. Writable r -> Effect Number +writeableLength w = runEffectFn1 writeableLengthImpl w + +foreign import writeableLengthImpl :: forall r. EffectFn1 (Writable r) (Number) + +writeableNeedDrain :: forall r. Writable r -> Effect Boolean +writeableNeedDrain w = runEffectFn1 writeableNeedDrainImpl w + +foreign import writeableNeedDrainImpl :: forall r. EffectFn1 (Writable r) (Boolean) + +write :: forall r. Writable r -> Buffer -> Effect Boolean +write w b = runEffectFn2 writeImpl w b + +write_ :: forall r. Writable r -> Buffer -> Effect Unit +write_ w b = void $ write w b + +foreign import writeImpl :: forall r a. EffectFn2 (Writable r) (Buffer) (a) + +write' :: forall r. Writable r -> Buffer -> (Maybe Error -> Effect Unit) -> Effect Boolean +write' w b cb = runEffectFn3 writeCbImpl w b $ mkEffectFn1 \err -> cb (toMaybe err) + +write'_ :: forall r. Writable r -> Buffer -> (Maybe Error -> Effect Unit) -> Effect Unit +write'_ w b cb = void $ write' w b cb + +foreign import writeCbImpl :: forall r a. EffectFn3 (Writable r) (Buffer) (EffectFn1 (Nullable Error) Unit) (a) + +writeString :: forall r. Writable r -> Encoding -> String -> Effect Boolean +writeString w enc str = runEffectFn3 writeStringImpl w str (encodingToNode enc) + +writeString_ :: forall r. Writable r -> Encoding -> String -> Effect Unit +writeString_ w enc str = void $ writeString w enc str + +foreign import writeStringImpl :: forall r a. EffectFn3 (Writable r) (String) (String) (a) + +writeString' :: forall r. Writable r -> Encoding -> String -> (Maybe Error -> Effect Unit) -> Effect Boolean +writeString' w enc str cb = runEffectFn4 writeStringCbImpl w str (encodingToNode enc) $ mkEffectFn1 \err -> cb (toMaybe err) + +writeString'_ :: forall r. Writable r -> Encoding -> String -> (Maybe Error -> Effect Unit) -> Effect Unit +writeString'_ w enc str cb = void $ writeString' w enc str cb + +foreign import writeStringCbImpl :: forall r a. EffectFn4 (Writable r) (String) (String) (EffectFn1 (Nullable Error) Unit) (a) -- | Force buffering of writes. -foreign import cork :: forall r. Writable r -> Effect Unit +cork :: forall r. Writable r -> Effect Unit +cork s = runEffectFn1 corkImpl s + +foreign import corkImpl :: forall r. EffectFn1 (Writable r) (Unit) -- | Flush buffered data. -foreign import uncork :: forall r. Writable r -> Effect Unit +uncork :: forall r. Writable r -> Effect Unit +uncork w = runEffectFn1 uncorkImpl w -foreign import setDefaultEncodingImpl - :: forall r - . Writable r - -> String - -> Effect Unit +foreign import uncorkImpl :: forall r. EffectFn1 (Writable r) (Unit) -- | Set the default encoding used to write strings to the stream. This function -- | is useful when you are passing a writable stream to some other JavaScript @@ -308,35 +428,59 @@ setDefaultEncoding . Writable r -> Encoding -> Effect Unit -setDefaultEncoding r enc = setDefaultEncodingImpl r (show enc) +setDefaultEncoding r enc = runEffectFn2 setDefaultEncodingImpl r (show enc) -foreign import endImpl - :: forall r - . Writable r - -> EffectFn1 (N.Nullable Error) Unit - -> Effect Unit +foreign import setDefaultEncodingImpl :: forall r. EffectFn2 (Writable r) String Unit -- | End writing data to the stream. -end - :: forall r - . Writable r - -> (Maybe Error -> Effect Unit) - -> Effect Unit -end w cb = endImpl w $ mkEffectFn1 (cb <<< N.toMaybe) +end :: forall r. Writable r -> Effect Unit +end w = runEffectFn1 endImpl w --- | Destroy the stream. It will release any internal resources. --- --- Added in node 8.0. -foreign import destroy - :: forall r - . Stream r - -> Effect Unit +foreign import endImpl :: forall r. EffectFn1 (Writable r) (Unit) --- | Destroy the stream and emit 'error'. --- --- Added in node 8.0. -foreign import destroyWithError - :: forall r - . Stream r - -> Error - -> Effect Unit +end' :: forall r. Writable r -> (Maybe Error -> Effect Unit) -> Effect Unit +end' w cb = runEffectFn2 endCbImpl w $ mkEffectFn1 \err -> cb (toMaybe err) + +foreign import endCbImpl :: forall r. EffectFn2 (Writable r) (EffectFn1 (Nullable Error) Unit) (Unit) + +destroy :: forall r. Stream r -> Effect Unit +destroy w = runEffectFn1 destroyImpl w + +foreign import destroyImpl :: forall r. EffectFn1 (Stream r) (Unit) + +destroy' :: forall r. Stream r -> Error -> Effect Unit +destroy' w e = runEffectFn2 destroyErrorImpl w e + +foreign import destroyErrorImpl :: forall r. EffectFn2 (Stream r) (Error) Unit + +closed :: forall r. Stream r -> Effect Boolean +closed w = runEffectFn1 closedImpl w + +foreign import closedImpl :: forall r. EffectFn1 (Stream r) (Boolean) + +destroyed :: forall r. Stream r -> Effect Boolean +destroyed w = runEffectFn1 destroyedImpl w + +foreign import destroyedImpl :: forall r. EffectFn1 (Stream r) (Boolean) + +allowHalfOpen :: Duplex -> Effect Boolean +allowHalfOpen d = runEffectFn1 allowHalfOpenImpl d + +foreign import allowHalfOpenImpl :: EffectFn1 (Duplex) (Boolean) + +pipeline :: forall w r. Readable w -> Array Duplex -> Writable r -> (Error -> Effect Unit) -> Effect Unit +pipeline src transforms dest cb = runEffectFn4 pipelineImpl src transforms dest cb + +foreign import pipelineImpl :: forall w r. EffectFn4 (Readable w) (Array Duplex) (Writable r) ((Error -> Effect Unit)) (Unit) + +fromString :: String -> Effect (Readable ()) +fromString str = runEffectFn1 readableFromStrImpl str + +foreign import readableFromStrImpl :: EffectFn1 (String) (Readable ()) + +fromBuffer :: Buffer -> Effect (Readable ()) +fromBuffer buf = runEffectFn1 readableFromBufImpl buf + +foreign import readableFromBufImpl :: EffectFn1 (Buffer) (Readable ()) + +foreign import newPassThrough :: Effect Duplex diff --git a/test/Main.js b/test/Main.js index d623c00..ed4e635 100644 --- a/test/Main.js +++ b/test/Main.js @@ -1,26 +1 @@ -import { WritableStreamBuffer, ReadableStreamBuffer } from "stream-buffers"; -import { PassThrough } from "stream"; - -export function writableStreamBuffer() { - return new WritableStreamBuffer; -} - -export function getContentsAsString(w) { - return () => w.getContentsAsString("utf8"); -} - -export function readableStreamBuffer() { - return new ReadableStreamBuffer; -} - -export function putImpl(str) { - return enc => r => () => { - r.put(str, enc); - }; -} - export { createGzip, createGunzip } from "zlib"; - -export function passThrough() { - return new PassThrough; -} diff --git a/test/Main.purs b/test/Main.purs index 9d9a520..e577820 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -2,14 +2,14 @@ module Test.Main where import Prelude -import Data.Either (Either(..)) -import Data.Maybe (Maybe(..), fromJust, isJust, isNothing) +import Data.Maybe (fromJust, isJust, isNothing) import Effect (Effect) import Effect.Console (log) import Effect.Exception (error) import Node.Buffer as Buffer import Node.Encoding (Encoding(..)) -import Node.Stream (Duplex, Readable, Writable, destroyWithError, end, onData, onDataEither, onDataString, onError, onReadable, pipe, read, readString, setDefaultEncoding, setEncoding, writeString) +import Node.EventEmitter (on_) +import Node.Stream (Duplex, dataH, dataHStr, destroy', end, end', errorH, newPassThrough, pipe, read, readString, readableH, setDefaultEncoding, setEncoding, writeString'_, writeString_) import Partial.Unsafe (unsafePartial) import Test.Assert (assert, assert') @@ -17,28 +17,16 @@ assertEqual :: forall a. Show a => Eq a => a -> a -> Effect Unit assertEqual x y = assert' (show x <> " did not equal " <> show y) (x == y) - -foreign import writableStreamBuffer :: Effect (Writable ()) - -foreign import getContentsAsString :: forall r. Writable r -> Effect String - -foreign import readableStreamBuffer :: Effect (Readable ()) - -foreign import putImpl :: forall r. String -> String -> Readable r -> Effect Unit - -put :: forall r. String -> Encoding -> Readable r -> Effect Unit -put str enc = putImpl str (show enc) - -main :: Effect Boolean +main :: Effect Unit main = do log "setDefaultEncoding should not affect writing" - _ <- testSetDefaultEncoding + testSetDefaultEncoding log "setEncoding should not affect reading" testSetEncoding log "test pipe" - _ <- testPipe + testPipe log "test write" testWrite @@ -52,55 +40,54 @@ main = do testString :: String testString = "üöß💡" -testReads :: Effect Boolean +testReads :: Effect Unit testReads = do - _ <- testReadString + testReadString testReadBuf where - testReadString = do - sIn <- passThrough - v <- readString sIn Nothing UTF8 - assert (isNothing v) - - onReadable sIn do - str <- readString sIn Nothing UTF8 - assert (isJust str) - assertEqual (unsafePartial (fromJust str)) testString - pure unit - - writeString sIn UTF8 testString \_ -> do - pure unit - - testReadBuf = do - sIn <- passThrough - v <- read sIn Nothing - assert (isNothing v) - - onReadable sIn do - buf <- read sIn Nothing - assert (isJust buf) - _ <- assertEqual <$> (Buffer.toString UTF8 (unsafePartial (fromJust buf))) - <*> pure testString - pure unit - - writeString sIn UTF8 testString \_ -> do - pure unit - -testSetDefaultEncoding :: Effect Boolean + testReadString = do + sIn <- newPassThrough + v <- readString sIn UTF8 + assert (isNothing v) + + sIn # on_ readableH do + str <- readString sIn UTF8 + assert (isJust str) + assertEqual (unsafePartial (fromJust str)) testString + pure unit + + writeString_ sIn UTF8 testString + + testReadBuf = do + sIn <- newPassThrough + v <- read sIn + assert (isNothing v) + + sIn # on_ readableH do + buf <- read sIn + assert (isJust buf) + _ <- assertEqual <$> (Buffer.toString UTF8 (unsafePartial (fromJust buf))) + <*> pure testString + pure unit + + writeString_ sIn UTF8 testString + +testSetDefaultEncoding :: Effect Unit testSetDefaultEncoding = do - w1 <- writableStreamBuffer - _ <- check w1 + w1 <- newPassThrough + check w1 - w2 <- writableStreamBuffer + w2 <- newPassThrough setDefaultEncoding w2 UCS2 check w2 where check w = do - writeString w UTF8 testString \_ -> do - c <- getContentsAsString w - assertEqual testString c + w # on_ dataH \buf -> do + str <- Buffer.toString UTF8 buf + assertEqual testString str + writeString_ w UTF8 testString testSetEncoding :: Effect Unit testSetEncoding = do @@ -109,23 +96,23 @@ testSetEncoding = do check UCS2 where check enc = do - r1 <- readableStreamBuffer - put testString enc r1 + r1 <- newPassThrough + writeString_ r1 enc testString - r2 <- readableStreamBuffer - put testString enc r2 + r2 <- newPassThrough + writeString_ r1 enc testString setEncoding r2 enc - onData r1 \buf -> unsafePartial do - onDataEither r2 \(Left str) -> do - _ <- assertEqual <$> Buffer.toString enc buf <*> pure testString + r1 # on_ dataH \buf -> do + r2 # on_ dataHStr \str -> do + join $ assertEqual <$> Buffer.toString enc buf <*> pure testString assertEqual str testString -testPipe :: Effect Boolean +testPipe :: Effect Unit testPipe = do - sIn <- passThrough - sOut <- passThrough - zip <- createGzip + sIn <- newPassThrough + sOut <- newPassThrough + zip <- createGzip unzip <- createGunzip log "pipe 1" @@ -135,36 +122,32 @@ testPipe = do log "pipe 3" _ <- unzip `pipe` sOut - writeString sIn UTF8 testString \_ -> do - end sIn \_ -> do - onDataString sOut UTF8 \str -> do + writeString'_ sIn UTF8 testString \_ -> do + end' sIn \_ -> do + sOut # on_ dataH \buf -> do + str <- Buffer.toString UTF8 buf assertEqual str testString - foreign import createGzip :: Effect Duplex foreign import createGunzip :: Effect Duplex - --- | Create a PassThrough stream, which simply writes its input to its output. -foreign import passThrough :: Effect Duplex - testWrite :: Effect Unit testWrite = do hasError noError where hasError = do - w1 <- writableStreamBuffer - _ <- onError w1 (const $ pure unit) - void $ end w1 $ const $ pure unit - void $ writeString w1 UTF8 "msg" \err -> do + w1 <- newPassThrough + w1 # on_ errorH (const $ pure unit) + end w1 + writeString'_ w1 UTF8 "msg" \err -> do assert' "writeString - should have error" $ isJust err noError = do - w1 <- writableStreamBuffer - void $ writeString w1 UTF8 "msg1" \err -> do + w1 <- newPassThrough + writeString'_ w1 UTF8 "msg1" \err -> do assert' "writeString - should have no error" $ isNothing err - void $ end w1 (const $ pure unit) + end w1 testEnd :: Effect Unit testEnd = do @@ -172,14 +155,14 @@ testEnd = do noError where hasError = do - w1 <- writableStreamBuffer - _ <- onError w1 (const $ pure unit) - void $ writeString w1 UTF8 "msg" \_ -> do - _ <- destroyWithError w1 $ error "Problem" - end w1 \err -> do + w1 <- newPassThrough + w1 # on_ errorH (const $ pure unit) + writeString'_ w1 UTF8 "msg" \_ -> do + _ <- destroy' w1 $ error "Problem" + end' w1 \err -> do assert' "end - should have error" $ isJust err noError = do - w1 <- writableStreamBuffer - end w1 \err -> do + w1 <- newPassThrough + end' w1 \err -> do assert' "end - should have no error" $ isNothing err