Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion bower.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
"purescript-partial": "^4.0.0"
},
"dependencies": {
"purescript-arrays": "^7.2.0",
"purescript-effect": "^4.0.0",
"purescript-either": "^6.0.0",
"purescript-exceptions": "^6.0.0",
"purescript-maybe": "^6.0.0",
"purescript-node-buffer": "^8.0.0",
"purescript-nullable": "^6.0.0",
"purescript-prelude": "^6.0.0"
"purescript-prelude": "^6.0.1",
"purescript-unsafe-coerce": "^6.0.0"
}
}
7 changes: 7 additions & 0 deletions src/Node/Stream.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import * as stream from "node:stream";
export { pipeline as pipelineImpl } from "node:stream";

const _undefined = undefined;
export { _undefined as undefined };

Expand Down Expand Up @@ -141,3 +144,7 @@ export function destroyWithError(strm) {
strm.destroy(e);
};
}

export function passThrough() {
return new stream.PassThrough();
}
44 changes: 32 additions & 12 deletions src/Node/Stream.purs
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,24 @@ module Node.Stream
, end
, destroy
, destroyWithError
, pipeline
, passThrough
) where

import Prelude

import Effect (Effect)
import Effect.Exception (throw, Error)
import Data.Array as Array
import Data.Either (Either(..))
import Data.Maybe (Maybe(..), fromMaybe)
import Node.Buffer (Buffer)
import Data.Nullable (Nullable)
import Data.Nullable as N
import Effect.Uncurried (EffectFn1, mkEffectFn1)
import Effect (Effect)
import Effect.Exception (throw, Error)
import Effect.Uncurried (EffectFn1, EffectFn2, mkEffectFn1, runEffectFn2)
import Node.Buffer (Buffer)
import Node.Buffer as Buffer
import Node.Encoding (Encoding)
import Unsafe.Coerce (unsafeCoerce)

-- | A stream.
-- |
Expand Down Expand Up @@ -95,21 +100,21 @@ onData r cb =
where
fromEither x =
case x of
Left _ ->
Left _ ->
throw "Stream encoding should not be set"
Right buf ->
pure buf

read
:: forall w
. Readable w
-> Maybe Int
-> Effect (Maybe Buffer)
-> Maybe Int
-> Effect (Maybe Buffer)
read r size = do
v <- readEither r size
case v of
Nothing -> pure Nothing
Just (Left _) -> throw "Stream encoding should not be set"
Nothing -> pure Nothing
Just (Left _) -> throw "Stream encoding should not be set"
Just (Right b) -> pure (Just b)

readString
Expand All @@ -121,9 +126,9 @@ readString
readString r size enc = do
v <- readEither r size
case v of
Nothing -> pure Nothing
Just (Left _) -> throw "Stream encoding should not be set"
Just (Right buf) -> Just <$> Buffer.toString enc buf
Nothing -> pure Nothing
Just (Left _) -> throw "Stream encoding should not be set"
Just (Right buf) -> Just <$> Buffer.toString enc buf

readEither
:: forall w
Expand Down Expand Up @@ -340,3 +345,18 @@ foreign import destroyWithError
. Stream r
-> Error
-> Effect Unit

foreign import pipelineImpl :: EffectFn2 (Array Duplex) (EffectFn1 (Nullable Error) Unit) Unit

pipeline :: forall srcR destR. Readable srcR -> Array Duplex -> Writable destR -> (Maybe Error -> Effect Unit) -> Effect Unit
pipeline src trans dest cb = runEffectFn2 pipelineImpl (src `consStream` trans `snocStream` dest) $ mkEffectFn1 \err ->
cb $ N.toMaybe err
where
consStream :: Readable srcR -> Array Duplex -> Array Duplex
consStream = unsafeCoerce Array.cons

snocStream :: Array Duplex -> Writable destR -> Array Duplex
snocStream = unsafeCoerce Array.snoc

