diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fc3392b..d9e3ae6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -33,8 +33,12 @@ jobs: - name: Run tests run: | bower install - npm run-script test --if-present + npx pulp test + npx pulp test --main Test.Main1 + npx pulp test --main Test.Main2 | wc -c + npx pulp test --main Test.Main3 -- <(head --bytes 1000000 /dev/zero) + npx pulp test --main Test.Main4 - name: Check formatting run: | - purs-tidy check src test \ No newline at end of file + purs-tidy check src test diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ab09a0..0c55fd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,22 @@ New features: - pipeline - fromString, fromBuffer - newPassThrough +- Integrated `node-streams-aff` into library (#52 by @JordanMartinez) + + Convenience APIs added for readable streams in "paused" mode: + - readSome + - readAll + - readN + + Convenience APIs for writeable streams: + - write + - end + + Convenience APIs for converting `String`s from/to `Array Buffer` + - toStringUTF8 + - fromStringUTF8 + + The only APIs from the library not added were `newReadable` and `push`. Bugfixes: - Drop misleading comment for `setEncoding` (#51 by @JordanMartinez) @@ -65,6 +81,7 @@ Other improvements: - Refactor tests using `passThrough` streams (#49 by @JordanMartinez) - Updated FFI to use uncurried functions (#50 by @JordanMartinez) - Relocated `setEncoding`, `Read`, and `Write` for better locality in docs (#51 by @JordanMartinez) +- Added `node-streams-aff` tests (#52 by @JordanMartinez) ## [v7.0.0](https://github.com/purescript-node/purescript-node-streams/releases/tag/v7.0.0) - 2022-04-29 diff --git a/bower.json b/bower.json index c33fcac..cd10b3e 100644 --- a/bower.json +++ b/bower.json @@ -14,7 +14,8 @@ "devDependencies": { "purescript-assert": "^6.0.0", "purescript-console": "^6.0.0", - "purescript-partial": "^4.0.0" + "purescript-partial": "^4.0.0", + "purescript-spec": "^7.3.0" }, "dependencies": { "purescript-effect": "^4.0.0", @@ -23,6 +24,7 @@ "purescript-node-buffer": "^9.0.0", "purescript-nullable": "^6.0.0", "purescript-prelude": "^6.0.0", - "purescript-node-event-emitter": "https://github.com/purescript-node/purescript-node-event-emitter.git#^3.0.0" + "purescript-node-event-emitter": "https://github.com/purescript-node/purescript-node-event-emitter.git#^3.0.0", + "purescript-aff": "^7.1.0" } } diff --git a/package.json b/package.json index 9521b8a..14ab499 100644 --- a/package.json +++ b/package.json @@ -9,7 +9,7 @@ }, "devDependencies": { "eslint": "^7.15.0", - "pulp": "16.0.0-0", + "pulp": "^16.0.2", "purescript-psa": "^0.8.2", "rimraf": "^3.0.2" } diff --git a/src/Node/Stream/Aff.purs b/src/Node/Stream/Aff.purs new file mode 100644 index 0000000..2902031 --- /dev/null +++ b/src/Node/Stream/Aff.purs @@ -0,0 +1,452 @@ +-- | Asynchronous I/O with the [*Node.js* Stream API](https://nodejs.org/docs/latest/api/stream.html). +-- | +-- | Open __file streams__ with +-- | [__Node.FS.Stream__](https://pursuit.purescript.org/packages/purescript-node-fs/docs/Node.FS.Stream). +-- | +-- | Open __process streams__ with +-- | [__Node.Process__](https://pursuit.purescript.org/packages/purescript-node-process/docs/Node.Process). +-- | +-- | All __I/O errors__ will be thrown through the `Aff` `MonadError` class +-- | instance. +-- | +-- | `Aff` __cancellation__ will clean up all *Node.js* event listeners. +-- | +-- | All of these `Aff` functions will prevent the *Node.js* __event loop__ from +-- | exiting until the `Aff` function completes. +-- | +-- | ## Reading +-- | +-- | #### Implementation +-- | +-- | The reading functions in this module all operate on a `Readable` stream +-- | in +-- | [“paused mode”](https://nodejs.org/docs/latest/api/stream.html#stream_two_reading_modes). +-- | +-- | Internally the reading functions use the +-- | [`readable.read([size])`](https://nodejs.org/docs/latest/api/stream.html#readablereadsize) +-- | function and are subject to the caveats of that function. +-- | +-- | #### Result Buffers +-- | +-- | The result of a reading function may be chunked into more than one `Buffer`. +-- | The `buffers` element of the result is an `Array Buffer` of what +-- | was read. +-- | To concatenate the result into a single `Buffer`, use +-- | [`Node.Buffer.concat :: Array Buffer -> m Buffer`](https://pursuit.purescript.org/packages/purescript-node-buffer/docs/Node.Buffer#t:MutableBuffer). +-- | +-- | ``` +-- | input :: Buffer +-- | <- liftEffect <<< concat <<< _.buffers =<< readSome stdin +-- | ``` +-- | +-- | To calculate the number of bytes read, use +-- | `Node.Buffer.size :: Buffer -> m Int`. +-- | +-- | ``` +-- | { buffers } :: Array Buffer <- readSome stdin +-- | bytesRead :: Int +-- | <- liftEffect $ Array.foldM (\a b -> (a+_) <$> size b) 0 buffers +-- | ``` +-- | +-- | #### Result `readagain` flag +-- | +-- | The `readagain` field of the result is a `Boolean` flag which +-- | is `true` if the stream has not reached End-Of-File (and also if the stream +-- | has not errored or been destroyed), so we know we can read again. +-- | If the flag is `false` then the stream is not `readable`; +-- | no more bytes will ever be produced by the stream. +-- | +-- | Reading from an ended, closed, errored, or destroyed stream +-- | will complete immediately with `{ buffers: [], readagain: false }`. +-- | +-- | The `readagain` flag will give the same answer as a +-- | subsequent call to `readable`. +-- | +-- | ## Writing +-- | +-- | #### Implementation +-- | +-- | The writing functions in this module all operate on a `Writeable` stream. +-- | +-- | Internally the writing functions will call the +-- | [`writable.write(chunk[, encoding][, callback])`](https://nodejs.org/docs/latest/api/stream.html#writablewritechunk-encoding-callback) +-- | function on each of the `Buffer`s, +-- | and will asychronously wait if there is “backpressure” from the stream. +-- | +-- | #### Result +-- | +-- | The writing functions will complete after all the data is flushed to the +-- | stream. +-- | +-- | If a write fails then it will `throwError` in the `Aff`. +module Node.Stream.Aff + ( readSome + , readAll + , readN + , write + , end + , toStringUTF8 + , fromStringUTF8 + ) where + +import Prelude + +import Control.Monad.Rec.Class (Step(..), tailRecM) +import Control.Monad.ST.Class (liftST) +import Data.Array as Array +import Data.Array.ST as Array.ST +import Data.Either (Either(..)) +import Data.Maybe (Maybe(..)) +import Effect (Effect, untilE) +import Effect.Aff (effectCanceler, error, makeAff, nonCanceler) +import Effect.Aff.Class (class MonadAff, liftAff) +import Effect.Class (class MonadEffect, liftEffect) +import Effect.Exception (catchException) +import Effect.Ref as Ref +import Node.Buffer (Buffer) +import Node.Buffer as Buffer +import Node.Encoding as Encoding +import Node.EventEmitter (once) +import Node.Stream (Readable, Writable, closeH, drainH, endH, errorH, readable, readableH) +import Node.Stream as Stream + +-- | Works on streams in "paused" mode. +-- | Wait until there is some data available from the stream, then read it. +-- | +-- | This function is useful for streams like __stdin__ which never +-- | reach End-Of-File. +readSome + :: forall m r + . MonadAff m + => Readable r + -> m { buffers :: Array Buffer, readagain :: Boolean } +readSome r = liftAff <<< makeAff $ \complete -> do + isReadable <- readable r + if isReadable then do + bufs <- liftST $ Array.ST.new + + removeError <- r # once errorH \err -> complete (Left err) + + removeClose <- r # once closeH do + -- Don't error, instead return whatever we've read. + removeError + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) + + removeEnd <- r # once endH do + removeError + removeClose + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) + + let + cleanupRethrow err = do + removeError + removeClose + removeEnd + complete (Left err) + pure nonCanceler + + catchException cleanupRethrow do + -- try to read right away. + untilE do + Stream.read r >>= case _ of + Nothing -> pure true + Just chunk -> do + void $ liftST $ Array.ST.push chunk bufs + pure false + + ret1 <- liftST $ Array.ST.unsafeFreeze bufs + readagain <- readable r + if readagain && Array.length ret1 == 0 then do + -- if still readable and we couldn't read anything right away, + -- then wait for the readable event. + -- “The 'readable' event will also be emitted once the end of the + -- stream data has been reached but before the 'end' event is emitted.” + -- if not readable then this was a zero-length Readable stream. + -- https://nodejs.org/api/stream.html#event-readable + removeReadable <- r # once readableH do + untilE do + Stream.read r >>= case _ of + Nothing -> pure true + Just chunk -> do + void $ liftST $ Array.ST.push chunk bufs + pure false + ret2 <- liftST $ Array.ST.unsafeFreeze bufs + removeError + removeClose + removeEnd + readagain2 <- readable r + complete (Right { buffers: ret2, readagain: readagain2 }) + -- canceller might by called while waiting for `onceReadable` + pure $ effectCanceler do + removeError + removeClose + removeEnd + removeReadable + -- else return what we read right away + else do + removeError + removeClose + removeEnd + complete (Right { buffers: ret1, readagain }) + pure nonCanceler + else do + complete (Right { buffers: [], readagain: false }) + pure nonCanceler + +-- | Works on streams in "paused" mode. +-- | Read all data until the end of the stream. After completion the stream +-- | will no longer be `readable`. +-- | +-- | Note that __stdin__ will never end. +readAll + :: forall m r + . MonadAff m + => Readable r + -> m (Array Buffer) +readAll r = liftAff <<< makeAff $ \complete -> do + isReadable <- readable r + if isReadable then do + bufs <- liftST $ Array.ST.new + removeReadable <- Ref.new (pure unit :: Effect Unit) + + removeError <- r # once errorH \err -> do + join $ Ref.read removeReadable + complete (Left err) + + removeClose <- r # once closeH do + -- Don't error, instead return whatever we've read. + removeError + join $ Ref.read removeReadable -- can 'close' be raised while waiting for 'readable'? Maybe? + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right ret) + + removeEnd <- r # once endH do + removeError + removeClose + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right ret) + + let + cleanupRethrow err = do + removeError + removeClose + removeEnd + join $ Ref.read removeReadable + complete (Left err) + pure nonCanceler + + -- try to read right away. + catchException cleanupRethrow do + untilE do + Stream.read r >>= case _ of + Nothing -> pure true + Just chunk -> do + void $ liftST $ Array.ST.push chunk bufs + pure false + + -- then wait for the stream to be readable until the stream has ended. + let + waitToRead = do + removeReadable' <- r # once readableH do + -- “The 'readable' event will also be emitted once the end of the + -- stream data has been reached but before the 'end' event is emitted.” + untilE do + Stream.read r >>= case _ of + Nothing -> pure true + Just chunk -> do + _ <- liftST $ Array.ST.push chunk bufs + pure false + waitToRead -- this is not recursion + Ref.write removeReadable' removeReadable + + waitToRead + -- canceller might by called while waiting for `onceReadable` + pure $ effectCanceler do + removeError + removeClose + removeEnd + join $ Ref.read removeReadable + else do + complete (Right []) + pure nonCanceler + +-- | Works on streams in "paused" mode. +-- | Wait for *N* bytes to become available from the stream. +-- | +-- | If more than *N* bytes are available on the stream, then +-- | completes with *N* bytes and leaves the rest in the stream’s internal buffer. +-- | +-- | If the end of the stream is reached before *N* bytes are available, +-- | then completes with less than *N* bytes. +readN + :: forall m r + . MonadAff m + => Readable r + -> Int + -> m { buffers :: Array Buffer, readagain :: Boolean } +readN r n = liftAff <<< makeAff $ \complete -> + if n < 0 then complete (Left $ error "read bytes must be > 0") *> pure nonCanceler + else do + isReadable <- readable r + if isReadable then do + redRef <- Ref.new 0 + bufs <- liftST $ Array.ST.new + removeReadable <- Ref.new (pure unit :: Effect Unit) + + -- On error, we're not calling removeClose or removeEnd... maybe that's fine? + removeError <- r # once errorH \err -> do + join $ Ref.read removeReadable + complete (Left err) + + removeClose <- r # once closeH do + -- Don't error, instead return whatever we've read. + removeError + join $ Ref.read removeReadable + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) + + removeEnd <- r # once endH do + removeError + removeClose + ret <- liftST $ Array.ST.unsafeFreeze bufs + complete (Right { buffers: ret, readagain: false }) + + let + cleanupRethrow err = do + removeError + removeClose + removeEnd + join $ Ref.read removeReadable + complete (Left err) + pure nonCanceler + + -- try to read N bytes and then either return N bytes or run a continuation + tryToRead continuation = do + untilE do + red <- Ref.read redRef + -- https://nodejs.org/docs/latest-v15.x/api/stream.html#stream_readable_read_size + -- “If size bytes are not available to be read, null will be returned + -- unless the stream has ended, in which case all of the data remaining + -- in the internal buffer will be returned.” + Stream.read' r (n - red) >>= case _ of + Nothing -> pure true + Just chunk -> do + _ <- liftST $ Array.ST.push chunk bufs + s <- Buffer.size chunk + red' <- Ref.modify (_ + s) redRef + if red' >= n then + pure true + else + pure false + red <- Ref.read redRef + if red >= n then do + removeError + removeClose + removeEnd + ret <- liftST $ Array.ST.unsafeFreeze bufs + readagain <- readable r + complete (Right { buffers: ret, readagain }) + else + continuation unit + + -- if there were not enough bytes right away, then wait for bytes to come in. + waitToRead _ = do + removeReadable' <- r # once readableH do + tryToRead waitToRead -- not recursion + Ref.write removeReadable' removeReadable + + catchException cleanupRethrow do + -- try to read right away. + tryToRead waitToRead + -- canceller might by called while waiting for `onceReadable` + pure $ effectCanceler do + removeError + removeClose + removeEnd + join $ Ref.read removeReadable + else do + -- If the stream is not readable should that be a fail? No. + complete (Right { buffers: [], readagain: false }) + pure nonCanceler + +-- | Write to a stream. +-- | +-- | Will complete after the data is flushed to the stream. +write + :: forall m w + . MonadAff m + => Writable w + -> Array Buffer + -> m Unit +write w bs = liftAff <<< makeAff $ \complete -> do + removeDrain <- Ref.new (pure unit :: Effect Unit) + + let + oneWrite i' = flip tailRecM i' \i -> do + case Array.index bs i of + Nothing -> do + complete (Right unit) + pure (Done unit) + Just b -> do + -- “write … calls the supplied callback once the data has been fully handled. + -- If an error occurs, the callback will be called with the error + -- as its first argument. The callback is called asynchronously and + -- before 'error' is emitted.” + nobackpressure <- Stream.write' w b $ case _ of + Nothing -> do + pure unit + Just err -> do + complete (Left err) + + if nobackpressure then do + pure (Loop (i + 1)) + else do + removeDrain' <- w # once drainH (oneWrite (i + 1)) + Ref.write removeDrain' removeDrain + pure (Done unit) + oneWrite 0 + + -- canceller might be called while waiting for `onceDrain` + pure $ effectCanceler do + join $ Ref.read removeDrain + +-- | Signal that no more data will be written to the `Writable`. Will complete +-- | after all data is written and flushed. +-- | +-- | When the `Writable` is an [__fs.WriteStream__](https://nodejs.org/api/fs.html#class-fswritestream) +-- | then this will close the file descriptor because +-- | +-- | > “If `autoClose` is set to true (default behavior) on `'error'` +-- | > or `'finish'` the file descriptor will be closed automatically.” +end + :: forall m w + . MonadAff m + => Writable w + -> m Unit +end w = liftAff <<< makeAff $ \complete -> do + Stream.end' w $ case _ of + Nothing -> complete (Right unit) + Just err -> complete (Left err) + pure nonCanceler + +-- | Concatenate an `Array` of UTF-8 encoded `Buffer`s into a `String`. +-- | +-- | Example: +-- | +-- | ``` +-- | inputstring <- toStringUTF8 =<< readAll stream +-- | ``` +toStringUTF8 :: forall m. MonadEffect m => Array Buffer -> m String +toStringUTF8 bs = liftEffect $ Buffer.toString Encoding.UTF8 =<< Buffer.concat bs + +-- | Encode a `String` as an `Array` containing one UTF-8 encoded `Buffer`. +-- | +-- | Example: +-- | +-- | ``` +-- | write stream =<< fromStringUTF8 "outputstring" +-- | ``` +fromStringUTF8 :: forall m. MonadEffect m => String -> m (Array Buffer) +fromStringUTF8 s = liftEffect $ map pure $ Buffer.fromString s Encoding.UTF8 diff --git a/test/Main1.js b/test/Main1.js new file mode 100644 index 0000000..4895ad6 --- /dev/null +++ b/test/Main1.js @@ -0,0 +1,4 @@ +import fs from "node:fs"; + +export const createReadStream = (filePath) => () => fs.createReadStream(filePath); +export const createWriteStream = (filePath) => () => fs.createWriteStream(filePath); diff --git a/test/Main1.purs b/test/Main1.purs new file mode 100644 index 0000000..f0d38da --- /dev/null +++ b/test/Main1.purs @@ -0,0 +1,163 @@ +-- | How to test: +-- | +-- | ``` +-- | pulp test --main Test.Main1 +-- | ``` +-- | +-- | We want to read from a file, not stdin, because stdin has no EOF. +module Test.Main1 where + +import Prelude + +import Control.Parallel (parSequence_) +import Data.Array ((..)) +import Data.Array as Array +import Data.Foldable (for_) +import Data.Maybe (Maybe(..)) +import Effect (Effect) +import Effect.Aff (Aff, Milliseconds(..), launchAff_) +import Effect.Class (liftEffect) +import Node.Buffer (Buffer, concat) +import Node.Buffer as Buffer +import Node.Stream (Readable, Writable, destroy, newPassThrough) +import Node.Stream as Stream +import Node.Stream.Aff (end, fromStringUTF8, readAll, readN, readSome, toStringUTF8, write) +import Partial.Unsafe (unsafePartial) +import Test.Spec (describe, it) +import Test.Spec.Assertions (expectError, shouldEqual) +import Test.Spec.Reporter (consoleReporter) +import Test.Spec.Runner (defaultConfig, runSpec') + +foreign import createReadStream :: String -> Effect (Readable ()) +foreign import createWriteStream :: String -> Effect (Writable ()) + +main :: Effect Unit +main = unsafePartial $ do + launchAff_ do + runSpec' (defaultConfig { timeout = Just (Milliseconds 40000.0) }) [ consoleReporter ] do + describe "Node.Stream.Aff" do + it "PassThrough" do + s <- liftEffect $ newPassThrough + _ <- write s =<< fromStringUTF8 "test" + end s + b1 <- toStringUTF8 =<< readAll s + shouldEqual b1 "test" + it "overflow PassThrough" do + s <- liftEffect $ newPassThrough + let magnitude = 10000 + [ outstring ] <- fromStringUTF8 "aaaaaaaaaa" + parSequence_ + [ write s $ Array.replicate magnitude outstring + , void $ readSome s + ] + it "reads from a zero-length Readable" do + r <- liftEffect $ Stream.fromString "" + -- readSome should return readagain false + shouldEqual { buffers: "", readagain: true } =<< toStringBuffers =<< readSome r + shouldEqual "" =<< toStringUTF8 =<< readAll r + shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readN r 0 + it "readN cleans up event handlers" do + s <- liftEffect $ Stream.fromString "" + for_ (0 .. 100) \_ -> void $ readN s 0 + it "readSome cleans up event handlers" do + s <- liftEffect $ Stream.fromString "" + for_ (0 .. 100) \_ -> void $ readSome s + it "readAll cleans up event handlers" do + s <- liftEffect $ Stream.fromString "" + for_ (0 .. 100) \_ -> void $ readAll s + it "write cleans up event handlers" do + s <- liftEffect $ newPassThrough + [ b ] <- liftEffect $ fromStringUTF8 "x" + for_ (0 .. 100) \_ -> void $ write s [ b ] + it "readSome from PassThrough" do + s <- liftEffect $ newPassThrough + write s =<< fromStringUTF8 "test" + end s + -- The first readSome readagain will be true, that's not good + shouldEqual { buffers: "test", readagain: true } =<< toStringBuffers =<< readSome s + shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s + it "readSome from PassThrough concurrent" do + s <- liftEffect $ newPassThrough + parSequence_ + [ do + shouldEqual { buffers: "test", readagain: true } =<< toStringBuffers =<< readSome s + -- This is rediculous behavior + shouldEqual { buffers: "", readagain: true } =<< toStringBuffers =<< readSome s + shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s + , do + write s =<< fromStringUTF8 "test" + end s + ] + it "readAll from PassThrough concurrent" do + s <- liftEffect $ newPassThrough + parSequence_ + [ do + shouldEqual "test" =<< toStringUTF8 =<< readAll s + , do + write s =<< fromStringUTF8 "test" + end s + ] + it "readAll from empty PassThrough concurrent" do + s <- liftEffect $ newPassThrough + parSequence_ + [ shouldEqual "" =<< toStringUTF8 =<< readAll s + , end s + ] + it "readSome from destroyed PassThrough" do + s <- liftEffect $ newPassThrough + liftEffect $ destroy s + shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s + it "readSome from destroyed PassThrough concurrent" do + s <- liftEffect $ newPassThrough + parSequence_ + [ shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readSome s + , liftEffect $ destroy s + ] + it "readAll from destroyed PassThrough concurrent " do + s <- liftEffect $ newPassThrough + parSequence_ + [ shouldEqual "" =<< toStringUTF8 =<< readAll s + , liftEffect $ destroy s + ] + it "readN from destroyed PassThrough concurrent " do + s <- liftEffect $ newPassThrough + parSequence_ + [ shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readN s 1 + , liftEffect $ destroy s + ] + it "write to destroyed PassThrough" do + s <- liftEffect $ newPassThrough + liftEffect $ destroy s + expectError $ write s =<< fromStringUTF8 "test" + it "writes and reads to file" do + let outfilename = "/tmp/test1.txt" + let magnitude = 100000 + outfile <- liftEffect $ createWriteStream outfilename + [ outstring ] <- fromStringUTF8 "aaaaaaaaaa" + write outfile $ Array.replicate magnitude outstring + infile <- liftEffect $ createReadStream outfilename + { buffers: input1 } <- readSome infile + { buffers: input2 } <- readN infile (5 * magnitude) + input3 <- readAll infile + _ :: Buffer <- liftEffect <<< concat <<< _.buffers =<< readSome infile + void $ readN infile 1 + void $ readAll infile + let inputs = input1 <> input2 <> input3 + input :: Buffer <- liftEffect $ concat inputs + inputSize <- liftEffect $ Buffer.size input + shouldEqual inputSize (10 * magnitude) + it "writes and closes file" do + let outfilename = "/tmp/test2.txt" + outfile <- liftEffect $ createWriteStream outfilename + write outfile =<< fromStringUTF8 "test" + end outfile + expectError $ write outfile =<< fromStringUTF8 "test2" + + pure unit + +toStringBuffers + :: { buffers :: Array Buffer, readagain :: Boolean } + -> Aff { buffers :: String, readagain :: Boolean } +toStringBuffers { buffers, readagain } = do + buffers' <- toStringUTF8 buffers + pure { buffers: buffers', readagain } diff --git a/test/Main2.js b/test/Main2.js new file mode 100644 index 0000000..20bf893 --- /dev/null +++ b/test/Main2.js @@ -0,0 +1,3 @@ +import process from "node:process"; + +export const stdout = process.stdout; diff --git a/test/Main2.purs b/test/Main2.purs new file mode 100644 index 0000000..4505aed --- /dev/null +++ b/test/Main2.purs @@ -0,0 +1,36 @@ +-- | How to test: +-- | +-- | ``` +-- | pulp test --main Test.Main2 | wc -c +-- | ``` +module Test.Main2 where + +import Prelude + +import Data.Array as Array +import Data.Either (Either(..)) +import Effect (Effect) +import Effect.Aff (Error, runAff_) +import Effect.Class (liftEffect) +import Effect.Class.Console as Console +import Node.Buffer as Buffer +import Node.Encoding (Encoding(..)) +import Node.Stream (Writable) +import Node.Stream.Aff (write) +import Partial.Unsafe (unsafePartial) +import Unsafe.Coerce (unsafeCoerce) + +foreign import stdout :: Writable () + +completion :: Either Error (Effect Unit) -> Effect Unit +completion = case _ of + Left e -> Console.error (unsafeCoerce e) + Right f -> f + +main :: Effect Unit +main = unsafePartial $ do + runAff_ completion do + do + b <- liftEffect $ Buffer.fromString "aaaaaaaaaa" UTF8 + write stdout $ Array.replicate 100000 b + pure (pure unit) diff --git a/test/Main3.js b/test/Main3.js new file mode 100644 index 0000000..b9a8b12 --- /dev/null +++ b/test/Main3.js @@ -0,0 +1,5 @@ +import fs from "node:fs"; +import process from "node:process"; + +export const createReadStream = (filePath) => () => fs.createReadStream(filePath); +export const argv = () => process.argv; diff --git a/test/Main3.purs b/test/Main3.purs new file mode 100644 index 0000000..5aa8d6b --- /dev/null +++ b/test/Main3.purs @@ -0,0 +1,58 @@ +-- | How to test: +-- | +-- | ``` +-- | pulp test --main Test.Main3 -- <(head --bytes 1000000 /dev/zero) +-- | ``` +module Test.Main3 where + +import Prelude + +import Data.Array as Array +import Data.Either (Either(..)) +import Effect (Effect) +import Effect.Aff (Error, runAff_) +import Effect.Class (liftEffect) +import Effect.Class.Console as Console +import Node.Buffer (Buffer, concat) +import Node.Buffer as Buffer +import Node.Stream (Readable) +import Node.Stream.Aff (readAll, readN, readSome) +import Partial.Unsafe (unsafePartial) +import Test.Spec (describe, it) +import Test.Spec.Assertions (shouldEqual) +import Test.Spec.Reporter (consoleReporter) +import Test.Spec.Runner (runSpec) +import Unsafe.Coerce (unsafeCoerce) + +foreign import createReadStream :: String -> Effect (Readable ()) +foreign import argv :: Effect (Array String) + +completion :: Either Error (Effect Unit) -> Effect Unit +completion = case _ of + Left e -> Console.error (unsafeCoerce e) + Right f -> f + +main :: Effect Unit +main = unsafePartial $ do + runAff_ completion do + runSpec [ consoleReporter ] do + describe "Node.Stream.Aff" do + it "reads 1" do + infile <- liftEffect $ createReadStream =<< pure <<< flip Array.unsafeIndex 2 =<< argv + { buffers: inputs1 } <- readN infile 500000 + bytesRead1 :: Int <- liftEffect $ Array.foldM (\a b -> (a + _) <$> Buffer.size b) 0 inputs1 + shouldEqual 500000 bytesRead1 + { buffers: inputs2 } <- readSome infile + inputs3 <- readAll infile + let inputs = inputs1 <> inputs2 <> inputs3 + -- TODO read after EOF will hang + -- inputs4 <- readAll infile + -- inputs4 <- readSome infile + -- inputs4 <- readN infile 10 + -- let inputs = inputs1 <> inputs2 <> inputs3 <> inputs4 + bytesRead :: Int <- liftEffect $ Array.foldM (\a b -> (a + _) <$> Buffer.size b) 0 inputs + shouldEqual 1000000 bytesRead + input :: Buffer <- liftEffect $ concat inputs + inputSize <- liftEffect $ Buffer.size input + shouldEqual 1000000 inputSize + pure (pure unit) diff --git a/test/Main4.js b/test/Main4.js new file mode 100644 index 0000000..415152e --- /dev/null +++ b/test/Main4.js @@ -0,0 +1,3 @@ +import process from "node:process"; + +export const stdin = process.stdin; diff --git a/test/Main4.purs b/test/Main4.purs new file mode 100644 index 0000000..2c702a1 --- /dev/null +++ b/test/Main4.purs @@ -0,0 +1,44 @@ +-- | How to test: +-- | +-- | ``` +-- | pulp test --main Test.Main4 +-- | ``` +-- | +-- | This is a test that `readSome` will prevent the Node.js event +-- | loop from exiting. +module Test.Main4 where + +import Prelude + +import Control.Alt (alt) +import Data.Either (Either(..)) +import Effect (Effect) +import Effect.Aff (Error, Milliseconds(..), delay, parallel, runAff_, sequential) +import Effect.Class.Console as Console +import Node.Stream (Readable) +import Node.Stream.Aff (readSome) +import Partial.Unsafe (unsafePartial) +import Test.Spec (describe, it) +import Test.Spec.Reporter (consoleReporter) +import Test.Spec.Runner (runSpec) +import Unsafe.Coerce (unsafeCoerce) + +foreign import stdin :: Readable () + +completion :: Either Error (Effect Unit) -> Effect Unit +completion = case _ of + Left e -> Console.error (unsafeCoerce e) + Right f -> f + +main :: Effect Unit +main = unsafePartial $ do + runAff_ completion do + runSpec [ consoleReporter ] do + describe "Node.Stream.Aff" do + it "reads 1" do + sequential $ alt + do + parallel $ void $ readSome stdin + do + parallel $ delay (Milliseconds 500.0) + pure (pure unit)