5151 * $through->write(array(2, true));
5252 * ```
5353 *
54+ * The callback function is allowed to throw an `Exception`. In this case,
55+ * the stream will emit an `error` event and then [`close()`](#close-1) the stream.
56+ *
57+ * ```php
58+ * $through = new ThroughStream(function ($data) {
59+ * if (!is_string($data)) {
60+ * throw new \UnexpectedValueException('Only strings allowed');
61+ * }
62+ * return $data;
63+ * });
64+ * $through->on('error', $this->expectCallableOnce()));
65+ * $through->on('close', $this->expectCallableOnce()));
66+ * $through->on('data', $this->expectCallableNever()));
67+ *
68+ * $through->write(2);
69+ * ```
70+ *
5471 * @see WritableStreamInterface::write()
5572 * @see WritableStreamInterface::end()
5673 * @see DuplexStreamInterface::close()
@@ -109,7 +126,18 @@ public function write($data)
109126 return false ;
110127 }
111128
112- $ this ->emit ('data ' , array ($ this ->filter ($ data )));
129+ if ($ this ->callback !== null ) {
130+ try {
131+ $ data = call_user_func ($ this ->callback , $ data );
132+ } catch (\Exception $ e ) {
133+ $ this ->emit ('error ' , array ($ e ));
134+ $ this ->close ();
135+
136+ return false ;
137+ }
138+ }
139+
140+ $ this ->emit ('data ' , array ($ data ));
113141
114142 if ($ this ->paused ) {
115143 $ this ->drain = true ;
@@ -127,6 +155,11 @@ public function end($data = null)
127155
128156 if (null !== $ data ) {
129157 $ this ->write ($ data );
158+
159+ // return if write() already caused the stream to close
160+ if (!$ this ->writable ) {
161+ return ;
162+ }
130163 }
131164
132165 $ this ->readable = false ;
@@ -153,13 +186,4 @@ public function close()
153186
154187 $ this ->emit ('close ' );
155188 }
156-
157- private function filter ($ data )
158- {
159- if ($ this ->callback !== null ) {
160- $ data = call_user_func ($ this ->callback , $ data );
161- }
162-
163- return $ data ;
164- }
165189}
0 commit comments