@@ -115,7 +115,7 @@ public function getSubscriptions()
115115 private $ streamSocket ;
116116
117117 /**
118- * @var Generator
118+ * @var Generator|Php71RandomGenerator
119119 */
120120 private $ randomGenerator ;
121121
@@ -130,8 +130,12 @@ public function __construct(ConnectionOptions $options = null)
130130 $ this ->pubs = 0 ;
131131 $ this ->subscriptions = [];
132132 $ this ->options = $ options ;
133- $ randomFactory = new Factory ();
134- $ this ->randomGenerator = $ randomFactory ->getLowStrengthGenerator ();
133+ if (version_compare (phpversion (), '7.0 ' , '> ' )){
134+ $ this ->randomGenerator = new Php71RandomGenerator ();
135+ } else {
136+ $ randomFactory = new Factory ();
137+ $ this ->randomGenerator = $ randomFactory ->getLowStrengthGenerator ();
138+ }
135139
136140 if (is_null ($ options )) {
137141 $ this ->options = new ConnectionOptions ();
@@ -148,7 +152,21 @@ public function __construct(ConnectionOptions $options = null)
148152 private function send ($ payload )
149153 {
150154 $ msg = $ payload ."\r\n" ;
151- fwrite ($ this ->streamSocket , $ msg , strlen ($ msg ));
155+ $ len = strlen ($ msg );
156+ while (true ) {
157+ if (false === ($ written = @fwrite ($ this ->streamSocket , $ msg ))) {
158+ throw new \Exception ('Error sending data ' );
159+ }
160+ if ($ written === 0 ) {
161+ throw new \Exception ('Broken pipe or closed connection ' );
162+ }
163+ $ len = $ len - $ written ;
164+ if ($ len > 0 ) {
165+ $ msg = substr ($ msg , 0 - $ len );
166+ } else {
167+ break ;
168+ }
169+ }
152170 }
153171
154172 /**
@@ -195,16 +213,19 @@ private function getStream($address, $timeout)
195213 $ errno = null ;
196214 $ errstr = null ;
197215
216+ set_error_handler (function (){return true ;});
198217 $ fp = stream_socket_client ($ address , $ errno , $ errstr , $ timeout , STREAM_CLIENT_CONNECT );
218+ restore_error_handler ();
219+
220+ if (!$ fp ) {
221+ throw Exception::forStreamSocketClientError ($ errstr , $ errno );
222+ }
223+
199224 $ timeout = number_format ($ timeout , 3 );
200225 $ seconds = floor ($ timeout );
201226 $ microseconds = ($ timeout - $ seconds ) * 1000 ;
202227 stream_set_timeout ($ fp , $ seconds , $ microseconds );
203228
204- if (!$ fp ) {
205- throw new \Exception ($ errstr , $ errno );
206- }
207-
208229 return $ fp ;
209230 }
210231
@@ -239,16 +260,16 @@ public function connect($timeout = null)
239260 $ msg = 'CONNECT ' .$ this ->options ;
240261 $ this ->send ($ msg );
241262 $ connect_response = $ this ->receive ();
242- if (strpos ($ connect_response , '-ERR ' )!== false ) {
243- throw new \Exception ("Failing connection: $ connect_response " );
263+
264+ if ($ this ->isErrorResponse ($ connect_response )) {
265+ throw Exception::forFailedConnection ($ connect_response );
244266 }
245267
246268 $ this ->ping ();
247269 $ ping_response = $ this ->receive ();
248- if ($ ping_response !== "PONG " ) {
249- if (strpos ($ ping_response , '-ERR ' )!== false ) {
250- throw new \Exception ("Failing on first ping: $ ping_response " );
251- }
270+
271+ if ($ this ->isErrorResponse ($ ping_response )) {
272+ throw Exception::forFailedPing ($ ping_response );
252273 }
253274 }
254275
@@ -391,14 +412,14 @@ private function handleMSG($line)
391412 $ msg = new Message ($ subject , $ payload , $ sid , $ this );
392413
393414 if (!isset ($ this ->subscriptions [$ sid ])) {
394- throw new Exception ( ' subscription not found ' );
415+ throw Exception:: forSubscriptionNotFound ( $ sid );
395416 }
396417
397418 $ func = $ this ->subscriptions [$ sid ];
398419 if (is_callable ($ func )) {
399420 $ func ($ msg );
400421 } else {
401- throw new Exception ( ' not callable ' );
422+ throw Exception:: forSubscriptionCallbackInvalid ( $ sid );
402423 }
403424
404425 return ;
@@ -492,6 +513,9 @@ public function setChunkSize($chunkSize)
492513 */
493514 public function close ()
494515 {
516+ if ($ this ->streamSocket === null ) {
517+ return ;
518+ }
495519 fclose ($ this ->streamSocket );
496520 $ this ->streamSocket = null ;
497521 }
@@ -504,4 +528,15 @@ public function streamSocket()
504528 {
505529 return $ this ->streamSocket ;
506530 }
531+
532+ /**
533+ * Indicates whether $response is an error response.
534+ *
535+ * @param string $response The Nats Server response.
536+ * @return boolean
537+ */
538+ private function isErrorResponse ($ response )
539+ {
540+ return false !== strpos ('-ERR ' , $ response );
541+ }
507542}
0 commit comments