From 9d5ba6989e977997ae424e405be626a126bb9f7a Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Fri, 7 Jul 2023 10:51:32 -0500 Subject: [PATCH 01/10] Copy over Node.Stream.Aff as-is --- src/Node/Stream/Aff.purs | 459 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 459 insertions(+) create mode 100644 src/Node/Stream/Aff.purs diff --git a/src/Node/Stream/Aff.purs b/src/Node/Stream/Aff.purs new file mode 100644 index 0000000..fc6d25d --- /dev/null +++ b/src/Node/Stream/Aff.purs @@ -0,0 +1,459 @@ +-- | Asynchronous I/O with the [*Node.js* Stream API](https://nodejs.org/docs/latest/api/stream.html). +-- | +-- | Open __file streams__ with +-- | [__Node.FS.Stream__](https://pursuit.purescript.org/packages/purescript-node-fs/docs/Node.FS.Stream). +-- | +-- | Open __process streams__ with +-- | [__Node.Process__](https://pursuit.purescript.org/packages/purescript-node-process/docs/Node.Process). +-- | +-- | All __I/O errors__ will be thrown through the `Aff` `MonadError` class +-- | instance. +-- | +-- | `Aff` __cancellation__ will clean up all *Node.js* event listeners. +-- | +-- | All of these `Aff` functions will prevent the *Node.js* __event loop__ from +-- | exiting until the `Aff` function completes. +-- | +-- | ## Reading +-- | +-- | #### Implementation +-- | +-- | The reading functions in this module all operate on a `Readable` stream +-- | in +-- | [“paused mode”](https://nodejs.org/docs/latest/api/stream.html#stream_two_reading_modes). +-- | +-- | Internally the reading functions use the +-- | [`readable.read([size])`](https://nodejs.org/docs/latest/api/stream.html#readablereadsize) +-- | function and are subject to the caveats of that function. +-- | +-- | #### Result Buffers +-- | +-- | The result of a reading function may be chunked into more than one `Buffer`. +-- | The `buffers` element of the result is an `Array Buffer` of what +-- | was read. +-- | To concatenate the result into a single `Buffer`, use +-- | [`Node.Buffer.concat :: Array Buffer -> m Buffer`](https://pursuit.purescript.org/packages/purescript-node-buffer/docs/Node.Buffer#t:MutableBuffer). +-- | +-- | ``` +-- | input :: Buffer +-- | <- liftEffect <<< concat <<< _.buffers =<< readSome stdin +-- | ``` +-- | +-- | To calculate the number of bytes read, use +-- | `Node.Buffer.size :: Buffer -> m Int`. +-- | +-- | ``` +-- | {buffers} :: Array Buffer <- readSome stdin +-- | bytesRead :: Int +-- | <- liftEffect $ Array.foldM (\a b -> (a+_) <$> size b) 0 buffers +-- | ``` +-- | +-- | #### Result `readagain` flag +-- | +-- | The `readagain` field of the result is a `Boolean` flag which +-- | is `true` if the stream has not reached End-Of-File (and also if the stream +-- | has not errored or been destroyed), so we know we can read again. +-- | If the flag is `false` then the stream is not `readable` +-- | no more bytes will ever be produced by the stream. +-- | +-- | Reading from an ended, closed, errored, or destroyed stream +-- | will complete immediately with `{buffers:[], readagain:false}`. +-- | +-- | The `readagain` flag will give the same answer as a +-- | subsequent call to `Internal.readable`. +-- | +-- | ## Writing +-- | +-- | #### Implementation +-- | +-- | The writing functions in this module all operate on a `Writeable` stream. +-- | +-- | Internally the writing functions will call the +-- | [`writable.write(chunk[, encoding][, callback])`](https://nodejs.org/docs/latest/api/stream.html#writablewritechunk-encoding-callback) +-- | function on each of the `Buffer`s, +-- | and will asychronously wait if there is “backpressure” from the stream. +-- | +-- | #### Result +-- | +-- | The writing functions will complete after all the data is flushed to the +-- | stream. +-- | +-- | If a write fails then it will `throwError` in the `Aff`. +module Node.Stream.Aff + ( readSome + , readAll + , readN + , write + , end + , toStringUTF8 + , fromStringUTF8 + ) where + +import Prelude + +import Control.Monad.Rec.Class (Step(..), tailRecM) +import Control.Monad.ST.Class (liftST) +import Data.Array as Array +import Data.Array.ST as Array.ST +import Data.Either (Either(..)) +import Data.Maybe (Maybe(..)) +import Effect (Effect, untilE) +import Effect.Aff (effectCanceler, error, makeAff, nonCanceler) +import Effect.Aff.Class (class MonadAff, liftAff) +import Effect.Class (class MonadEffect, liftEffect) +import Effect.Exception (catchException) +import Effect.Ref as Ref +import Node.Buffer (Buffer) +import Node.Buffer as Buffer +import Node.Encoding as Encoding +import Node.Stream (Readable, Writable) +import Node.Stream as Stream +import Node.Stream.Aff.Internal (onceClose, onceDrain, onceEnd, onceError, onceReadable, readable) + +-- | Wait until there is some data available from the stream, then read it. +-- | +-- | This function is useful for streams like __stdin__ which never +-- | reach End-Of-File. +readSome + :: forall m r + . MonadAff m + => Readable r + -> m { buffers :: Array Buffer, readagain :: Boolean } +readSome r = liftAff <<< makeAff $ \complete -> do + bufs <- liftST $ Array.ST.new + + removeError <- onceError r \err -> complete (Left err) + + removeClose <- onceClose r do + -- Don't error, instead return whatever we've read. + removeError + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) + + removeEnd <- onceEnd r do + removeError + removeClose + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) + + let + cleanupRethrow err = do + removeError + removeClose + removeEnd + complete (Left err) + pure nonCanceler + + catchException cleanupRethrow do + ifM (readable r) + do + -- try to read right away. + untilE do + Stream.read r Nothing >>= case _ of + Nothing -> pure true + Just chunk -> do + void $ liftST $ Array.ST.push chunk bufs + pure false + + ret1 <- liftST $ Array.ST.unsafeFreeze bufs + readagain <- readable r + if readagain && Array.length ret1 == 0 then do + -- if still readable and we couldn't read anything right away, + -- then wait for the readable event. + -- “The 'readable' event will also be emitted once the end of the + -- stream data has been reached but before the 'end' event is emitted.” + -- if not readable then this was a zero-length Readable stream. + -- https://nodejs.org/api/stream.html#event-readable + removeReadable <- onceReadable r do + untilE do + Stream.read r Nothing >>= case _ of + Nothing -> pure true + Just chunk -> do + void $ liftST $ Array.ST.push chunk bufs + pure false + ret2 <- liftST $ Array.ST.unsafeFreeze bufs + removeError + removeClose + removeEnd + readagain2 <- readable r + complete (Right { buffers: ret2, readagain: readagain2 }) + -- canceller might by called while waiting for `onceReadable` + pure $ effectCanceler do + removeError + removeClose + removeEnd + removeReadable + -- else return what we read right away + else do + removeError + removeClose + removeEnd + complete (Right { buffers: ret1, readagain }) + pure nonCanceler + do + removeError + removeClose + removeEnd + complete (Right { buffers: [], readagain: false }) + pure nonCanceler + +-- | Read all data until the end of the stream. After completion the stream +-- | will no longer be `readable`. +-- | +-- | Note that __stdin__ will never end. +readAll + :: forall m r + . MonadAff m + => Readable r + -> m (Array Buffer) +readAll r = liftAff <<< makeAff $ \complete -> do + bufs <- liftST $ Array.ST.new + removeReadable <- Ref.new (pure unit :: Effect Unit) + + removeError <- onceError r \err -> do + join $ Ref.read removeReadable + complete (Left err) + + removeClose <- onceClose r do + -- Don't error, instead return whatever we've read. + removeError + join $ Ref.read removeReadable -- can 'close' be raised while waiting for 'readable'? Maybe? + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right ret) + + removeEnd <- onceEnd r do + removeError + removeClose + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right ret) + + let + cleanupRethrow err = do + removeError + removeClose + removeEnd + join $ Ref.read removeReadable + complete (Left err) + pure nonCanceler + + -- try to read right away. + catchException cleanupRethrow do + ifM (readable r) + do + untilE do + Stream.read r Nothing >>= case _ of + Nothing -> pure true + Just chunk -> do + void $ liftST $ Array.ST.push chunk bufs + pure false + + -- then wait for the stream to be readable until the stream has ended. + let + waitToRead = do + removeReadable' <- onceReadable r do + -- “The 'readable' event will also be emitted once the end of the + -- stream data has been reached but before the 'end' event is emitted.” + untilE do + Stream.read r Nothing >>= case _ of + Nothing -> pure true + Just chunk -> do + _ <- liftST $ Array.ST.push chunk bufs + pure false + waitToRead -- this is not recursion + Ref.write removeReadable' removeReadable + + waitToRead + -- canceller might by called while waiting for `onceReadable` + pure $ effectCanceler do + removeError + removeClose + removeEnd + join $ Ref.read removeReadable + + do + removeError + removeClose + removeEnd + complete (Right []) + pure nonCanceler + +-- | Wait for *N* bytes to become available from the stream. +-- | +-- | If more than *N* bytes are available on the stream, then +-- | completes with *N* bytes and leaves the rest in the stream’s internal buffer. +-- | +-- | If the end of the stream is reached before *N* bytes are available, +-- | then completes with less than *N* bytes. +readN + :: forall m r + . MonadAff m + => Readable r + -> Int + -> m { buffers :: Array Buffer, readagain :: Boolean } +readN r n = liftAff <<< makeAff $ \complete -> + if n < 0 then complete (Left $ error "read bytes must be > 0") *> pure nonCanceler + else do + redRef <- Ref.new 0 + bufs <- liftST $ Array.ST.new + removeReadable <- Ref.new (pure unit :: Effect Unit) + + -- On error, we're not calling removeClose or removeEnd... maybe that's fine? + removeError <- onceError r \err -> do + join $ Ref.read removeReadable + complete (Left err) + + removeClose <- onceClose r do + -- Don't error, instead return whatever we've read. + removeError + join $ Ref.read removeReadable + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) + + removeEnd <- onceEnd r do + removeError + removeClose + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) + + let + cleanupRethrow err = do + removeError + removeClose + removeEnd + join $ Ref.read removeReadable + complete (Left err) + pure nonCanceler + + -- try to read N bytes and then either return N bytes or run a continuation + tryToRead continuation = do + untilE do + red <- Ref.read redRef + -- https://nodejs.org/docs/latest-v15.x/api/stream.html#stream_readable_read_size + -- “If size bytes are not available to be read, null will be returned + -- unless the stream has ended, in which case all of the data remaining + -- in the internal buffer will be returned.” + Stream.read r (Just (n - red)) >>= case _ of + Nothing -> pure true + Just chunk -> do + _ <- liftST $ Array.ST.push chunk bufs + s <- Buffer.size chunk + red' <- Ref.modify (_ + s) redRef + if red' >= n then + pure true + else + pure false + red <- Ref.read redRef + if red >= n then do + removeError + removeClose + removeEnd + ret <- liftST $ Array.ST.unsafeFreeze bufs + readagain <- readable r + complete (Right { buffers: ret, readagain }) + else + continuation unit + + -- if there were not enough bytes right away, then wait for bytes to come in. + waitToRead _ = do + removeReadable' <- onceReadable r do + tryToRead waitToRead -- not recursion + Ref.write removeReadable' removeReadable + + catchException cleanupRethrow do + -- try to read right away. + ifM (readable r) + do + tryToRead waitToRead + -- canceller might by called while waiting for `onceReadable` + pure $ effectCanceler do + removeError + removeClose + removeEnd + join $ Ref.read removeReadable + do + removeError + removeClose + removeEnd + -- If the stream is not readable should that be a fail? No. + complete (Right { buffers: [], readagain: false }) + pure nonCanceler + +-- | Write to a stream. +-- | +-- | Will complete after the data is flushed to the stream. +write + :: forall m w + . MonadAff m + => Writable w + -> Array Buffer + -> m Unit +write w bs = liftAff <<< makeAff $ \complete -> do + removeDrain <- Ref.new (pure unit :: Effect Unit) + + let + oneWrite i' = flip tailRecM i' \i -> do + case Array.index bs i of + Nothing -> do + complete (Right unit) + pure (Done unit) + Just b -> do + -- “write … calls the supplied callback once the data has been fully handled. + -- If an error occurs, the callback will be called with the error + -- as its first argument. The callback is called asynchronously and + -- before 'error' is emitted.” + nobackpressure <- Stream.write w b $ case _ of + Nothing -> do + pure unit + Just err -> do + complete (Left err) + + if nobackpressure then do + pure (Loop (i + 1)) + else do + removeDrain' <- onceDrain w (oneWrite (i + 1)) + Ref.write removeDrain' removeDrain + pure (Done unit) + oneWrite 0 + + -- canceller might be called while waiting for `onceDrain` + pure $ effectCanceler do + join $ Ref.read removeDrain + +-- | Signal that no more data will be written to the `Writable`. Will complete +-- | after all data is written and flushed. +-- | +-- | When the `Writable` is an [__fs.WriteStream__](https://nodejs.org/api/fs.html#class-fswritestream) +-- | then this will close the file descriptor because +-- | +-- | > “If `autoClose` is set to true (default behavior) on `'error'` +-- | > or `'finish'` the file descriptor will be closed automatically.” +end + :: forall m w + . MonadAff m + => Writable w + -> m Unit +end w = liftAff <<< makeAff $ \complete -> do + Stream.end w $ case _ of + Nothing -> complete (Right unit) + Just err -> complete (Left err) + pure nonCanceler + +-- | Concatenate an `Array` of UTF-8 encoded `Buffer`s into a `String`. +-- | +-- | Example: +-- | +-- | ``` +-- | inputstring <- toStringUTF8 =<< readAll stream +-- | ``` +toStringUTF8 :: forall m. MonadEffect m => Array Buffer -> m String +toStringUTF8 bs = liftEffect $ Buffer.toString Encoding.UTF8 =<< Buffer.concat bs + +-- | Encode a `String` as an `Array` containing one UTF-8 encoded `Buffer`. +-- | +-- | Example: +-- | +-- | ``` +-- | write stream =<< fromStringUTF8 "outputstring" +-- | ``` +fromStringUTF8 :: forall m. MonadEffect m => String -> m (Array Buffer) +fromStringUTF8 s = liftEffect $ map pure $ Buffer.fromString s Encoding.UTF8 From 10c7e19471025d5cb83404b84f1e3457977b9b3d Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Fri, 7 Jul 2023 11:04:53 -0500 Subject: [PATCH 02/10] Update code for breaking changes --- src/Node/Stream/Aff.purs | 44 ++++++++++++++++++++-------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/Node/Stream/Aff.purs b/src/Node/Stream/Aff.purs index fc6d25d..fb45352 100644 --- a/src/Node/Stream/Aff.purs +++ b/src/Node/Stream/Aff.purs @@ -106,9 +106,9 @@ import Effect.Ref as Ref import Node.Buffer (Buffer) import Node.Buffer as Buffer import Node.Encoding as Encoding -import Node.Stream (Readable, Writable) +import Node.EventEmitter (once) +import Node.Stream (Readable, Writable, closeH, drainH, endH, errorH, readable, readableH) import Node.Stream as Stream -import Node.Stream.Aff.Internal (onceClose, onceDrain, onceEnd, onceError, onceReadable, readable) -- | Wait until there is some data available from the stream, then read it. -- | @@ -122,15 +122,15 @@ readSome readSome r = liftAff <<< makeAff $ \complete -> do bufs <- liftST $ Array.ST.new - removeError <- onceError r \err -> complete (Left err) + removeError <- r # once errorH \err -> complete (Left err) - removeClose <- onceClose r do + removeClose <- r # once closeH do -- Don't error, instead return whatever we've read. removeError ret <- liftST $ Array.ST.unsafeFreeze bufs complete (Right { buffers: ret, readagain: false }) - removeEnd <- onceEnd r do + removeEnd <- r # once endH do removeError removeClose ret <- liftST $ Array.ST.unsafeFreeze bufs @@ -149,7 +149,7 @@ readSome r = liftAff <<< makeAff $ \complete -> do do -- try to read right away. untilE do - Stream.read r Nothing >>= case _ of + Stream.read r >>= case _ of Nothing -> pure true Just chunk -> do void $ liftST $ Array.ST.push chunk bufs @@ -164,9 +164,9 @@ readSome r = liftAff <<< makeAff $ \complete -> do -- stream data has been reached but before the 'end' event is emitted.” -- if not readable then this was a zero-length Readable stream. -- https://nodejs.org/api/stream.html#event-readable - removeReadable <- onceReadable r do + removeReadable <- r # once readableH do untilE do - Stream.read r Nothing >>= case _ of + Stream.read r >>= case _ of Nothing -> pure true Just chunk -> do void $ liftST $ Array.ST.push chunk bufs @@ -210,18 +210,18 @@ readAll r = liftAff <<< makeAff $ \complete -> do bufs <- liftST $ Array.ST.new removeReadable <- Ref.new (pure unit :: Effect Unit) - removeError <- onceError r \err -> do + removeError <- r # once errorH \err -> do join $ Ref.read removeReadable complete (Left err) - removeClose <- onceClose r do + removeClose <- r # once closeH do -- Don't error, instead return whatever we've read. removeError join $ Ref.read removeReadable -- can 'close' be raised while waiting for 'readable'? Maybe? ret <- liftST $ Array.ST.unsafeFreeze bufs complete (Right ret) - removeEnd <- onceEnd r do + removeEnd <- r # once endH do removeError removeClose ret <- liftST $ Array.ST.unsafeFreeze bufs @@ -241,7 +241,7 @@ readAll r = liftAff <<< makeAff $ \complete -> do ifM (readable r) do untilE do - Stream.read r Nothing >>= case _ of + Stream.read r >>= case _ of Nothing -> pure true Just chunk -> do void $ liftST $ Array.ST.push chunk bufs @@ -250,11 +250,11 @@ readAll r = liftAff <<< makeAff $ \complete -> do -- then wait for the stream to be readable until the stream has ended. let waitToRead = do - removeReadable' <- onceReadable r do + removeReadable' <- r # once readableH do -- “The 'readable' event will also be emitted once the end of the -- stream data has been reached but before the 'end' event is emitted.” untilE do - Stream.read r Nothing >>= case _ of + Stream.read r >>= case _ of Nothing -> pure true Just chunk -> do _ <- liftST $ Array.ST.push chunk bufs @@ -298,18 +298,18 @@ readN r n = liftAff <<< makeAff $ \complete -> removeReadable <- Ref.new (pure unit :: Effect Unit) -- On error, we're not calling removeClose or removeEnd... maybe that's fine? - removeError <- onceError r \err -> do + removeError <- r # once errorH \err -> do join $ Ref.read removeReadable complete (Left err) - removeClose <- onceClose r do + removeClose <- r # once closeH do -- Don't error, instead return whatever we've read. removeError join $ Ref.read removeReadable ret <- liftST $ Array.ST.unsafeFreeze bufs complete (Right { buffers: ret, readagain: false }) - removeEnd <- onceEnd r do + removeEnd <- r # once endH do removeError removeClose ret <- liftST $ Array.ST.unsafeFreeze bufs @@ -332,7 +332,7 @@ readN r n = liftAff <<< makeAff $ \complete -> -- “If size bytes are not available to be read, null will be returned -- unless the stream has ended, in which case all of the data remaining -- in the internal buffer will be returned.” - Stream.read r (Just (n - red)) >>= case _ of + Stream.read' r (n - red) >>= case _ of Nothing -> pure true Just chunk -> do _ <- liftST $ Array.ST.push chunk bufs @@ -355,7 +355,7 @@ readN r n = liftAff <<< makeAff $ \complete -> -- if there were not enough bytes right away, then wait for bytes to come in. waitToRead _ = do - removeReadable' <- onceReadable r do + removeReadable' <- r # once readableH do tryToRead waitToRead -- not recursion Ref.write removeReadable' removeReadable @@ -401,7 +401,7 @@ write w bs = liftAff <<< makeAff $ \complete -> do -- If an error occurs, the callback will be called with the error -- as its first argument. The callback is called asynchronously and -- before 'error' is emitted.” - nobackpressure <- Stream.write w b $ case _ of + nobackpressure <- Stream.write' w b $ case _ of Nothing -> do pure unit Just err -> do @@ -410,7 +410,7 @@ write w bs = liftAff <<< makeAff $ \complete -> do if nobackpressure then do pure (Loop (i + 1)) else do - removeDrain' <- onceDrain w (oneWrite (i + 1)) + removeDrain' <- w # once drainH (oneWrite (i + 1)) Ref.write removeDrain' removeDrain pure (Done unit) oneWrite 0 @@ -433,7 +433,7 @@ end => Writable w -> m Unit end w = liftAff <<< makeAff $ \complete -> do - Stream.end w $ case _ of + Stream.end' w $ case _ of Nothing -> complete (Right unit) Just err -> complete (Left err) pure nonCanceler From 68960e34889224a44938588626d81e728b43d5b0 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Fri, 7 Jul 2023 11:22:09 -0500 Subject: [PATCH 03/10] Simplify usage of `readable` --- src/Node/Stream/Aff.purs | 374 +++++++++++++++++++-------------------- 1 file changed, 182 insertions(+), 192 deletions(-) diff --git a/src/Node/Stream/Aff.purs b/src/Node/Stream/Aff.purs index fb45352..00a2299 100644 --- a/src/Node/Stream/Aff.purs +++ b/src/Node/Stream/Aff.purs @@ -120,82 +120,79 @@ readSome => Readable r -> m { buffers :: Array Buffer, readagain :: Boolean } readSome r = liftAff <<< makeAff $ \complete -> do - bufs <- liftST $ Array.ST.new - - removeError <- r # once errorH \err -> complete (Left err) + isReadable <- readable r + if isReadable then do + bufs <- liftST $ Array.ST.new - removeClose <- r # once closeH do - -- Don't error, instead return whatever we've read. - removeError - ret <- liftST $ Array.ST.unsafeFreeze bufs - complete (Right { buffers: ret, readagain: false }) + removeError <- r # once errorH \err -> complete (Left err) - removeEnd <- r # once endH do - removeError - removeClose - ret <- liftST $ Array.ST.unsafeFreeze bufs - complete (Right { buffers: ret, readagain: false }) + removeClose <- r # once closeH do + -- Don't error, instead return whatever we've read. + removeError + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) - let - cleanupRethrow err = do + removeEnd <- r # once endH do removeError removeClose - removeEnd - complete (Left err) - pure nonCanceler + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) - catchException cleanupRethrow do - ifM (readable r) - do - -- try to read right away. - untilE do - Stream.read r >>= case _ of - Nothing -> pure true - Just chunk -> do - void $ liftST $ Array.ST.push chunk bufs - pure false - - ret1 <- liftST $ Array.ST.unsafeFreeze bufs - readagain <- readable r - if readagain && Array.length ret1 == 0 then do - -- if still readable and we couldn't read anything right away, - -- then wait for the readable event. - -- “The 'readable' event will also be emitted once the end of the - -- stream data has been reached but before the 'end' event is emitted.” - -- if not readable then this was a zero-length Readable stream. - -- https://nodejs.org/api/stream.html#event-readable - removeReadable <- r # once readableH do - untilE do - Stream.read r >>= case _ of - Nothing -> pure true - Just chunk -> do - void $ liftST $ Array.ST.push chunk bufs - pure false - ret2 <- liftST $ Array.ST.unsafeFreeze bufs - removeError - removeClose - removeEnd - readagain2 <- readable r - complete (Right { buffers: ret2, readagain: readagain2 }) - -- canceller might by called while waiting for `onceReadable` - pure $ effectCanceler do - removeError - removeClose - removeEnd - removeReadable - -- else return what we read right away - else do + let + cleanupRethrow err = do + removeError + removeClose + removeEnd + complete (Left err) + pure nonCanceler + + catchException cleanupRethrow do + -- try to read right away. + untilE do + Stream.read r >>= case _ of + Nothing -> pure true + Just chunk -> do + void $ liftST $ Array.ST.push chunk bufs + pure false + + ret1 <- liftST $ Array.ST.unsafeFreeze bufs + readagain <- readable r + if readagain && Array.length ret1 == 0 then do + -- if still readable and we couldn't read anything right away, + -- then wait for the readable event. + -- “The 'readable' event will also be emitted once the end of the + -- stream data has been reached but before the 'end' event is emitted.” + -- if not readable then this was a zero-length Readable stream. + -- https://nodejs.org/api/stream.html#event-readable + removeReadable <- r # once readableH do + untilE do + Stream.read r >>= case _ of + Nothing -> pure true + Just chunk -> do + void $ liftST $ Array.ST.push chunk bufs + pure false + ret2 <- liftST $ Array.ST.unsafeFreeze bufs removeError removeClose removeEnd - complete (Right { buffers: ret1, readagain }) - pure nonCanceler - do + readagain2 <- readable r + complete (Right { buffers: ret2, readagain: readagain2 }) + -- canceller might by called while waiting for `onceReadable` + pure $ effectCanceler do + removeError + removeClose + removeEnd + removeReadable + -- else return what we read right away + else do removeError removeClose removeEnd - complete (Right { buffers: [], readagain: false }) + complete (Right { buffers: ret1, readagain }) pure nonCanceler + else do + complete (Right { buffers: [], readagain: false }) + pure nonCanceler -- | Read all data until the end of the stream. After completion the stream -- | will no longer be `readable`. @@ -207,76 +204,72 @@ readAll => Readable r -> m (Array Buffer) readAll r = liftAff <<< makeAff $ \complete -> do - bufs <- liftST $ Array.ST.new - removeReadable <- Ref.new (pure unit :: Effect Unit) - - removeError <- r # once errorH \err -> do - join $ Ref.read removeReadable - complete (Left err) - - removeClose <- r # once closeH do - -- Don't error, instead return whatever we've read. - removeError - join $ Ref.read removeReadable -- can 'close' be raised while waiting for 'readable'? Maybe? - ret <- liftST $ Array.ST.unsafeFreeze bufs - complete (Right ret) - - removeEnd <- r # once endH do - removeError - removeClose - ret <- liftST $ Array.ST.unsafeFreeze bufs - complete (Right ret) + isReadable <- readable r + if isReadable then do + bufs <- liftST $ Array.ST.new + removeReadable <- Ref.new (pure unit :: Effect Unit) - let - cleanupRethrow err = do - removeError - removeClose - removeEnd + removeError <- r # once errorH \err -> do join $ Ref.read removeReadable complete (Left err) - pure nonCanceler - -- try to read right away. - catchException cleanupRethrow do - ifM (readable r) - do - untilE do - Stream.read r >>= case _ of - Nothing -> pure true - Just chunk -> do - void $ liftST $ Array.ST.push chunk bufs - pure false - - -- then wait for the stream to be readable until the stream has ended. - let - waitToRead = do - removeReadable' <- r # once readableH do - -- “The 'readable' event will also be emitted once the end of the - -- stream data has been reached but before the 'end' event is emitted.” - untilE do - Stream.read r >>= case _ of - Nothing -> pure true - Just chunk -> do - _ <- liftST $ Array.ST.push chunk bufs - pure false - waitToRead -- this is not recursion - Ref.write removeReadable' removeReadable - - waitToRead - -- canceller might by called while waiting for `onceReadable` - pure $ effectCanceler do - removeError - removeClose - removeEnd - join $ Ref.read removeReadable + removeClose <- r # once closeH do + -- Don't error, instead return whatever we've read. + removeError + join $ Ref.read removeReadable -- can 'close' be raised while waiting for 'readable'? Maybe? + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right ret) + + removeEnd <- r # once endH do + removeError + removeClose + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right ret) - do + let + cleanupRethrow err = do removeError removeClose removeEnd - complete (Right []) + join $ Ref.read removeReadable + complete (Left err) pure nonCanceler + -- try to read right away. + catchException cleanupRethrow do + untilE do + Stream.read r >>= case _ of + Nothing -> pure true + Just chunk -> do + void $ liftST $ Array.ST.push chunk bufs + pure false + + -- then wait for the stream to be readable until the stream has ended. + let + waitToRead = do + removeReadable' <- r # once readableH do + -- “The 'readable' event will also be emitted once the end of the + -- stream data has been reached but before the 'end' event is emitted.” + untilE do + Stream.read r >>= case _ of + Nothing -> pure true + Just chunk -> do + _ <- liftST $ Array.ST.push chunk bufs + pure false + waitToRead -- this is not recursion + Ref.write removeReadable' removeReadable + + waitToRead + -- canceller might by called while waiting for `onceReadable` + pure $ effectCanceler do + removeError + removeClose + removeEnd + join $ Ref.read removeReadable + else do + complete (Right []) + pure nonCanceler + -- | Wait for *N* bytes to become available from the stream. -- | -- | If more than *N* bytes are available on the stream, then @@ -293,90 +286,87 @@ readN readN r n = liftAff <<< makeAff $ \complete -> if n < 0 then complete (Left $ error "read bytes must be > 0") *> pure nonCanceler else do - redRef <- Ref.new 0 - bufs <- liftST $ Array.ST.new - removeReadable <- Ref.new (pure unit :: Effect Unit) - - -- On error, we're not calling removeClose or removeEnd... maybe that's fine? - removeError <- r # once errorH \err -> do - join $ Ref.read removeReadable - complete (Left err) - - removeClose <- r # once closeH do - -- Don't error, instead return whatever we've read. - removeError - join $ Ref.read removeReadable - ret <- liftST $ Array.ST.unsafeFreeze bufs - complete (Right { buffers: ret, readagain: false }) + isReadable <- readable r + if isReadable then do + redRef <- Ref.new 0 + bufs <- liftST $ Array.ST.new + removeReadable <- Ref.new (pure unit :: Effect Unit) + + -- On error, we're not calling removeClose or removeEnd... maybe that's fine? + removeError <- r # once errorH \err -> do + join $ Ref.read removeReadable + complete (Left err) - removeEnd <- r # once endH do - removeError - removeClose - ret <- liftST $ Array.ST.unsafeFreeze bufs - complete (Right { buffers: ret, readagain: false }) + removeClose <- r # once closeH do + -- Don't error, instead return whatever we've read. + removeError + join $ Ref.read removeReadable + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) - let - cleanupRethrow err = do + removeEnd <- r # once endH do removeError removeClose - removeEnd - join $ Ref.read removeReadable - complete (Left err) - pure nonCanceler + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) - -- try to read N bytes and then either return N bytes or run a continuation - tryToRead continuation = do - untilE do - red <- Ref.read redRef - -- https://nodejs.org/docs/latest-v15.x/api/stream.html#stream_readable_read_size - -- “If size bytes are not available to be read, null will be returned - -- unless the stream has ended, in which case all of the data remaining - -- in the internal buffer will be returned.” - Stream.read' r (n - red) >>= case _ of - Nothing -> pure true - Just chunk -> do - _ <- liftST $ Array.ST.push chunk bufs - s <- Buffer.size chunk - red' <- Ref.modify (_ + s) redRef - if red' >= n then - pure true - else - pure false - red <- Ref.read redRef - if red >= n then do + let + cleanupRethrow err = do removeError removeClose removeEnd - ret <- liftST $ Array.ST.unsafeFreeze bufs - readagain <- readable r - complete (Right { buffers: ret, readagain }) - else - continuation unit - - -- if there were not enough bytes right away, then wait for bytes to come in. - waitToRead _ = do - removeReadable' <- r # once readableH do - tryToRead waitToRead -- not recursion - Ref.write removeReadable' removeReadable + join $ Ref.read removeReadable + complete (Left err) + pure nonCanceler - catchException cleanupRethrow do - -- try to read right away. - ifM (readable r) - do - tryToRead waitToRead - -- canceller might by called while waiting for `onceReadable` - pure $ effectCanceler do + -- try to read N bytes and then either return N bytes or run a continuation + tryToRead continuation = do + untilE do + red <- Ref.read redRef + -- https://nodejs.org/docs/latest-v15.x/api/stream.html#stream_readable_read_size + -- “If size bytes are not available to be read, null will be returned + -- unless the stream has ended, in which case all of the data remaining + -- in the internal buffer will be returned.” + Stream.read' r (n - red) >>= case _ of + Nothing -> pure true + Just chunk -> do + _ <- liftST $ Array.ST.push chunk bufs + s <- Buffer.size chunk + red' <- Ref.modify (_ + s) redRef + if red' >= n then + pure true + else + pure false + red <- Ref.read redRef + if red >= n then do removeError removeClose removeEnd - join $ Ref.read removeReadable - do + ret <- liftST $ Array.ST.unsafeFreeze bufs + readagain <- readable r + complete (Right { buffers: ret, readagain }) + else + continuation unit + + -- if there were not enough bytes right away, then wait for bytes to come in. + waitToRead _ = do + removeReadable' <- r # once readableH do + tryToRead waitToRead -- not recursion + Ref.write removeReadable' removeReadable + + catchException cleanupRethrow do + -- try to read right away. + tryToRead waitToRead + -- canceller might by called while waiting for `onceReadable` + pure $ effectCanceler do removeError removeClose removeEnd - -- If the stream is not readable should that be a fail? No. - complete (Right { buffers: [], readagain: false }) - pure nonCanceler + join $ Ref.read removeReadable + else do + -- If the stream is not readable should that be a fail? No. + complete (Right { buffers: [], readagain: false }) + pure nonCanceler -- | Write to a stream. -- | From 438dd99b87551cf79694095abb1de0cf9fdc8dc7 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 8 Jul 2023 08:36:56 -0500 Subject: [PATCH 04/10] Copy over tests as-is, renaming Main.purs --- test/Main1.purs | 161 ++++++++++++++++++++++++++++++++++++++++++++++++ test/Main2.purs | 34 ++++++++++ test/Main3.purs | 56 +++++++++++++++++ test/Main4.purs | 42 +++++++++++++ 4 files changed, 293 insertions(+) create mode 100644 test/Main1.purs create mode 100644 test/Main2.purs create mode 100644 test/Main3.purs create mode 100644 test/Main4.purs diff --git a/test/Main1.purs b/test/Main1.purs new file mode 100644 index 0000000..b8d460a --- /dev/null +++ b/test/Main1.purs @@ -0,0 +1,161 @@ +-- | How to test: +-- | +-- | ``` +-- | spago -x spago-dev.dhall test +-- | ``` +-- | +-- | We want to read from a file, not stdin, because stdin has no EOF. +module Test.Main1 where + +import Prelude + +import Control.Parallel (parSequence_) +import Data.Array ((..)) +import Data.Array as Array +import Data.Foldable (for_) +import Data.Maybe (Maybe(..)) +import Effect (Effect) +import Effect.Aff (Aff, Milliseconds(..), launchAff_) +import Effect.Class (liftEffect) +import Node.Buffer (Buffer, concat) +import Node.Buffer as Buffer +import Node.FS.Stream (createReadStream, createWriteStream) +import Node.Stream (destroy) +import Node.Stream.Aff (end, fromStringUTF8, readAll, readN, readSome, toStringUTF8, write) +import Node.Stream.Aff.Internal (newReadableStringUTF8, newStreamPassThrough) +import Partial.Unsafe (unsafePartial) +import Test.Spec (describe, it) +import Test.Spec.Assertions (expectError, shouldEqual) +import Test.Spec.Reporter (consoleReporter) +import Test.Spec.Runner (defaultConfig, runSpec') + +main :: Effect Unit +main = unsafePartial $ do + launchAff_ do + runSpec' (defaultConfig { timeout = Just (Milliseconds 40000.0) }) [ consoleReporter ] do + describe "Node.Stream.Aff" do + it "PassThrough" do + s <- newStreamPassThrough + _ <- write s =<< fromStringUTF8 "test" + end s + b1 <- toStringUTF8 =<< readAll s + shouldEqual b1 "test" + it "overflow PassThrough" do + s <- newStreamPassThrough + let magnitude = 10000 + [ outstring ] <- fromStringUTF8 "aaaaaaaaaa" + parSequence_ + [ write s $ Array.replicate magnitude outstring + , void $ readSome s + ] + it "reads from a zero-length Readable" do + r <- newReadableStringUTF8 "" + -- readSome should return readagain false + shouldEqual { buffers: "", readagain: true } =<< toStringBuffers =<< readSome r + shouldEqual "" =<< toStringUTF8 =<< readAll r + shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readN r 0 + it "readN cleans up event handlers" do + s <- newReadableStringUTF8 "" + for_ (0 .. 100) \_ -> void $ readN s 0 + it "readSome cleans up event handlers" do + s <- newReadableStringUTF8 "" + for_ (0 .. 100) \_ -> void $ readSome s + it "readAll cleans up event handlers" do + s <- newReadableStringUTF8 "" + for_ (0 .. 100) \_ -> void $ readAll s + it "write cleans up event handlers" do + s <- newStreamPassThrough + [ b ] <- fromStringUTF8 "x" + for_ (0 .. 100) \_ -> void $ write s [ b ] + it "readSome from PassThrough" do + s <- newStreamPassThrough + write s =<< fromStringUTF8 "test" + end s + -- The first readSome readagain will be true, that's not good + shouldEqual { buffers: "test", readagain: true } =<< toStringBuffers =<< readSome s + shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s + it "readSome from PassThrough concurrent" do + s <- newStreamPassThrough + parSequence_ + [ do + shouldEqual { buffers: "test", readagain: true } =<< toStringBuffers =<< readSome s + -- This is rediculous behavior + shouldEqual { buffers: "", readagain: true } =<< toStringBuffers =<< readSome s + shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s + , do + write s =<< fromStringUTF8 "test" + end s + ] + it "readAll from PassThrough concurrent" do + s <- newStreamPassThrough + parSequence_ + [ do + shouldEqual "test" =<< toStringUTF8 =<< readAll s + , do + write s =<< fromStringUTF8 "test" + end s + ] + it "readAll from empty PassThrough concurrent" do + s <- newStreamPassThrough + parSequence_ + [ shouldEqual "" =<< toStringUTF8 =<< readAll s + , end s + ] + it "readSome from destroyed PassThrough" do + s <- newStreamPassThrough + liftEffect $ destroy s + shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s + it "readSome from destroyed PassThrough concurrent" do + s <- newStreamPassThrough + parSequence_ + [ shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s + , liftEffect $ destroy s + ] + it "readAll from destroyed PassThrough concurrent " do + s <- newStreamPassThrough + parSequence_ + [ shouldEqual "" =<< toStringUTF8 =<< readAll s + , liftEffect $ destroy s + ] + it "readN from destroyed PassThrough concurrent " do + s <- newStreamPassThrough + parSequence_ + [ shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readN s 1 + , liftEffect $ destroy s + ] + it "write to destroyed PassThrough" do + s <- newStreamPassThrough + liftEffect $ destroy s + expectError $ write s =<< fromStringUTF8 "test" + it "writes and reads to file" do + let outfilename = "/tmp/test1.txt" + let magnitude = 100000 + outfile <- liftEffect $ createWriteStream outfilename + [ outstring ] <- fromStringUTF8 "aaaaaaaaaa" + write outfile $ Array.replicate magnitude outstring + infile <- liftEffect $ createReadStream outfilename + { buffers: input1 } <- readSome infile + { buffers: input2 } <- readN infile (5 * magnitude) + input3 <- readAll infile + _ :: Buffer <- liftEffect <<< concat <<< _.buffers =<< readSome infile + void $ readN infile 1 + void $ readAll infile + let inputs = input1 <> input2 <> input3 + input :: Buffer <- liftEffect $ concat inputs + inputSize <- liftEffect $ Buffer.size input + shouldEqual inputSize (10 * magnitude) + it "writes and closes file" do + let outfilename = "/tmp/test2.txt" + outfile <- liftEffect $ createWriteStream outfilename + write outfile =<< fromStringUTF8 "test" + end outfile + expectError $ write outfile =<< fromStringUTF8 "test2" + + pure unit + +toStringBuffers + :: { buffers :: Array Buffer, readagain :: Boolean } + -> Aff { buffers :: String, readagain :: Boolean } +toStringBuffers { buffers, readagain } = do + buffers' <- toStringUTF8 buffers + pure { buffers: buffers', readagain } diff --git a/test/Main2.purs b/test/Main2.purs new file mode 100644 index 0000000..0d4cf15 --- /dev/null +++ b/test/Main2.purs @@ -0,0 +1,34 @@ +-- | How to test: +-- | +-- | ``` +-- | spago -x spago-dev.dhall test --main Test2 | wc -c +-- | ``` +module Test2 where + +import Prelude + +import Data.Array as Array +import Data.Either (Either(..)) +import Effect (Effect) +import Effect.Aff (Error, runAff_) +import Effect.Class (liftEffect) +import Effect.Class.Console as Console +import Node.Buffer as Buffer +import Node.Encoding (Encoding(..)) +import Node.Process (stdout) +import Node.Stream.Aff (write) +import Partial.Unsafe (unsafePartial) +import Unsafe.Coerce (unsafeCoerce) + +completion :: Either Error (Effect Unit) -> Effect Unit +completion = case _ of + Left e -> Console.error (unsafeCoerce e) + Right f -> f + +main :: Effect Unit +main = unsafePartial $ do + runAff_ completion do + do + b <- liftEffect $ Buffer.fromString "aaaaaaaaaa" UTF8 + write stdout $ Array.replicate 100000 b + pure (pure unit) diff --git a/test/Main3.purs b/test/Main3.purs new file mode 100644 index 0000000..b095eb0 --- /dev/null +++ b/test/Main3.purs @@ -0,0 +1,56 @@ +-- | How to test: +-- | +-- | ``` +-- | spago -x spago-dev.dhall test --main Test3 --exec-args <(head --bytes 1000000 /dev/zero) +-- | ``` +module Test3 where + +import Prelude + +import Data.Array as Array +import Data.Either (Either(..)) +import Effect (Effect) +import Effect.Aff (Error, runAff_) +import Effect.Class (liftEffect) +import Effect.Class.Console as Console +import Node.Buffer (Buffer, concat) +import Node.Buffer as Buffer +import Node.FS.Stream (createReadStream) +import Node.Process (argv) +import Node.Stream.Aff (readAll, readN, readSome) +import Partial.Unsafe (unsafePartial) +import Test.Spec (describe, it) +import Test.Spec.Assertions (shouldEqual) +import Test.Spec.Reporter (consoleReporter) +import Test.Spec.Runner (runSpec) +import Unsafe.Coerce (unsafeCoerce) + +completion :: Either Error (Effect Unit) -> Effect Unit +completion = case _ of + Left e -> Console.error (unsafeCoerce e) + Right f -> f + +main :: Effect Unit +main = unsafePartial $ do + runAff_ completion do + runSpec [ consoleReporter ] do + describe "Node.Stream.Aff" do + it "reads 1" do + infile <- liftEffect $ createReadStream =<< pure <<< flip Array.unsafeIndex 2 =<< argv + { buffers: inputs1 } <- readN infile 500000 + bytesRead1 :: Int <- liftEffect $ Array.foldM (\a b -> (a + _) <$> Buffer.size b) 0 inputs1 + shouldEqual 500000 bytesRead1 + { buffers: inputs2 } <- readSome infile + inputs3 <- readAll infile + let inputs = inputs1 <> inputs2 <> inputs3 + -- TODO read after EOF will hang + -- inputs4 <- readAll infile + -- inputs4 <- readSome infile + -- inputs4 <- readN infile 10 + -- let inputs = inputs1 <> inputs2 <> inputs3 <> inputs4 + bytesRead :: Int <- liftEffect $ Array.foldM (\a b -> (a + _) <$> Buffer.size b) 0 inputs + shouldEqual 1000000 bytesRead + input :: Buffer <- liftEffect $ concat inputs + inputSize <- liftEffect $ Buffer.size input + shouldEqual 1000000 inputSize + pure (pure unit) diff --git a/test/Main4.purs b/test/Main4.purs new file mode 100644 index 0000000..9f4b13d --- /dev/null +++ b/test/Main4.purs @@ -0,0 +1,42 @@ +-- | How to test: +-- | +-- | ``` +-- | spago -x spago-dev.dhall test --main Test4 +-- | ``` +-- | +-- | This is a test that `readSome` will prevent the Node.js event +-- | loop from exiting. +module Test4 where + +import Prelude + +import Control.Alt (alt) +import Data.Either (Either(..)) +import Effect (Effect) +import Effect.Aff (Error, Milliseconds(..), delay, parallel, runAff_, sequential) +import Effect.Class.Console as Console +import Node.Process (stdin) +import Node.Stream.Aff (readSome) +import Partial.Unsafe (unsafePartial) +import Test.Spec (describe, it) +import Test.Spec.Reporter (consoleReporter) +import Test.Spec.Runner (runSpec) +import Unsafe.Coerce (unsafeCoerce) + +completion :: Either Error (Effect Unit) -> Effect Unit +completion = case _ of + Left e -> Console.error (unsafeCoerce e) + Right f -> f + +main :: Effect Unit +main = unsafePartial $ do + runAff_ completion do + runSpec [ consoleReporter ] do + describe "Node.Stream.Aff" do + it "reads 1" do + sequential $ alt + do + parallel $ void $ readSome stdin + do + parallel $ delay (Milliseconds 500.0) + pure (pure unit) From 8401b986570ee09d33081f2caa11884dea31b15b Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 8 Jul 2023 08:43:56 -0500 Subject: [PATCH 05/10] Get tests to compile and pass --- bower.json | 3 ++- test/Main1.js | 4 ++++ test/Main1.purs | 44 +++++++++++++++++++++++--------------------- test/Main2.js | 3 +++ test/Main2.purs | 8 +++++--- test/Main3.js | 5 +++++ test/Main3.purs | 10 ++++++---- test/Main4.js | 3 +++ test/Main4.purs | 8 +++++--- 9 files changed, 56 insertions(+), 32 deletions(-) create mode 100644 test/Main1.js create mode 100644 test/Main2.js create mode 100644 test/Main3.js create mode 100644 test/Main4.js diff --git a/bower.json b/bower.json index c33fcac..643dd13 100644 --- a/bower.json +++ b/bower.json @@ -14,7 +14,8 @@ "devDependencies": { "purescript-assert": "^6.0.0", "purescript-console": "^6.0.0", - "purescript-partial": "^4.0.0" + "purescript-partial": "^4.0.0", + "purescript-spec": "^7.3.0" }, "dependencies": { "purescript-effect": "^4.0.0", diff --git a/test/Main1.js b/test/Main1.js new file mode 100644 index 0000000..4895ad6 --- /dev/null +++ b/test/Main1.js @@ -0,0 +1,4 @@ +import fs from "node:fs"; + +export const createReadStream = (filePath) => () => fs.createReadStream(filePath); +export const createWriteStream = (filePath) => () => fs.createWriteStream(filePath); diff --git a/test/Main1.purs b/test/Main1.purs index b8d460a..f0d38da 100644 --- a/test/Main1.purs +++ b/test/Main1.purs @@ -1,7 +1,7 @@ -- | How to test: -- | -- | ``` --- | spago -x spago-dev.dhall test +-- | pulp test --main Test.Main1 -- | ``` -- | -- | We want to read from a file, not stdin, because stdin has no EOF. @@ -19,29 +19,31 @@ import Effect.Aff (Aff, Milliseconds(..), launchAff_) import Effect.Class (liftEffect) import Node.Buffer (Buffer, concat) import Node.Buffer as Buffer -import Node.FS.Stream (createReadStream, createWriteStream) -import Node.Stream (destroy) +import Node.Stream (Readable, Writable, destroy, newPassThrough) +import Node.Stream as Stream import Node.Stream.Aff (end, fromStringUTF8, readAll, readN, readSome, toStringUTF8, write) -import Node.Stream.Aff.Internal (newReadableStringUTF8, newStreamPassThrough) import Partial.Unsafe (unsafePartial) import Test.Spec (describe, it) import Test.Spec.Assertions (expectError, shouldEqual) import Test.Spec.Reporter (consoleReporter) import Test.Spec.Runner (defaultConfig, runSpec') +foreign import createReadStream :: String -> Effect (Readable ()) +foreign import createWriteStream :: String -> Effect (Writable ()) + main :: Effect Unit main = unsafePartial $ do launchAff_ do runSpec' (defaultConfig { timeout = Just (Milliseconds 40000.0) }) [ consoleReporter ] do describe "Node.Stream.Aff" do it "PassThrough" do - s <- newStreamPassThrough + s <- liftEffect $ newPassThrough _ <- write s =<< fromStringUTF8 "test" end s b1 <- toStringUTF8 =<< readAll s shouldEqual b1 "test" it "overflow PassThrough" do - s <- newStreamPassThrough + s <- liftEffect $ newPassThrough let magnitude = 10000 [ outstring ] <- fromStringUTF8 "aaaaaaaaaa" parSequence_ @@ -49,33 +51,33 @@ main = unsafePartial $ do , void $ readSome s ] it "reads from a zero-length Readable" do - r <- newReadableStringUTF8 "" + r <- liftEffect $ Stream.fromString "" -- readSome should return readagain false shouldEqual { buffers: "", readagain: true } =<< toStringBuffers =<< readSome r shouldEqual "" =<< toStringUTF8 =<< readAll r shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readN r 0 it "readN cleans up event handlers" do - s <- newReadableStringUTF8 "" + s <- liftEffect $ Stream.fromString "" for_ (0 .. 100) \_ -> void $ readN s 0 it "readSome cleans up event handlers" do - s <- newReadableStringUTF8 "" + s <- liftEffect $ Stream.fromString "" for_ (0 .. 100) \_ -> void $ readSome s it "readAll cleans up event handlers" do - s <- newReadableStringUTF8 "" + s <- liftEffect $ Stream.fromString "" for_ (0 .. 100) \_ -> void $ readAll s it "write cleans up event handlers" do - s <- newStreamPassThrough - [ b ] <- fromStringUTF8 "x" + s <- liftEffect $ newPassThrough + [ b ] <- liftEffect $ fromStringUTF8 "x" for_ (0 .. 100) \_ -> void $ write s [ b ] it "readSome from PassThrough" do - s <- newStreamPassThrough + s <- liftEffect $ newPassThrough write s =<< fromStringUTF8 "test" end s -- The first readSome readagain will be true, that's not good shouldEqual { buffers: "test", readagain: true } =<< toStringBuffers =<< readSome s shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s it "readSome from PassThrough concurrent" do - s <- newStreamPassThrough + s <- liftEffect $ newPassThrough parSequence_ [ do shouldEqual { buffers: "test", readagain: true } =<< toStringBuffers =<< readSome s @@ -87,7 +89,7 @@ main = unsafePartial $ do end s ] it "readAll from PassThrough concurrent" do - s <- newStreamPassThrough + s <- liftEffect $ newPassThrough parSequence_ [ do shouldEqual "test" =<< toStringUTF8 =<< readAll s @@ -96,35 +98,35 @@ main = unsafePartial $ do end s ] it "readAll from empty PassThrough concurrent" do - s <- newStreamPassThrough + s <- liftEffect $ newPassThrough parSequence_ [ shouldEqual "" =<< toStringUTF8 =<< readAll s , end s ] it "readSome from destroyed PassThrough" do - s <- newStreamPassThrough + s <- liftEffect $ newPassThrough liftEffect $ destroy s shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s it "readSome from destroyed PassThrough concurrent" do - s <- newStreamPassThrough + s <- liftEffect $ newPassThrough parSequence_ [ shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s , liftEffect $ destroy s ] it "readAll from destroyed PassThrough concurrent " do - s <- newStreamPassThrough + s <- liftEffect $ newPassThrough parSequence_ [ shouldEqual "" =<< toStringUTF8 =<< readAll s , liftEffect $ destroy s ] it "readN from destroyed PassThrough concurrent " do - s <- newStreamPassThrough + s <- liftEffect $ newPassThrough parSequence_ [ shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readN s 1 , liftEffect $ destroy s ] it "write to destroyed PassThrough" do - s <- newStreamPassThrough + s <- liftEffect $ newPassThrough liftEffect $ destroy s expectError $ write s =<< fromStringUTF8 "test" it "writes and reads to file" do diff --git a/test/Main2.js b/test/Main2.js new file mode 100644 index 0000000..20bf893 --- /dev/null +++ b/test/Main2.js @@ -0,0 +1,3 @@ +import process from "node:process"; + +export const stdout = process.stdout; diff --git a/test/Main2.purs b/test/Main2.purs index 0d4cf15..4505aed 100644 --- a/test/Main2.purs +++ b/test/Main2.purs @@ -1,9 +1,9 @@ -- | How to test: -- | -- | ``` --- | spago -x spago-dev.dhall test --main Test2 | wc -c +-- | pulp test --main Test.Main2 | wc -c -- | ``` -module Test2 where +module Test.Main2 where import Prelude @@ -15,11 +15,13 @@ import Effect.Class (liftEffect) import Effect.Class.Console as Console import Node.Buffer as Buffer import Node.Encoding (Encoding(..)) -import Node.Process (stdout) +import Node.Stream (Writable) import Node.Stream.Aff (write) import Partial.Unsafe (unsafePartial) import Unsafe.Coerce (unsafeCoerce) +foreign import stdout :: Writable () + completion :: Either Error (Effect Unit) -> Effect Unit completion = case _ of Left e -> Console.error (unsafeCoerce e) diff --git a/test/Main3.js b/test/Main3.js new file mode 100644 index 0000000..b9a8b12 --- /dev/null +++ b/test/Main3.js @@ -0,0 +1,5 @@ +import fs from "node:fs"; +import process from "node:process"; + +export const createReadStream = (filePath) => () => fs.createReadStream(filePath); +export const argv = () => process.argv; diff --git a/test/Main3.purs b/test/Main3.purs index b095eb0..5aa8d6b 100644 --- a/test/Main3.purs +++ b/test/Main3.purs @@ -1,9 +1,9 @@ -- | How to test: -- | -- | ``` --- | spago -x spago-dev.dhall test --main Test3 --exec-args <(head --bytes 1000000 /dev/zero) +-- | pulp test --main Test.Main3 -- <(head --bytes 1000000 /dev/zero) -- | ``` -module Test3 where +module Test.Main3 where import Prelude @@ -15,8 +15,7 @@ import Effect.Class (liftEffect) import Effect.Class.Console as Console import Node.Buffer (Buffer, concat) import Node.Buffer as Buffer -import Node.FS.Stream (createReadStream) -import Node.Process (argv) +import Node.Stream (Readable) import Node.Stream.Aff (readAll, readN, readSome) import Partial.Unsafe (unsafePartial) import Test.Spec (describe, it) @@ -25,6 +24,9 @@ import Test.Spec.Reporter (consoleReporter) import Test.Spec.Runner (runSpec) import Unsafe.Coerce (unsafeCoerce) +foreign import createReadStream :: String -> Effect (Readable ()) +foreign import argv :: Effect (Array String) + completion :: Either Error (Effect Unit) -> Effect Unit completion = case _ of Left e -> Console.error (unsafeCoerce e) diff --git a/test/Main4.js b/test/Main4.js new file mode 100644 index 0000000..415152e --- /dev/null +++ b/test/Main4.js @@ -0,0 +1,3 @@ +import process from "node:process"; + +export const stdin = process.stdin; diff --git a/test/Main4.purs b/test/Main4.purs index 9f4b13d..2c702a1 100644 --- a/test/Main4.purs +++ b/test/Main4.purs @@ -1,12 +1,12 @@ -- | How to test: -- | -- | ``` --- | spago -x spago-dev.dhall test --main Test4 +-- | pulp test --main Test.Main4 -- | ``` -- | -- | This is a test that `readSome` will prevent the Node.js event -- | loop from exiting. -module Test4 where +module Test.Main4 where import Prelude @@ -15,7 +15,7 @@ import Data.Either (Either(..)) import Effect (Effect) import Effect.Aff (Error, Milliseconds(..), delay, parallel, runAff_, sequential) import Effect.Class.Console as Console -import Node.Process (stdin) +import Node.Stream (Readable) import Node.Stream.Aff (readSome) import Partial.Unsafe (unsafePartial) import Test.Spec (describe, it) @@ -23,6 +23,8 @@ import Test.Spec.Reporter (consoleReporter) import Test.Spec.Runner (runSpec) import Unsafe.Coerce (unsafeCoerce) +foreign import stdin :: Readable () + completion :: Either Error (Effect Unit) -> Effect Unit completion = case _ of Left e -> Console.error (unsafeCoerce e) From 94ba77f84d053d72aab2b0bd0df97de1c74242a5 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 8 Jul 2023 08:45:33 -0500 Subject: [PATCH 06/10] Update CI --- .github/workflows/ci.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fc3392b..439d0b2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,8 +33,12 @@ jobs: - name: Run tests run: | bower install - npm run-script test --if-present + pulp test + pulp test --main Test.Main1 + pulp test --main Test.Main2 | wc -c + pulp test --main Test.Main3 -- <(head --bytes 1000000 /dev/zero) + pulp test --main Test.Main4 - name: Check formatting run: | - purs-tidy check src test \ No newline at end of file + purs-tidy check src test From 410ed1aa675d272d3f1a1e5346a03b1b690aa870 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 8 Jul 2023 09:12:49 -0500 Subject: [PATCH 07/10] Format docs --- src/Node/Stream/Aff.purs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/Node/Stream/Aff.purs b/src/Node/Stream/Aff.purs index 00a2299..2902031 100644 --- a/src/Node/Stream/Aff.purs +++ b/src/Node/Stream/Aff.purs @@ -43,7 +43,7 @@ -- | `Node.Buffer.size :: Buffer -> m Int`. -- | -- | ``` --- | {buffers} :: Array Buffer <- readSome stdin +-- | { buffers } :: Array Buffer <- readSome stdin -- | bytesRead :: Int -- | <- liftEffect $ Array.foldM (\a b -> (a+_) <$> size b) 0 buffers -- | ``` @@ -53,14 +53,14 @@ -- | The `readagain` field of the result is a `Boolean` flag which -- | is `true` if the stream has not reached End-Of-File (and also if the stream -- | has not errored or been destroyed), so we know we can read again. --- | If the flag is `false` then the stream is not `readable` +-- | If the flag is `false` then the stream is not `readable`; -- | no more bytes will ever be produced by the stream. -- | -- | Reading from an ended, closed, errored, or destroyed stream --- | will complete immediately with `{buffers:[], readagain:false}`. +-- | will complete immediately with `{ buffers: [], readagain: false }`. -- | -- | The `readagain` flag will give the same answer as a --- | subsequent call to `Internal.readable`. +-- | subsequent call to `readable`. -- | -- | ## Writing -- | @@ -110,6 +110,7 @@ import Node.EventEmitter (once) import Node.Stream (Readable, Writable, closeH, drainH, endH, errorH, readable, readableH) import Node.Stream as Stream +-- | Works on streams in "paused" mode. -- | Wait until there is some data available from the stream, then read it. -- | -- | This function is useful for streams like __stdin__ which never @@ -194,6 +195,7 @@ readSome r = liftAff <<< makeAff $ \complete -> do complete (Right { buffers: [], readagain: false }) pure nonCanceler +-- | Works on streams in "paused" mode. -- | Read all data until the end of the stream. After completion the stream -- | will no longer be `readable`. -- | @@ -270,6 +272,7 @@ readAll r = liftAff <<< makeAff $ \complete -> do complete (Right []) pure nonCanceler +-- | Works on streams in "paused" mode. -- | Wait for *N* bytes to become available from the stream. -- | -- | If more than *N* bytes are available on the stream, then From ff6411c22371070508fde5d67c04a4c69425efe3 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 8 Jul 2023 09:12:55 -0500 Subject: [PATCH 08/10] Add changelog entry --- CHANGELOG.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ab09a0..0c55fd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,22 @@ New features: - pipeline - fromString, fromBuffer - newPassThrough +- Integrated `node-streams-aff` into library (#52 by @JordanMartinez) + + Convenience APIs added for readable streams in "paused" mode: + - readSome + - readAll + - readN + + Convenience APIs for writeable streams: + - write + - end + + Convenience APIs for converting `String`s from/to `Array Buffer` + - toStringUTF8 + - fromStringUTF8 + + The only APIs from the library not added were `newReadable` and `push`. Bugfixes: - Drop misleading comment for `setEncoding` (#51 by @JordanMartinez) @@ -65,6 +81,7 @@ Other improvements: - Refactor tests using `passThrough` streams (#49 by @JordanMartinez) - Updated FFI to use uncurried functions (#50 by @JordanMartinez) - Relocated `setEncoding`, `Read`, and `Write` for better locality in docs (#51 by @JordanMartinez) +- Added `node-streams-aff` tests (#52 by @JordanMartinez) ## [v7.0.0](https://github.com/purescript-node/purescript-node-streams/releases/tag/v7.0.0) - 2022-04-29 From 18245a63be41a78233990c299403806d2b412db2 Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 8 Jul 2023 09:16:36 -0500 Subject: [PATCH 09/10] Add missing dependency on aff --- bower.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bower.json b/bower.json index 643dd13..cd10b3e 100644 --- a/bower.json +++ b/bower.json @@ -24,6 +24,7 @@ "purescript-node-buffer": "^9.0.0", "purescript-nullable": "^6.0.0", "purescript-prelude": "^6.0.0", - "purescript-node-event-emitter": "https://github.com/purescript-node/purescript-node-event-emitter.git#^3.0.0" + "purescript-node-event-emitter": "https://github.com/purescript-node/purescript-node-event-emitter.git#^3.0.0", + "purescript-aff": "^7.1.0" } } From b3a873026e47d603e9ba0dfd84c411b34b2f350b Mon Sep 17 00:00:00 2001 From: Jordan Martinez Date: Sat, 8 Jul 2023 09:20:50 -0500 Subject: [PATCH 10/10] Update pulp to 16.0.2; fix CI usage of pulp --- .github/workflows/ci.yml | 10 +++++----- package.json | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 439d0b2..d9e3ae6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,11 +33,11 @@ jobs: - name: Run tests run: | bower install - pulp test - pulp test --main Test.Main1 - pulp test --main Test.Main2 | wc -c - pulp test --main Test.Main3 -- <(head --bytes 1000000 /dev/zero) - pulp test --main Test.Main4 + npx pulp test + npx pulp test --main Test.Main1 + npx pulp test --main Test.Main2 | wc -c + npx pulp test --main Test.Main3 -- <(head --bytes 1000000 /dev/zero) + npx pulp test --main Test.Main4 - name: Check formatting run: | diff --git a/package.json b/package.json index 9521b8a..14ab499 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ }, "devDependencies": { "eslint": "^7.15.0", - "pulp": "16.0.0-0", + "pulp": "^16.0.2", "purescript-psa": "^0.8.2", "rimraf": "^3.0.2" }