@@ -85,7 +85,7 @@ public function getSubscriptions()
8585 }
8686
8787 /**
88- * Connection options object
88+ * Connection options object.
8989 *
9090 * @var ConnectionOptions|null
9191 */
@@ -118,6 +118,7 @@ public function __construct(ConnectionOptions $options = null)
118118 * Sends data thought the stream.
119119 *
120120 * @param string $payload Message data.
121+ *
121122 * @return void
122123 */
123124 private function send ($ payload )
@@ -147,6 +148,7 @@ private function receive($len = null)
147148 *
148149 * @param string $address Server url string.
149150 * @param integer $timeout Number of seconds until the connect() system call should timeout.
151+ *
150152 * @return resource
151153 * @throws \Exception Exception raised if connection fails.
152154 */
@@ -177,6 +179,7 @@ public function isConnected()
177179 * Connect to server.
178180 *
179181 * @param integer $timeout Number of seconds until the connect() system call should timeout.
182+ *
180183 * @throws \Exception Exception raised if connection fails.
181184 * @return void
182185 */
@@ -199,11 +202,34 @@ public function ping()
199202 $ this ->pings += 1 ;
200203 }
201204
205+ /**
206+ * Request does a request and executes a callback with the response.
207+ *
208+ * @param string $subject Message topic.
209+ * @param string $payload Message data.
210+ * @param mixed $callback Closure to be executed as callback.
211+ * @param integer $wait Number of messages to wait for.
212+ *
213+ * @return void
214+ */
215+ public function request ($ subject , $ payload , $ callback , $ wait = 1 )
216+ {
217+ $ inbox = uniqid ('_INBOX. ' );
218+ $ this ->subscribe ($ inbox , $ callback );
219+
220+ $ msg = 'PUB ' .$ subject .' ' .$ inbox .' ' .strlen ($ payload );
221+ $ this ->send ($ msg . "\r\n" . $ payload );
222+ $ this ->pubs += 1 ;
223+
224+ $ this ->wait ($ wait );
225+ }
226+
202227 /**
203228 * Publish publishes the data argument to the given subject.
204229 *
205230 * @param string $subject Message topic.
206231 * @param string $payload Message data.
232+ *
207233 * @return void
208234 */
209235 public function publish ($ subject , $ payload )
@@ -216,8 +242,9 @@ public function publish($subject, $payload)
216242 /**
217243 * Subscribes to an specific event given a subject.
218244 *
219- * @param string $subject Message topic.
220- * @param resource $callback Closure to be executed as callback.
245+ * @param string $subject Message topic.
246+ * @param mixed $callback Closure to be executed as callback.
247+ *
221248 * @return string
222249 */
223250 public function subscribe ($ subject , $ callback )
@@ -234,6 +261,7 @@ public function subscribe($subject, $callback)
234261 * Unsubscribe from a event given a subject.
235262 *
236263 * @param string $sid Subscription ID.
264+ *
237265 * @return void
238266 */
239267 public function unsubscribe ($ sid )
@@ -260,29 +288,38 @@ private function handlePING()
260288 * @param string $line Message command from NATS.
261289 *
262290 * @return \Exception|void
291+ * @codeCoverageIgnore
263292 */
264293 private function handleMSG ($ line )
265294 {
266295 $ parts = explode (' ' , $ line );
296+ $ subject = null ;
267297 $ length = $ parts [3 ];
268298 $ sid = $ parts [2 ];
269299
300+ if (count ($ parts ) == 5 ) {
301+ $ length = $ parts [5 ];
302+ $ subject = $ parts [3 ];
303+ }
304+
270305 $ payload = $ this ->receive ($ length );
306+ $ msg = new Message ($ subject , $ payload , $ sid , $ this );
271307
272308 $ func = $ this ->subscriptions [$ sid ];
273309 if (is_callable ($ func )) {
274- $ func ($ payload );
310+ $ func ($ msg );
275311 } else {
276312 return new \Exception ('not callable ' );
277313 }
278-
314+
279315 return ;
280316 }
281317
282318 /**
283319 * Waits for messages.
284320 *
285321 * @param integer $quantity Number of messages to wait for.
322+ *
286323 * @return resource $connection Connection object
287324 */
288325 public function wait ($ quantity = 0 )
0 commit comments