11<?php
22namespace Nats ;
33
4+ use RandomLib \Factory ;
5+ use RandomLib \Generator ;
6+
47/**
58 * Connection Class.
69 */
@@ -13,6 +16,12 @@ class Connection
1316 */
1417 private $ pings = 0 ;
1518
19+ /**
20+ * Chunk size in bytes to use when reading with fread.
21+ * @var int
22+ */
23+ private $ chunkSize = 8192 ;
24+
1625 /**
1726 * Return the number of pings.
1827 *
@@ -91,6 +100,13 @@ public function getSubscriptions()
91100 */
92101 private $ options = null ;
93102
103+ /**
104+ * Connection timeout
105+ *
106+ * @var float
107+ */
108+ private $ timeout = null ;
109+
94110 /**
95111 * Stream File Pointer.
96112 *
@@ -105,6 +121,11 @@ public function getSubscriptions()
105121 */
106122 private $ streamWrapper ;
107123
124+ /**
125+ * @var Generator
126+ */
127+ private $ randomGenerator ;
128+
108129 /**
109130 * Constructor.
110131 *
@@ -117,6 +138,8 @@ public function __construct(ConnectionOptions $options = null)
117138 $ this ->subscriptions = [];
118139 $ this ->options = $ options ;
119140 $ this ->streamWrapper = new StreamWrapper ();
141+ $ randomFactory = new Factory ();
142+ $ this ->randomGenerator = $ randomFactory ->getLowStrengthGenerator ();
120143
121144 if (is_null ($ options )) {
122145 $ this ->options = new ConnectionOptions ();
@@ -159,7 +182,21 @@ private function receive($len = null)
159182 {
160183
161184 if ($ len ) {
162- $ line = fread ($ this ->streamSocket , $ len );
185+ $ chunkSize = $ this ->chunkSize ;
186+ $ line = null ;
187+ $ receivedBytes = 0 ;
188+ while ($ receivedBytes < $ len ) {
189+ $ bytesLeft = $ len - $ receivedBytes ;
190+ if ( $ bytesLeft < 1500 ) {
191+ $ chunkSize = $ bytesLeft ;
192+ }
193+
194+ $ line .= fread ($ this ->streamSocket , $ chunkSize );
195+ $ receivedBytes += $ chunkSize ;
196+ }
197+ if (strlen ($ line ) > 2 ) {
198+ $ line = substr ($ line , 0 , -2 );
199+ }
163200 } else {
164201 $ line = fgets ($ this ->streamSocket );
165202 }
@@ -170,7 +207,7 @@ private function receive($len = null)
170207 * Returns an stream socket to the desired server.
171208 *
172209 * @param string $address Server url string.
173- * @param integer $timeout Number of seconds until the connect() system call should timeout.
210+ * @param float $timeout Number of seconds until the connect() system call should timeout.
174211 *
175212 * @return resource
176213 * @throws \Exception Exception raised if connection fails.
@@ -206,13 +243,14 @@ public function isConnected()
206243 /**
207244 * Connect to server.
208245 *
209- * @param integer $timeout Number of seconds until the connect() system call should timeout.
246+ * @param float $timeout Number of seconds until the connect() system call should timeout.
210247 *
211248 * @throws \Exception Exception raised if connection fails.
212249 * @return void
213250 */
214251 public function connect ($ timeout = null )
215252 {
253+ $ this ->timeout = $ timeout ;
216254 $ this ->streamSocket = $ this ->getStream ($ this ->options ->getAddress (), $ timeout );
217255 $ msg = 'CONNECT ' .$ this ->options ;
218256 $ this ->send ($ msg );
@@ -271,7 +309,7 @@ public function request($subject, $payload, $callback, $wait = 1)
271309 *
272310 * @return void
273311 */
274- public function publish ($ subject , $ payload )
312+ public function publish ($ subject , $ payload = null )
275313 {
276314 $ msg = 'PUB ' .$ subject .' ' .strlen ($ payload );
277315 $ this ->send ($ msg . "\r\n" . $ payload );
@@ -288,7 +326,7 @@ public function publish($subject, $payload)
288326 */
289327 public function subscribe ($ subject , \Closure $ callback )
290328 {
291- $ sid = uniqid ( );
329+ $ sid = $ this -> randomGenerator -> generateString ( 16 );
292330 $ msg = 'SUB ' .$ subject .' ' .$ sid ;
293331 $ this ->send ($ msg );
294332 $ this ->subscriptions [$ sid ] = $ callback ;
@@ -307,7 +345,7 @@ public function subscribe($subject, \Closure $callback)
307345 */
308346 public function queueSubscribe ($ subject , $ queue , \Closure $ callback )
309347 {
310- $ sid = uniqid ( );
348+ $ sid = $ this -> randomGenerator -> generateString ( 16 );
311349 $ msg = 'SUB ' .$ subject .' ' .$ queue .' ' . $ sid ;
312350 $ this ->send ($ msg );
313351 $ this ->subscriptions [$ sid ] = $ callback ;
@@ -345,7 +383,8 @@ private function handlePING()
345383 *
346384 * @param string $line Message command from NATS.
347385 *
348- * @return \Exception|void
386+ * @return void
387+ * @throws Exception
349388 * @codeCoverageIgnore
350389 */
351390 private function handleMSG ($ line )
@@ -366,11 +405,15 @@ private function handleMSG($line)
366405 $ payload = $ this ->receive ($ length );
367406 $ msg = new Message ($ subject , $ payload , $ sid , $ this );
368407
408+ if (!isset ($ this ->subscriptions [$ sid ])) {
409+ throw new Exception ('subscription not found ' );
410+ }
411+
369412 $ func = $ this ->subscriptions [$ sid ];
370413 if (is_callable ($ func )) {
371414 $ func ($ msg );
372415 } else {
373- return new \ Exception ('not callable ' );
416+ throw new Exception ('not callable ' );
374417 }
375418
376419 return ;
@@ -413,7 +456,7 @@ public function wait($quantity = 0)
413456 /**
414457 * Set Stream Timeout.
415458 *
416- * @param integer $seconds Before timeout on stream.
459+ * @param float $seconds Before timeout on stream.
417460 *
418461 * @return boolean
419462 */
@@ -441,7 +484,14 @@ public function reconnect()
441484 {
442485 $ this ->reconnects += 1 ;
443486 $ this ->close ();
444- $ this ->connect ();
487+ $ this ->connect ($ this ->timeout );
488+ }
489+
490+ /**
491+ * @param integer $chunkSize Set byte chunk len to read when reading from wire
492+ */
493+ public function setChunkSize ($ chunkSize ){
494+ $ this ->chunkSize = $ chunkSize ;
445495 }
446496
447497 /**
0 commit comments