1414
1515/**
1616 * Connection Class
17- *
17+ *
1818 * @category Class
1919 * @package Nats\Tests\Unit
2020 * @author Raül Përez <repejota@gmail.com>
2323 */
2424class Connection
2525{
26+ /**
27+ * Version number
28+ */
29+ public $ VERSION = "0.0.0 " ;
30+
2631 /**
2732 * Number of PINGS
2833 *
@@ -120,7 +125,7 @@ public function getSubscriptions()
120125 *
121126 * @var mixed Socket file pointer
122127 */
123- private $ _fp ;
128+ private $ _streamSocket ;
124129
125130 /**
126131 * Server address
@@ -137,9 +142,12 @@ public function getSubscriptions()
137142 */
138143 public function __construct ($ host = "localhost " , $ port = 4222 )
139144 {
145+ $ this ->VERSION = file_get_contents ("./VERSION " );
146+
140147 $ this ->_pings = 0 ;
141148 $ this ->_pubs = 0 ;
142149 $ this ->_subscriptions = 0 ;
150+ $ this ->_subscriptions = [];
143151
144152 $ this ->_host = $ host ;
145153 $ this ->_port = $ port ;
@@ -155,9 +163,8 @@ public function __construct($host = "localhost", $port = 4222)
155163 */
156164 private function _send ($ payload )
157165 {
158-
159166 $ msg = $ payload . "\r\n" ;
160- fwrite ($ this ->_fp , $ msg , strlen ($ msg ));
167+ fwrite ($ this ->_streamSocket , $ msg , strlen ($ msg ));
161168 }
162169
163170 /**
@@ -170,17 +177,17 @@ private function _send($payload)
170177 private function _receive ($ len = null )
171178 {
172179 if ($ len ) {
173- return trim (fgets ($ this ->_fp , $ len + 1 ));
180+ return trim (fgets ($ this ->_streamSocket , $ len + 1 ));
174181 } else {
175- return trim (fgets ($ this ->_fp ));
182+ return trim (fgets ($ this ->_streamSocket ));
176183 }
177184 }
178185
179186 /**
180187 * Returns an stream socket to the desired server.
181188 *
182189 * @param string $address Server url string
183- *
190+ *
184191 * @return resource
185192 */
186193 private function _getStream ($ address )
@@ -193,6 +200,16 @@ private function _getStream($address)
193200 return $ fp ;
194201 }
195202
203+ /**
204+ * Checks if the client is connected to a server
205+ *
206+ * @return bool
207+ */
208+ public function isConnected ()
209+ {
210+ return isset ($ this ->_streamSocket );
211+ }
212+
196213 /**
197214 * Connect to server.
198215 *
@@ -206,12 +223,40 @@ private function _getStream($address)
206223 * Example:
207224 * nats://user:pass@localhost:4222
208225 *
226+ * @param null $host host name to connect
227+ * @param null $port host port to connect
228+ * @param bool $verbose if verbose mode is enabled
229+ * @param bool $pedantic if pedantic mode is enabled
230+ * @param bool $reconnect if reconnect mode is enabled
231+ *
209232 * @return void
210233 */
211- public function connect ()
212- {
213- $ this ->_fp = $ this ->_getStream ($ this ->_address );
214- $ msg = 'CONNECT {} ' ;
234+ public function connect ($ host = null ,
235+ $ port = null ,
236+ $ verbose = false ,
237+ $ pedantic = false ,
238+ $ reconnect = true
239+ ) {
240+ if (isset ($ host )) {
241+ $ this ->_host = $ host ;
242+ $ this ->_address = "tcp:// " . $ this ->_host . ": " . $ this ->_port ;
243+ }
244+ if (isset ($ port )) {
245+ $ this ->_port = $ port ;
246+ $ this ->_address = "tcp:// " . $ this ->_host . ": " . $ this ->_port ;
247+ }
248+ $ verbose = ($ verbose ) ? 'true ' : 'false ' ;
249+ $ pedantic = ($ pedantic ) ? 'true ' : 'false ' ;
250+ $ reconnect = ($ reconnect ) ? 'true ' : 'false ' ;
251+
252+ $ options = '{ ' ;
253+ $ options .= ' "verbose": ' . $ verbose . ', ' ;
254+ $ options .= ' "pedantic": ' . $ pedantic . ', ' ;
255+ $ options .= ' "reconnect": ' . $ reconnect ;
256+ $ options .= ' } ' ;
257+
258+ $ this ->_streamSocket = $ this ->_getStream ($ this ->_address );
259+ $ msg = 'CONNECT ' . $ options ;
215260 $ this ->_send ($ msg );
216261 }
217262
@@ -273,41 +318,61 @@ public function unsubscribe($sid)
273318 $ this ->_send ($ msg );
274319 }
275320
321+ /**
322+ * Handles PING command
323+ *
324+ * @return void
325+ */
326+ private function _handlePING ()
327+ {
328+ $ this ->_send ("PONG " );
329+ }
330+
331+ /**
332+ * Handles MSG command
333+ *
334+ * @param string $line Message command from NATS
335+ *
336+ * @return \Exception|void
337+ */
338+ private function _handleMSG ($ line )
339+ {
340+ $ parts = explode (" " , $ line );
341+ $ length = $ parts [3 ];
342+ $ sid = $ parts [2 ];
343+
344+ $ payload = $ this ->_receive ($ length );
345+
346+ $ func = $ this ->_subscriptions [$ sid ];
347+ if (is_callable ($ func )) {
348+ $ func ($ payload );
349+ } else {
350+ return new \Exception ("not callable " );
351+ }
352+ }
353+
276354 /**
277355 * Waits for messages
278356 *
279357 * @param int $quantity Number of messages to wait for
280- *
358+ *
281359 * @return \Exception|void
282360 */
283361 public function wait ($ quantity = 0 )
284362 {
285363 $ count = 0 ;
286- while (!feof ($ this ->_fp )) {
364+ while (!feof ($ this ->_streamSocket )) {
287365 $ line = $ this ->_receive ();
288366
289367 // PING
290368 if (strpos ($ line , 'PING ' ) === 0 ) {
291- $ this ->_send ( " PONG " );
369+ $ this ->_handlePing ( );
292370 }
293371
294372 // MSG
295373 if (strpos ($ line , 'MSG ' ) === 0 ) {
296374 $ count = $ count + 1 ;
297-
298- $ parts = explode (" " , $ line );
299- $ length = $ parts [3 ];
300- $ sid = $ parts [2 ];
301-
302- $ payload = $ this ->_receive ($ length );
303-
304- $ func = $ this ->_subscriptions [$ sid ];
305- if (is_callable ($ func )) {
306- $ func ($ payload );
307- } else {
308- return new \Exception ("not callable " );
309- }
310-
375+ $ this ->_handleMSG ($ line );
311376 if (($ quantity != 0 ) && ($ count >= $ quantity )) {
312377 return null ;
313378 }
@@ -336,7 +401,8 @@ public function reconnect()
336401 */
337402 public function close ()
338403 {
339- fclose ($ this ->_fp );
404+ fclose ($ this ->_streamSocket );
405+ $ this ->_streamSocket = null ;
340406 }
341407
342408}
0 commit comments