@@ -199,6 +199,26 @@ public function ping()
199199 $ this ->pings += 1 ;
200200 }
201201
202+ /**
203+ * Request does a request and executes a callback with the response.
204+ *
205+ * @param string $subject Message topic.
206+ * @param string $payload Message data.
207+ * @param resource $callback Closure to be executed as callback.
208+ * @param integer $quantity Number of messages to wait for.
209+ */
210+ public function request ($ subject , $ payload , $ callback , $ wait = 1 )
211+ {
212+ $ inbox = uniqid ('_INBOX. ' );
213+ $ this ->subscribe ($ inbox , $ callback );
214+
215+ $ msg = 'PUB ' .$ subject .' ' .$ inbox .' ' .strlen ($ payload );
216+ $ this ->send ($ msg . "\r\n" . $ payload );
217+ $ this ->pubs += 1 ;
218+
219+ $ this ->wait ($ wait );
220+ }
221+
202222 /**
203223 * Publish publishes the data argument to the given subject.
204224 *
@@ -264,18 +284,25 @@ private function handlePING()
264284 private function handleMSG ($ line )
265285 {
266286 $ parts = explode (' ' , $ line );
287+ $ subject = null ;
267288 $ length = $ parts [3 ];
268289 $ sid = $ parts [2 ];
269290
291+ if (count ($ parts ) == 5 ) {
292+ $ length = $ parts [5 ];
293+ $ subject = $ parts [3 ];
294+ }
295+
270296 $ payload = $ this ->receive ($ length );
297+ $ msg = new Message ($ subject , $ payload , $ sid , $ this );
271298
272299 $ func = $ this ->subscriptions [$ sid ];
273300 if (is_callable ($ func )) {
274- $ func ($ payload );
301+ $ func ($ msg );
275302 } else {
276303 return new \Exception ('not callable ' );
277304 }
278-
305+
279306 return ;
280307 }
281308
0 commit comments