Skip to content

Commit ae230e5

Browse files
committed
CompositeStream forwards pause to pipe source on first write attempt
1 parent 64c88fa commit ae230e5

2 files changed

Lines changed: 0 additions & 78 deletions

File tree

src/CompositeStream.php

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ class CompositeStream extends EventEmitter implements DuplexStreamInterface
88
{
99
protected $readable;
1010
protected $writable;
11-
protected $pipeSource;
1211
protected $closed = false;
1312

1413
public function __construct(ReadableStreamInterface $readable, WritableStreamInterface $writable)
@@ -21,13 +20,6 @@ public function __construct(ReadableStreamInterface $readable, WritableStreamInt
2120

2221
$this->readable->on('close', array($this, 'close'));
2322
$this->writable->on('close', array($this, 'close'));
24-
25-
$this->on('pipe', array($this, 'handlePipeEvent'));
26-
}
27-
28-
public function handlePipeEvent($source)
29-
{
30-
$this->pipeSource = $source;
3123
}
3224

3325
public function isReadable()
@@ -37,10 +29,6 @@ public function isReadable()
3729

3830
public function pause()
3931
{
40-
if ($this->pipeSource) {
41-
$this->pipeSource->pause();
42-
}
43-
4432
$this->readable->pause();
4533
}
4634

@@ -50,10 +38,6 @@ public function resume()
5038
return;
5139
}
5240

53-
if ($this->pipeSource) {
54-
$this->pipeSource->resume();
55-
}
56-
5741
$this->readable->resume();
5842
}
5943

@@ -85,8 +69,6 @@ public function close()
8569
}
8670

8771
$this->closed = true;
88-
$this->pipeSource = null;
89-
9072
$this->readable->close();
9173
$this->writable->close();
9274

tests/CompositeStreamTest.php

Lines changed: 0 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -146,66 +146,6 @@ public function itShouldHandlePipingCorrectly()
146146
$input->emit('data', array('foo'));
147147
}
148148

149-
/** @test */
150-
public function itShouldForwardPauseUpstreamWhenPipedTo()
151-
{
152-
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
153-
$readable->expects($this->any())->method('isReadable')->willReturn(true);
154-
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
155-
$writable->expects($this->any())->method('isWritable')->willReturn(true);
156-
157-
$composite = new CompositeStream($readable, $writable);
158-
159-
$input = $this->getMockBuilder('React\Stream\ThroughStream')->setMethods(array('pause', 'resume'))->getMock();
160-
$input
161-
->expects($this->once())
162-
->method('pause');
163-
164-
$input->pipe($composite);
165-
$composite->pause();
166-
}
167-
168-
/** @test */
169-
public function itShouldForwardResumeUpstreamWhenPipedTo()
170-
{
171-
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
172-
$readable->expects($this->any())->method('isReadable')->willReturn(true);
173-
$writable = $this->getMockBuilder('React\Stream\WritableStreamInterface')->getMock();
174-
$writable->expects($this->any())->method('isWritable')->willReturn(true);
175-
176-
$composite = new CompositeStream($readable, $writable);
177-
178-
$input = $this->getMockBuilder('React\Stream\ThroughStream')->setMethods(array('pause', 'resume'))->getMock();
179-
$input
180-
->expects($this->once())
181-
->method('resume');
182-
183-
$input->pipe($composite);
184-
$composite->resume();
185-
}
186-
187-
/** @test */
188-
public function itShouldForwardPauseAndResumeUpstreamWhenPipedTo()
189-
{
190-
$readable = $this->getMockBuilder('React\Stream\ReadableStreamInterface')->getMock();
191-
$writable = new ThroughStream();
192-
$writable->pause();
193-
194-
$composite = new CompositeStream($readable, $writable);
195-
196-
$input = $this->getMockBuilder('React\Stream\ThroughStream')->setMethods(array('pause', 'resume'))->getMock();
197-
$input
198-
->expects($this->once())
199-
->method('pause');
200-
$input
201-
->expects($this->once())
202-
->method('resume');
203-
204-
$input->pipe($composite);
205-
$input->emit('data', array('foo'));
206-
$writable->emit('drain');
207-
}
208-
209149
/** @test */
210150
public function itShouldForwardPipeCallsToReadableStream()
211151
{

0 commit comments

Comments
 (0)