Skip to content

Commit 684041e

Browse files
Tweak Readable util functions (#53)
* Add readable prefix to fromString/Buffer; encoding * Add util for converting Readable to String/Buffer
1 parent c6e0134 commit 684041e

File tree

5 files changed

+92
-18
lines changed

5 files changed

+92
-18
lines changed

CHANGELOG.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,15 @@ Breaking changes:
4242

4343
New features:
4444
- Added event handlers for `Writeable` streams (#49 by @JordanMartinez)
45-
- Added missing APIs (#51 by @JordanMartinez)
45+
- Added missing APIs (#51, #53 by @JordanMartinez)
4646

4747
- readable, readableEnded, readableFlowing, readableHighWaterMark, readableLength
4848
- pipe'
4949
- writeable, writeableEnded, writeableCorked, errored, writeableFinished, writeableHighWaterMark, writeableLength, writeableNeedDrain
5050
- closed, destroyed
5151
- allowHalfOpen
5252
- pipeline
53-
- fromString, fromBuffer
53+
- readableFromString, readableFromBuffer
5454
- newPassThrough
5555
- Integrated `node-streams-aff` into library (#52 by @JordanMartinez)
5656

@@ -68,6 +68,11 @@ New features:
6868
- fromStringUTF8
6969

7070
The only APIs from the library not added were `newReadable` and `push`.
71+
- Added convenience API for converting `Readable` to `String` or `Buffer` (#53 by @JordanMartinez)
72+
73+
- `readableToStringUtf8`
74+
- `readableToString`
75+
- `readableToBuffers`
7176

7277
Bugfixes:
7378
- Drop misleading comment for `setEncoding` (#51 by @JordanMartinez)

src/Node/Stream.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ export const allowHalfOpenImpl = (d) => d.allowHalfOpen;
9090

9191
export const pipelineImpl = (src, transforms, dst, cb) => stream.pipeline([src, ...transforms, dst], cb);
9292

93-
export const readableFromStrImpl = (str) => stream.Readable.from(str, { objectMode: false });
93+
export const readableFromStrImpl = (str, encoding) => stream.Readable.from(str, { encoding, objectMode: false });
9494

9595
export const readableFromBufImpl = (buf) => stream.Readable.from(buf, { objectMode: false });
9696

src/Node/Stream.purs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ module Node.Stream
6464
, destroyed
6565
, allowHalfOpen
6666
, pipeline
67-
, fromString
68-
, fromBuffer
67+
, readableFromString
68+
, readableFromBuffer
6969
, newPassThrough
7070
) where
7171

@@ -493,13 +493,13 @@ pipeline src transforms dest cb = runEffectFn4 pipelineImpl src transforms dest
493493

494494
foreign import pipelineImpl :: forall w r. EffectFn4 (Readable w) (Array Duplex) (Writable r) ((Error -> Effect Unit)) (Unit)
495495

496-
fromString :: String -> Effect (Readable ())
497-
fromString str = runEffectFn1 readableFromStrImpl str
496+
readableFromString :: String -> Encoding -> Effect (Readable ())
497+
readableFromString str enc = runEffectFn2 readableFromStrImpl str (encodingToNode enc)
498498

499-
foreign import readableFromStrImpl :: EffectFn1 (String) (Readable ())
499+
foreign import readableFromStrImpl :: EffectFn2 (String) (String) (Readable ())
500500

501-
fromBuffer :: Buffer -> Effect (Readable ())
502-
fromBuffer buf = runEffectFn1 readableFromBufImpl buf
501+
readableFromBuffer :: Buffer -> Effect (Readable ())
502+
readableFromBuffer buf = runEffectFn1 readableFromBufImpl buf
503503

504504
foreign import readableFromBufImpl :: EffectFn1 (Buffer) (Readable ())
505505

src/Node/Stream/Aff.purs

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
-- |
1919
-- | #### Implementation
2020
-- |
21-
-- | The reading functions in this module all operate on a `Readable` stream
21+
-- | The `read*` functions (not to be confused with the `readable*` functions)
22+
-- | in this module all operate on a `Readable` stream
2223
-- | in
2324
-- | [“paused mode”](https://nodejs.org/docs/latest/api/stream.html#stream_two_reading_modes).
2425
-- |
@@ -80,7 +81,10 @@
8081
-- |
8182
-- | If a write fails then it will `throwError` in the `Aff`.
8283
module Node.Stream.Aff
83-
( readSome
84+
( readableToStringUtf8
85+
, readableToString
86+
, readableToBuffers
87+
, readSome
8488
, readAll
8589
, readN
8690
, write
@@ -105,11 +109,75 @@ import Effect.Exception (catchException)
105109
import Effect.Ref as Ref
106110
import Node.Buffer (Buffer)
107111
import Node.Buffer as Buffer
112+
import Node.Encoding (Encoding(..))
108113
import Node.Encoding as Encoding
109-
import Node.EventEmitter (once)
110-
import Node.Stream (Readable, Writable, closeH, drainH, endH, errorH, readable, readableH)
114+
import Node.EventEmitter (on, once)
115+
import Node.Stream (Readable, Writable, closeH, dataH, drainH, endH, errorH, readable, readableH)
111116
import Node.Stream as Stream
112117

118+
-- | Works on streams in "flowing" mode.
119+
-- | Reads all of the stream's contents into a buffer
120+
-- | and converts the result into a UTF8-encoded String.
121+
readableToStringUtf8
122+
:: forall m w
123+
. MonadAff m
124+
=> Readable w
125+
-> m String
126+
readableToStringUtf8 r = readableToString r UTF8
127+
128+
-- | Works on streams in "flowing" mode.
129+
-- | Reads all of the stream's contents into a buffer
130+
-- | and converts the result into a String using the provided encoding.
131+
readableToString
132+
:: forall m w
133+
. MonadAff m
134+
=> Readable w
135+
-> Encoding
136+
-> m String
137+
readableToString r enc = do
138+
bufs <- readableToBuffers r
139+
liftEffect $ Buffer.toString enc =<< Buffer.concat bufs
140+
141+
-- | Works on streams in "flowing" mode.
142+
-- | Reads all of the stream's buffered contents into an array.
143+
readableToBuffers
144+
:: forall m w
145+
. MonadAff m
146+
=> Readable w
147+
-> m (Array Buffer)
148+
readableToBuffers r = liftAff $ makeAff \complete -> do
149+
bufs <- liftST $ Array.ST.new
150+
dataRef <- Ref.new (mempty :: Effect Unit)
151+
let removeData = join $ Ref.read dataRef
152+
153+
removeError <- r # once errorH \err -> do
154+
removeData
155+
complete $ Left err
156+
157+
removeClose <- r # once closeH do
158+
-- Don't error, instead return whatever we've read.
159+
removeError
160+
removeData
161+
result <- liftST $ Array.ST.unsafeFreeze bufs
162+
complete $ Right result
163+
164+
removeEnd <- r # once endH do
165+
removeError
166+
removeClose
167+
removeData
168+
result <- liftST $ Array.ST.unsafeFreeze bufs
169+
complete $ Right result
170+
171+
rmData <- r # on dataH \buf ->
172+
void $ liftST $ Array.ST.push buf bufs
173+
174+
Ref.write rmData dataRef
175+
pure $ effectCanceler do
176+
removeError
177+
removeClose
178+
removeEnd
179+
removeData
180+
113181
-- | Works on streams in "paused" mode.
114182
-- | Wait until there is some data available from the stream, then read it.
115183
-- |

test/Main1.purs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import Effect.Aff (Aff, Milliseconds(..), launchAff_)
1919
import Effect.Class (liftEffect)
2020
import Node.Buffer (Buffer, concat)
2121
import Node.Buffer as Buffer
22+
import Node.Encoding (Encoding(..))
2223
import Node.Stream (Readable, Writable, destroy, newPassThrough)
2324
import Node.Stream as Stream
2425
import Node.Stream.Aff (end, fromStringUTF8, readAll, readN, readSome, toStringUTF8, write)
@@ -51,19 +52,19 @@ main = unsafePartial $ do
5152
, void $ readSome s
5253
]
5354
it "reads from a zero-length Readable" do
54-
r <- liftEffect $ Stream.fromString ""
55+
r <- liftEffect $ Stream.readableFromString "" UTF8
5556
-- readSome should return readagain false
5657
shouldEqual { buffers: "", readagain: true } =<< toStringBuffers =<< readSome r
5758
shouldEqual "" =<< toStringUTF8 =<< readAll r
5859
shouldEqual { buffers: "", readagain: false } =<< toStringBuffers =<< readN r 0
5960
it "readN cleans up event handlers" do
60-
s <- liftEffect $ Stream.fromString ""
61+
s <- liftEffect $ Stream.readableFromString "" UTF8
6162
for_ (0 .. 100) \_ -> void $ readN s 0
6263
it "readSome cleans up event handlers" do
63-
s <- liftEffect $ Stream.fromString ""
64+
s <- liftEffect $ Stream.readableFromString "" UTF8
6465
for_ (0 .. 100) \_ -> void $ readSome s
6566
it "readAll cleans up event handlers" do
66-
s <- liftEffect $ Stream.fromString ""
67+
s <- liftEffect $ Stream.readableFromString "" UTF8
6768
for_ (0 .. 100) \_ -> void $ readAll s
6869
it "write cleans up event handlers" do
6970
s <- liftEffect $ newPassThrough

0 commit comments

Comments
 (0)