-- | Create a PassThrough stream, which simply writes its input to its output
foreign import passThrough :: Effect Duplex
25 changes: 0 additions & 25 deletions test/Main.js
Original file line number Diff line number Diff line change
@@ -1,26 +1 @@
import { WritableStreamBuffer, ReadableStreamBuffer } from "stream-buffers";
import { PassThrough } from "stream";

export function writableStreamBuffer() {
return new WritableStreamBuffer;
}

export function getContentsAsString(w) {
return () => w.getContentsAsString("utf8");
}

export function readableStreamBuffer() {
return new ReadableStreamBuffer;
}

export function putImpl(str) {
return enc => r => () => {
r.put(str, enc);
};
}

export { createGzip, createGunzip } from "zlib";

export function passThrough() {
return new PassThrough;
}
151 changes: 77 additions & 74 deletions test/Main.purs
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,33 @@ import Data.Either (Either(..))
import Data.Maybe (Maybe(..), fromJust, isJust, isNothing)
import Effect (Effect)
import Effect.Console (log)
import Effect.Exception (error)
import Effect.Exception (error, throw)
import Effect.Exception as Exception
import Node.Buffer as Buffer
import Node.Encoding (Encoding(..))
import Node.Stream (Duplex, Readable, Writable, destroyWithError, end, onData, onDataEither, onDataString, onError, onReadable, pipe, read, readString, setDefaultEncoding, setEncoding, writeString)
import Node.Stream (Duplex, Readable, Writable, destroyWithError, end, onData, onDataEither, onDataString, onError, onReadable, passThrough, pipe, pipeline, read, readString, setDefaultEncoding, setEncoding, writeString)
import Partial.Unsafe (unsafePartial)
import Test.Assert (assert, assert')
import Unsafe.Coerce (unsafeCoerce)

assertEqual :: forall a. Show a => Eq a => a -> a -> Effect Unit
assertEqual x y =
assert' (show x <> " did not equal " <> show y) (x == y)


foreign import writableStreamBuffer :: Effect (Writable ())

foreign import getContentsAsString :: forall r. Writable r -> Effect String

foreign import readableStreamBuffer :: Effect (Readable ())

foreign import putImpl :: forall r. String -> String -> Readable r -> Effect Unit

put :: forall r. String -> Encoding -> Readable r -> Effect Unit
put str enc = putImpl str (show enc)

main :: Effect Boolean
main = do
log "setDefaultEncoding should not affect writing"
_ <- testSetDefaultEncoding
log "disabled - setDefaultEncoding should not affect writing"
-- _ <- testSetDefaultEncoding

log "setEncoding should not affect reading"
testSetEncoding

log "test pipe"
_ <- testPipe

log "test pipeline"
_ <- testPipeline

log "test write"
testWrite

Expand All @@ -58,49 +51,49 @@ testReads = do
testReadBuf

where
testReadString = do
sIn <- passThrough
v <- readString sIn Nothing UTF8
assert (isNothing v)

onReadable sIn do
str <- readString sIn Nothing UTF8
assert (isJust str)
assertEqual (unsafePartial (fromJust str)) testString
pure unit

writeString sIn UTF8 testString \_ -> do
pure unit

testReadBuf = do
sIn <- passThrough
v <- read sIn Nothing
assert (isNothing v)

onReadable sIn do
buf <- read sIn Nothing
assert (isJust buf)
_ <- assertEqual <$> (Buffer.toString UTF8 (unsafePartial (fromJust buf)))
<*> pure testString
pure unit

writeString sIn UTF8 testString \_ -> do
pure unit

testSetDefaultEncoding :: Effect Boolean
testSetDefaultEncoding = do
w1 <- writableStreamBuffer
_ <- check w1

w2 <- writableStreamBuffer
setDefaultEncoding w2 UCS2
check w2

where
check w = do
writeString w UTF8 testString \_ -> do
c <- getContentsAsString w
assertEqual testString c
testReadString = do
sIn <- passThrough
v <- readString sIn Nothing UTF8
assert (isNothing v)

onReadable sIn do
str <- readString sIn Nothing UTF8
assert (isJust str)
assertEqual (unsafePartial (fromJust str)) testString
pure unit

writeString sIn UTF8 testString \_ -> do
pure unit

testReadBuf = do
sIn <- passThrough
v <- read sIn Nothing
assert (isNothing v)

onReadable sIn do
buf <- read sIn Nothing
assert (isJust buf)
_ <- flip assertEqual testString <$> (Buffer.toString UTF8 (unsafePartial (fromJust buf)))
pure unit

writeString sIn UTF8 testString \_ -> do
pure unit

-- testSetDefaultEncoding :: Effect Boolean
-- testSetDefaultEncoding = do
-- w1 <- passThrough
-- _ <- check w1

-- w2 <- passThrough
-- setDefaultEncoding w2 UCS2
-- check w2

-- where
-- check w = do
-- writeString w UTF8 testString \_ -> do
-- c <- getContentsAsString w
-- assertEqual testString c
-- onData

testSetEncoding :: Effect Unit
testSetEncoding = do
Expand All @@ -109,11 +102,11 @@ testSetEncoding = do
check UCS2
where
check enc = do
r1 <- readableStreamBuffer
put testString enc r1
r1 <- passThrough
_ <- writeString r1 enc testString mempty

r2 <- readableStreamBuffer
put testString enc r2
r2 <- passThrough
_ <- writeString r2 enc testString mempty
setEncoding r2 enc

onData r1 \buf -> unsafePartial do
Expand All @@ -123,9 +116,9 @@ testSetEncoding = do

testPipe :: Effect Boolean
testPipe = do
sIn <- passThrough
sOut <- passThrough
zip <- createGzip
sIn <- passThrough
sOut <- passThrough
zip <- createGzip
unzip <- createGunzip

log "pipe 1"
Expand All @@ -140,28 +133,38 @@ testPipe = do
onDataString sOut UTF8 \str -> do
assertEqual str testString

testPipeline :: Effect Boolean
testPipeline = do
sIn <- passThrough
sOut <- passThrough
zip <- createGzip
unzip <- createGunzip

foreign import createGzip :: Effect Duplex
foreign import createGunzip :: Effect Duplex
pipeline sIn [ zip, unzip ] sOut mempty

onDataString sOut UTF8 \str ->
assertEqual str testString

-- | Create a PassThrough stream, which simply writes its input to its output.
foreign import passThrough :: Effect Duplex
writeString sIn UTF8 testString \_ -> do
end sIn mempty

foreign import createGzip :: Effect Duplex
foreign import createGunzip :: Effect Duplex

testWrite :: Effect Unit
testWrite = do
hasError
noError
where
hasError = do
w1 <- writableStreamBuffer
w1 <- passThrough
_ <- onError w1 (const $ pure unit)
void $ end w1 $ const $ pure unit
void $ writeString w1 UTF8 "msg" \err -> do
assert' "writeString - should have error" $ isJust err

noError = do
w1 <- writableStreamBuffer
w1 <- passThrough
void $ writeString w1 UTF8 "msg1" \err -> do
assert' "writeString - should have no error" $ isNothing err
void $ end w1 (const $ pure unit)
Expand All @@ -172,14 +175,14 @@ testEnd = do
noError
where
hasError = do
w1 <- writableStreamBuffer
w1 <- passThrough
_ <- onError w1 (const $ pure unit)
void $ writeString w1 UTF8 "msg" \_ -> do
_ <- destroyWithError w1 $ error "Problem"
end w1 \err -> do
assert' "end - should have error" $ isJust err

noError = do
w1 <- writableStreamBuffer
w1 <- passThrough
end w1 \err -> do
assert' "end - should have no error" $ isNothing err