Skip to content

Commit 148c979

Browse files
authored
Merge pull request #88 from clue-labs/through
ThroughStream is now a direct implementation of DuplexStreamInterface
2 parents 0d9a2c2 + bcbc0f5 commit 148c979

2 files changed

Lines changed: 99 additions & 46 deletions

File tree

src/ThroughStream.php

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,15 @@
22

33
namespace React\Stream;
44

5-
class ThroughStream extends CompositeStream
5+
use Evenement\EventEmitter;
6+
7+
class ThroughStream extends EventEmitter implements DuplexStreamInterface
68
{
9+
private $readable = true;
10+
private $writable = true;
11+
private $closed = false;
712
private $paused = false;
8-
9-
public function __construct()
10-
{
11-
$readable = new ReadableStream();
12-
$writable = new WritableStream();
13-
14-
parent::__construct($readable, $writable);
15-
}
13+
private $drain = false;
1614

1715
public function filter($data)
1816
{
@@ -21,39 +19,80 @@ public function filter($data)
2119

2220
public function pause()
2321
{
24-
parent::pause();
2522
$this->paused = true;
2623
}
2724

2825
public function resume()
2926
{
30-
parent::resume();
27+
if ($this->drain) {
28+
$this->drain = false;
29+
$this->emit('drain');
30+
}
3131
$this->paused = false;
3232
}
3333

34+
public function pipe(WritableStreamInterface $dest, array $options = array())
35+
{
36+
return Util::pipe($this, $dest, $options);
37+
}
38+
39+
public function isReadable()
40+
{
41+
return $this->readable;
42+
}
43+
44+
public function isWritable()
45+
{
46+
return $this->writable;
47+
}
48+
3449
public function write($data)
3550
{
36-
if (!$this->writable->isWritable()) {
51+
if (!$this->writable) {
3752
return false;
3853
}
3954

40-
$this->readable->emit('data', array($this->filter($data)));
55+
$this->emit('data', array($this->filter($data)));
56+
57+
if ($this->paused) {
58+
$this->drain = true;
59+
return false;
60+
}
4161

42-
return $this->writable->isWritable() && !$this->paused;
62+
return true;
4363
}
4464

4565
public function end($data = null)
4666
{
47-
if (!$this->writable->isWritable()) {
67+
if (!$this->writable) {
4868
return;
4969
}
5070

5171
if (null !== $data) {
52-
$this->readable->emit('data', array($this->filter($data)));
72+
$this->write($data);
5373
}
5474

55-
$this->readable->emit('end');
75+
$this->readable = false;
76+
$this->writable = false;
77+
$this->paused = true;
78+
$this->drain = false;
79+
80+
$this->emit('end');
81+
$this->close();
82+
}
83+
84+
public function close()
85+
{
86+
if ($this->closed) {
87+
return;
88+
}
89+
90+
$this->readable = false;
91+
$this->writable = false;
92+
$this->closed = true;
93+
$this->paused = true;
94+
$this->drain = false;
5695

57-
$this->writable->end();
96+
$this->emit('close');
5897
}
5998
}

tests/ThroughStreamTest.php

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,15 @@
1010
*/
1111
class ThroughStreamTest extends TestCase
1212
{
13+
/** @test */
14+
public function itShouldReturnTrueForAnyDataWrittenToIt()
15+
{
16+
$through = new ThroughStream();
17+
$ret = $through->write('foo');
18+
19+
$this->assertTrue($ret);
20+
}
21+
1322
/** @test */
1423
public function itShouldEmitAnyDataWrittenToIt()
1524
{
@@ -18,6 +27,39 @@ public function itShouldEmitAnyDataWrittenToIt()
1827
$through->write('foo');
1928
}
2029

30+
/** @test */
31+
public function itShouldReturnFalseForAnyDataWrittenToItWhenPaused()
32+
{
33+
$through = new ThroughStream();
34+
$through->pause();
35+
$ret = $through->write('foo');
36+
37+
$this->assertFalse($ret);
38+
}
39+
40+
/** @test */
41+
public function itShouldEmitDrainOnResumeAfterReturnFalseForAnyDataWrittenToItWhenPaused()
42+
{
43+
$through = new ThroughStream();
44+
$through->pause();
45+
$through->write('foo');
46+
47+
$through->on('drain', $this->expectCallableOnce());
48+
$through->resume();
49+
}
50+
51+
/** @test */
52+
public function itShouldReturnTrueForAnyDataWrittenToItWhenResumedAfterPause()
53+
{
54+
$through = new ThroughStream();
55+
$through->on('drain', $this->expectCallableNever());
56+
$through->pause();
57+
$through->resume();
58+
$ret = $through->write('foo');
59+
60+
$this->assertTrue($ret);
61+
}
62+
2163
/** @test */
2264
public function pipingStuffIntoItShouldWork()
2365
{
@@ -123,34 +165,6 @@ public function itShouldBeWritableByDefault()
123165
$this->assertTrue($through->isWritable());
124166
}
125167

126-
/** @test */
127-
public function pauseShouldDelegateToPipeSource()
128-
{
129-
$input = $this->getMockBuilder('React\Stream\ReadableStream')->setMethods(array('pause'))->getMock();
130-
$input
131-
->expects($this->once())
132-
->method('pause');
133-
134-
$through = new ThroughStream();
135-
$input->pipe($through);
136-
137-
$through->pause();
138-
}
139-
140-
/** @test */
141-
public function resumeShouldDelegateToPipeSource()
142-
{
143-
$input = $this->getMockBuilder('React\Stream\ReadableStream')->setMethods(array('resume'))->getMock();
144-
$input
145-
->expects($this->once())
146-
->method('resume');
147-
148-
$through = new ThroughStream();
149-
$input->pipe($through);
150-
151-
$through->resume();
152-
}
153-
154168
/** @test */
155169
public function closeShouldCloseOnce()
156170
{

0 commit comments

Comments
 (0)