Skip to content
This repository was archived by the owner on Oct 30, 2024. It is now read-only.

Commit 5b5fb56

Browse files
authored
Merge pull request repejota#97 from repejota/feature/encoded_connections
WIP: Feature/encoded connections
2 parents 310d831 + 5d74953 commit 5b5fb56

13 files changed

Lines changed: 337 additions & 34 deletions

.travis.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ services:
33
- docker
44
language: php
55
php:
6-
- 5.5
76
- 5.6
87
- 7
98
- 7.1

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ A PHP client for the [NATS messaging system](https://nats.io).
2121
Requirements
2222
------------
2323

24-
* php 5.5+
24+
* php 5.6+
2525
* [nats](https://github.com/derekcollison/nats) or [gnatsd](https://github.com/apcera/gnatsd)
2626

2727

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
"ircmaxell/random-lib": "^1.1"
99
},
1010
"require-dev": {
11-
"phpunit/phpunit": "4.7.*",
11+
"phpunit/phpunit": "5.*",
1212
"satooshi/php-coveralls": "dev-master",
1313
"squizlabs/php_codesniffer": "~2.0",
1414
"phpspec/phpspec": "~2.0"

examples/pubsub/jsonencodedpub.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
require_once __DIR__.'/../../vendor/autoload.php';
3+
4+
$encoder = new \Nats\Encoders\JSONEncoder();
5+
$connectionOptions = new \Nats\ConnectionOptions();
6+
7+
$connectionOptions->setHost('localhost')->setPort(4222);
8+
$c = new Nats\EncodedConnection($connectionOptions, $encoder);
9+
$c->connect();
10+
11+
$c->reconnect();
12+
13+
$c->publish('foo', 'bar');
14+
$c->publish('foo', 'bar');
15+
$c->publish('foo', 'bar');
16+
$c->publish('foo', 'bar');
17+
$c->publish('foo', 'bar');

examples/pubsub/jsonencodedsub.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
require_once __DIR__.'/../../vendor/autoload.php';
3+
4+
$encoder = new \Nats\Encoders\JSONEncoder();
5+
$connectionOptions = new \Nats\ConnectionOptions();
6+
7+
$connectionOptions->setHost('localhost')->setPort(4222);
8+
$c = new Nats\EncodedConnection($connectionOptions, $encoder);
9+
$c->connect();
10+
11+
$callback = function ($payload) {
12+
printf("Data: %s\r\n", $payload);
13+
};
14+
15+
$sid = $c->subscribe('foo', $callback);
16+
17+
$c->wait(2);
18+
19+
$c->unsubscribe($sid);

spec/Nats/ConnectionOptionsSpec.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ function it_has_default_lang_value_as_php() {
3333
}
3434

3535
function it_has_default_version_value_as_null() {
36-
$this->getVersion()->shouldEqual("0.8.0");
36+
$this->getVersion()->shouldEqual("0.8.2");
3737
}
3838

3939
function it_has_default_verbose_value_as_null() {

src/Nats/Connection.php

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,23 @@
66

77
/**
88
* Connection Class.
9+
*
10+
* Handles the connection to a NATS server or cluster of servers.
911
*/
1012
class Connection
1113
{
1214

1315
/**
14-
* Number of PINGS.
16+
* Number of PINGs.
1517
*
16-
* @var int number of pings
18+
* @var integer number of pings.
1719
*/
1820
private $pings = 0;
1921

2022
/**
21-
* Chunk size in bytes to use when reading with fread.
23+
* Chunk size in bytes to use when reading an stream of data.
2224
*
23-
* @var integer
25+
* @var integer size of chunk.
2426
*/
2527
private $chunkSize = 1500;
2628

@@ -313,23 +315,24 @@ public function ping()
313315
/**
314316
* Request does a request and executes a callback with the response.
315317
*
316-
* @param string $subject Message topic.
317-
* @param string $payload Message data.
318-
* @param mixed $callback Closure to be executed as callback.
319-
* @param integer $wait Number of messages to wait for.
318+
* @param string $subject Message topic.
319+
* @param string $payload Message data.
320320
*
321321
* @return void
322322
*/
323-
public function request($subject, $payload, $callback, $wait = 1)
323+
public function request($subject, $payload)
324324
{
325325
$inbox = uniqid('_INBOX.');
326-
$this->subscribe($inbox, $callback);
327-
326+
$this->subscribe(
327+
$inbox,
328+
function ($message) {
329+
}
330+
);
328331
$msg = 'PUB '.$subject.' '.$inbox.' '.strlen($payload);
329332
$this->send($msg."\r\n".$payload);
330333
$this->pubs += 1;
331334

332-
$this->wait($wait);
335+
$this->wait(1);
333336
}
334337

335338

@@ -363,7 +366,6 @@ public function subscribe($subject, \Closure $callback)
363366
$msg = 'SUB '.$subject.' '.$sid;
364367
$this->send($msg);
365368
$this->subscriptions[$sid] = $callback;
366-
367369
return $sid;
368370
}
369371

@@ -383,7 +385,6 @@ public function queueSubscribe($subject, $queue, \Closure $callback)
383385
$msg = 'SUB '.$subject.' '.$queue.' '.$sid;
384386
$this->send($msg);
385387
$this->subscriptions[$sid] = $callback;
386-
387388
return $sid;
388389
}
389390

@@ -399,7 +400,6 @@ public function unsubscribe($sid)
399400
{
400401
$msg = 'UNSUB '.$sid;
401402
$this->send($msg);
402-
403403
unset($this->subscriptions[$sid]);
404404
}
405405

@@ -420,9 +420,8 @@ private function handlePING()
420420
*
421421
* @param string $line Message command from Nats.
422422
*
423-
* @return void
424-
* @throws Exception If subscription not found.
425-
* @codeCoverageIgnore
423+
* @return void
424+
* @throws Exception If subscription not found.
426425
*/
427426
private function handleMSG($line)
428427
{
@@ -452,8 +451,6 @@ private function handleMSG($line)
452451
} else {
453452
throw Exception::forSubscriptionCallbackInvalid($sid);
454453
}
455-
456-
return;
457454
}
458455

459456

src/Nats/EncodedConnection.php

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?php
2+
namespace Nats;
3+
4+
/**
5+
* Class EncodedConnection
6+
*
7+
* @package Nats
8+
*/
9+
class EncodedConnection extends Connection
10+
{
11+
12+
/**
13+
* Encoder for this connection.
14+
*
15+
* @var \Nats\Encoders\Encoder|null
16+
*/
17+
private $encoder = null;
18+
19+
20+
/**
21+
* EncodedConnection constructor.
22+
*
23+
* @param ConnectionOptions $options Connection options object.
24+
* @param \Nats\Encoders\Encoder|null $encoder Encoder to use with the payload.
25+
*/
26+
public function __construct(ConnectionOptions $options = null, \Nats\Encoders\Encoder $encoder = null)
27+
{
28+
$this->encoder = $encoder;
29+
parent::__construct($options);
30+
}
31+
32+
/**
33+
* Request does a request and executes a callback with the response.
34+
*
35+
* @param string $subject Message topic.
36+
* @param string $payload Message data.
37+
*
38+
* @return void
39+
*/
40+
public function request($subject, $payload)
41+
{
42+
$payload = $this->encoder->encode($payload);
43+
parent::request($subject, $payload);
44+
}
45+
46+
/**
47+
* Publish publishes the data argument to the given subject.
48+
*
49+
* @param string $subject Message topic.
50+
* @param string $payload Message data.
51+
*
52+
* @return void
53+
*/
54+
public function publish($subject, $payload = null)
55+
{
56+
$payload = $this->encoder->encode($payload);
57+
parent::publish($subject, $payload);
58+
}
59+
60+
/**
61+
* Subscribes to an specific event given a subject.
62+
*
63+
* @param string $subject Message topic.
64+
* @param \Closure $callback Closure to be executed as callback.
65+
*
66+
* @return void
67+
*/
68+
public function subscribe($subject, \Closure $callback)
69+
{
70+
$c = function ($message) use ($callback) {
71+
$message->setBody($this->encoder->decode($message->getBody()));
72+
$callback($message);
73+
};
74+
parent::subscribe($subject, $c);
75+
}
76+
77+
/**
78+
* Subscribes to an specific event given a subject and a queue.
79+
*
80+
* @param string $subject Message topic.
81+
* @param string $queue Queue name.
82+
* @param \Closure $callback Closure to be executed as callback.
83+
*
84+
* @return void
85+
*/
86+
public function queueSubscribe($subject, $queue, \Closure $callback)
87+
{
88+
$c = function ($message) use ($callback) {
89+
$message->setBody($this->encoder->decode($message->getBody()));
90+
$callback($message);
91+
};
92+
parent::queueSubscribe($subject, $queue, $c);
93+
}
94+
}

src/Nats/Encoders/Encoder.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
namespace Nats\Encoders;
3+
4+
/**
5+
* Interface Encoder
6+
*
7+
* @package Nats\Encoders
8+
*/
9+
interface Encoder
10+
{
11+
12+
13+
/**
14+
* Encodes a message.
15+
*
16+
* @param string $payload Message to decode.
17+
*
18+
* @return mixed
19+
*/
20+
public function encode($payload);
21+
22+
/**
23+
* Decodes a message.
24+
*
25+
* @param string $payload Message to decode.
26+
*
27+
* @return mixed
28+
*/
29+
public function decode($payload);
30+
}

src/Nats/Encoders/JSONEncoder.php

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
namespace Nats\Encoders;
3+
4+
/**
5+
* Class JSONEncoder
6+
*
7+
* Encodes and decodes messages in JSON format.
8+
*
9+
* @package Nats
10+
*/
11+
class JSONEncoder implements Encoder
12+
{
13+
14+
15+
/**
16+
* Encodes a message to JSON.
17+
*
18+
* @param string $payload Message to decode.
19+
*
20+
* @return mixed
21+
*/
22+
public function encode($payload)
23+
{
24+
$payload = json_encode($payload);
25+
return $payload;
26+
}
27+
28+
/**
29+
* Decodes a message from JSON.
30+
*
31+
* @param string $payload Message to decode.
32+
*
33+
* @return mixed
34+
*/
35+
public function decode($payload)
36+
{
37+
$payload = json_decode($payload);
38+
return $payload;
39+
}
40+
}

0 commit comments

Comments
 (0)