@@ -114,13 +114,6 @@ public function getSubscriptions()
114114 */
115115 private $ streamSocket ;
116116
117- /**
118- * Stream wrapper for testing purposes.
119- *
120- * @var mixed StreamWrapper.
121- */
122- private $ streamWrapper ;
123-
124117 /**
125118 * @var Generator
126119 */
@@ -137,7 +130,6 @@ public function __construct(ConnectionOptions $options = null)
137130 $ this ->pubs = 0 ;
138131 $ this ->subscriptions = [];
139132 $ this ->options = $ options ;
140- $ this ->streamWrapper = new StreamWrapper ();
141133 $ randomFactory = new Factory ();
142134 $ this ->randomGenerator = $ randomFactory ->getLowStrengthGenerator ();
143135
@@ -146,18 +138,6 @@ public function __construct(ConnectionOptions $options = null)
146138 }
147139 }
148140
149- /**
150- * Setter for $streamWrapper. For testing purposes.
151- *
152- * @param StreamWrapper $streamWrapper StreamWrapper for testing purposes.
153- *
154- * @return void
155- */
156- public function setStreamWrapper (StreamWrapper $ streamWrapper )
157- {
158- $ this ->streamWrapper = $ streamWrapper ;
159- }
160-
161141 /**
162142 * Sends data thought the stream.
163143 *
@@ -187,7 +167,7 @@ private function receive($len = null)
187167 $ receivedBytes = 0 ;
188168 while ($ receivedBytes < $ len ) {
189169 $ bytesLeft = $ len - $ receivedBytes ;
190- if ( $ bytesLeft < 1500 ) {
170+ if ( $ bytesLeft < $ this -> chunkSize ) {
191171 $ chunkSize = $ bytesLeft ;
192172 }
193173
@@ -220,13 +200,16 @@ private function getStream($address, $timeout = null)
220200 $ errno = null ;
221201 $ errstr = null ;
222202
223- $ fp = $ this ->streamWrapper ->getStreamSocketClient ($ address , $ errno , $ errstr , $ timeout , STREAM_CLIENT_CONNECT );
203+ $ fp = stream_socket_client ($ address , $ errno , $ errstr , $ timeout , STREAM_CLIENT_CONNECT );
204+ $ timeout = number_format ($ timeout , 3 );
205+ $ seconds = floor ($ timeout );
206+ $ microseconds = ($ timeout - $ seconds ) * 1000 ;
207+ stream_set_timeout ($ fp , $ seconds , $ microseconds );
224208
225209 if (!$ fp ) {
226210 throw new \Exception ($ errstr , $ errno );
227211 }
228212
229- //stream_set_blocking($fp, 0);
230213 return $ fp ;
231214 }
232215
@@ -250,19 +233,22 @@ public function isConnected()
250233 */
251234 public function connect ($ timeout = null )
252235 {
236+
253237 $ this ->timeout = $ timeout ;
254238 $ this ->streamSocket = $ this ->getStream ($ this ->options ->getAddress (), $ timeout );
239+
255240 $ msg = 'CONNECT ' .$ this ->options ;
256241 $ this ->send ($ msg );
257-
258- $ response = $ this ->receive ();
242+ $ connect_response = $ this ->receive ();
243+ if (strpos ($ connect_response , '-ERR ' )!== false ) {
244+ throw new \Exception ("Failing connection: $ connect_response " );
245+ }
259246
260247 $ this ->ping ();
261- $ response = $ this ->receive ();
262-
263- if ($ response !== "PONG " ) {
264- if (strpos ($ response , '-ERR ' )!== false ) {
265- throw new \Exception ("Failing connection: $ response " );
248+ $ ping_response = $ this ->receive ();
249+ if ($ ping_response !== "PONG " ) {
250+ if (strpos ($ ping_response , '-ERR ' )!== false ) {
251+ throw new \Exception ("Failing on first ping: $ ping_response " );
266252 }
267253 }
268254 }
@@ -381,7 +367,7 @@ private function handlePING()
381367 /**
382368 * Handles MSG command.
383369 *
384- * @param string $line Message command from NATS .
370+ * @param string $line Message command from Nats .
385371 *
386372 * @return void
387373 * @throws Exception
@@ -471,7 +457,6 @@ public function setStreamTimeout($seconds)
471457 }
472458 }
473459 }
474-
475460 return false ;
476461 }
477462
0 commit comments