33namespace React \Stream ;
44
55use Evenement \EventEmitter ;
6-
6+ use InvalidArgumentException ;
7+
8+ /**
9+ * The `ThroughStream` implements the
10+ * [`DuplexStreamInterface`](#duplexstreaminterface) and will simply pass any data
11+ * you write to it through to its readable end.
12+ *
13+ * ```php
14+ * $through = new ThroughStream();
15+ * $through->on('data', $this->expectCallableOnceWith('hello'));
16+ *
17+ * $through->write('hello');
18+ * ```
19+ *
20+ * Similarly, the [`end()` method](#end) will end the stream and emit an
21+ * [`end` event](#end-event) and then [`close()`](#close-1) the stream.
22+ * The [`close()` method](#close-1) will close the stream and emit a
23+ * [`close` event](#close-event).
24+ * Accordingly, this is can also be used in a [`pipe()`](#pipe) context like this:
25+ *
26+ * ```php
27+ * $through = new ThroughStream();
28+ * $source->pipe($through)->pipe($dest);
29+ * ```
30+ *
31+ * Optionally, its constructor accepts any callable function which will then be
32+ * used to *filter* any data written to it. This function receives a single data
33+ * argument as passed to the writable side and must return the data as it will be
34+ * passed to its readable end:
35+ *
36+ * ```php
37+ * $through = new ThroughStream('strtoupper');
38+ * $source->pipe($through)->pipe($dest);
39+ * ```
40+ *
41+ * Note that this class makes no assumptions about any data types. This can be
42+ * used to convert data, for example for transforming any structured data into
43+ * a newline-delimited JSON (NDJSON) stream like this:
44+ *
45+ * ```php
46+ * $through = new ThroughStream(function ($data) {
47+ * return json_encode($data) . PHP_EOL;
48+ * });
49+ * $through->on('data', $this->expectCallableOnceWith("[2, true]\n"));
50+ *
51+ * $through->write(array(2, true));
52+ * ```
53+ *
54+ * The callback function is allowed to throw an `Exception`. In this case,
55+ * the stream will emit an `error` event and then [`close()`](#close-1) the stream.
56+ *
57+ * ```php
58+ * $through = new ThroughStream(function ($data) {
59+ * if (!is_string($data)) {
60+ * throw new \UnexpectedValueException('Only strings allowed');
61+ * }
62+ * return $data;
63+ * });
64+ * $through->on('error', $this->expectCallableOnce()));
65+ * $through->on('close', $this->expectCallableOnce()));
66+ * $through->on('data', $this->expectCallableNever()));
67+ *
68+ * $through->write(2);
69+ * ```
70+ *
71+ * @see WritableStreamInterface::write()
72+ * @see WritableStreamInterface::end()
73+ * @see DuplexStreamInterface::close()
74+ * @see WritableStreamInterface::pipe()
75+ */
776class ThroughStream extends EventEmitter implements DuplexStreamInterface
877{
978 private $ readable = true ;
1079 private $ writable = true ;
1180 private $ closed = false ;
1281 private $ paused = false ;
1382 private $ drain = false ;
83+ private $ callback ;
1484
15- public function filter ( $ data )
85+ public function __construct ( $ callback = null )
1686 {
17- return $ data ;
87+ if ($ callback !== null && !is_callable ($ callback )) {
88+ throw new InvalidArgumentException ('Invalid transformation callback given ' );
89+ }
90+
91+ $ this ->callback = $ callback ;
1892 }
1993
2094 public function pause ()
@@ -52,7 +126,18 @@ public function write($data)
52126 return false ;
53127 }
54128
55- $ this ->emit ('data ' , array ($ this ->filter ($ data )));
129+ if ($ this ->callback !== null ) {
130+ try {
131+ $ data = call_user_func ($ this ->callback , $ data );
132+ } catch (\Exception $ e ) {
133+ $ this ->emit ('error ' , array ($ e ));
134+ $ this ->close ();
135+
136+ return false ;
137+ }
138+ }
139+
140+ $ this ->emit ('data ' , array ($ data ));
56141
57142 if ($ this ->paused ) {
58143 $ this ->drain = true ;
@@ -70,6 +155,11 @@ public function end($data = null)
70155
71156 if (null !== $ data ) {
72157 $ this ->write ($ data );
158+
159+ // return if write() already caused the stream to close
160+ if (!$ this ->writable ) {
161+ return ;
162+ }
73163 }
74164
75165 $ this ->readable = false ;
@@ -92,6 +182,7 @@ public function close()
92182 $ this ->closed = true ;
93183 $ this ->paused = true ;
94184 $ this ->drain = false ;
185+ $ this ->callback = null ;
95186
96187 $ this ->emit ('close ' );
97188 }
0 commit comments