Skip to content
This repository was archived by the owner on Oct 30, 2024. It is now read-only.

Commit b7fa9b9

Browse files
committed
Merge branch 'release/0.4.0'
2 parents 740e19e + dcecd9f commit b7fa9b9

3 files changed

Lines changed: 90 additions & 20 deletions

File tree

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.3.0
1+
0.4.0

src/Connection.php

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,15 @@ private function send($payload)
137137
private function receive($len = null)
138138
{
139139
if ($len) {
140-
return trim(fgets($this->streamSocket, $len + 1));
140+
$line = fgets($this->streamSocket, $len + 1);
141141
} else {
142-
return trim(fgets($this->streamSocket));
142+
$line = fgets($this->streamSocket);
143+
}
144+
145+
if ($line === false) {
146+
return $line;
147+
} else {
148+
return trim($line);
143149
}
144150
}
145151

@@ -242,12 +248,12 @@ public function publish($subject, $payload)
242248
/**
243249
* Subscribes to an specific event given a subject.
244250
*
245-
* @param string $subject Message topic.
246-
* @param mixed $callback Closure to be executed as callback.
251+
* @param string $subject Message topic.
252+
* @param \Closure $callback Closure to be executed as callback.
247253
*
248254
* @return string
249255
*/
250-
public function subscribe($subject, $callback)
256+
public function subscribe($subject, \Closure $callback)
251257
{
252258
$sid = uniqid();
253259
$msg = 'SUB '.$subject.' '.$sid;
@@ -327,13 +333,14 @@ public function wait($quantity = 0)
327333
$count = 0;
328334
while (!feof($this->streamSocket)) {
329335
$line = $this->receive();
336+
if ($line === false) {
337+
return null;
338+
}
330339

331-
// PING
332340
if (strpos($line, 'PING') === 0) {
333341
$this->handlePING();
334342
}
335343

336-
// MSG
337344
if (strpos($line, 'MSG') === 0) {
338345
$count = $count + 1;
339346
$this->handleMSG($line);
@@ -347,6 +354,26 @@ public function wait($quantity = 0)
347354
return $this;
348355
}
349356

357+
/**
358+
* Set Stream Timeout.
359+
*
360+
* @param integer $seconds Before timeout on stream.
361+
*
362+
* @return boolean
363+
*/
364+
public function setStreamTimeout($seconds)
365+
{
366+
if ($this->isConnected()) {
367+
try {
368+
return stream_set_timeout($this->streamSocket, $seconds);
369+
} catch (\Exception $e) {
370+
return false;
371+
}
372+
}
373+
374+
return false;
375+
}
376+
350377
/**
351378
* Reconnects to the server.
352379
*

tests/Unit/ConnectionTest.php

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,28 @@
1111
class ConnectionTest extends \PHPUnit_Framework_TestCase
1212
{
1313
/**
14+
* Client.
15+
*
1416
* @var resource Client
1517
*/
1618
private $c;
1719

1820
/**
19-
* @var resource A separated process
21+
* Process.
22+
*
23+
* @var resource A separated process.
2024
*/
2125
private static $process;
2226

2327
/**
28+
* Gnatsd switch.
29+
*
2430
* @var bool Am I using a real or a fake server?
2531
*/
2632
private static $isGnatsd = false;
2733

2834
/**
29-
* Before Class code setup
35+
* Before Class code setup.
3036
*
3137
* @return void
3238
*/
@@ -41,7 +47,7 @@ public static function setUpBeforeClass()
4147
}
4248

4349
/**
44-
* After Class code setup
50+
* After Class code setup.
4551
*
4652
* @return void
4753
*/
@@ -53,7 +59,7 @@ public static function tearDownAfterClass()
5359
}
5460

5561
/**
56-
* setUp test suite
62+
* SetUp test suite.
5763
*
5864
* @return void
5965
*/
@@ -155,19 +161,56 @@ public function testSubscription()
155161
}
156162

157163
/**
158-
* Test Request command
164+
* Test Request command.
159165
*
160166
* @return void
161167
*/
162168
public function testRequest()
163169
{
164-
$this->c->subscribe("sayhello", function ($res) {
165-
$res->reply("Hello, ".$res->getBody(). " !!!");
166-
});
170+
$this->c->subscribe(
171+
"sayhello",
172+
function ($res) {
173+
$res->reply("Hello, ".$res->getBody(). " !!!");
174+
}
175+
);
176+
177+
$this->c->request(
178+
'sayhello',
179+
'McFly',
180+
function ($message) {
181+
$this->assertNotNull($message);
182+
$this->assertEquals($message, 'Hello, McFly !!!');
183+
}
184+
);
185+
}
167186

168-
$this->c->request('sayhello', 'McFly', function ($message) {
169-
$this->assertNotNull($message);
170-
$this->assertEquals($message, 'Hello, McFly !!!');
171-
});
187+
/**
188+
* Test Unsubscribe command.
189+
*
190+
* @return void
191+
*/
192+
public function testUnsubscribe()
193+
{
194+
$sid = $this->c->subscribe(
195+
"unsub",
196+
function ($res) {
197+
$this->assertTrue(false);
198+
}
199+
);
200+
$this->c->unsubscribe($sid);
201+
$this->c->publish('unsub', 'bar');
202+
203+
$this->assertTrue(true);
204+
}
205+
206+
/**
207+
* Test setStreamTimeout command.
208+
*
209+
* @return void
210+
*/
211+
public function testSetStreamTimeout()
212+
{
213+
$this->assertTrue($this->c->setStreamTimeout(2));
214+
$this->assertFalse($this->c->setStreamTimeout("hello"));
172215
}
173216
}

0 commit comments

Comments
 (0)