@@ -15,18 +15,31 @@ class Connection
1515{
1616
1717 /**
18- * Number of PINGs.
18+ * Show DEBUG info?
1919 *
20- * @var integer number of pings .
20+ * @var boolean $debug If debug is enabled .
2121 */
22- private $ pings = 0 ;
22+ private $ debug = false ;
23+
2324
2425 /**
25- * Chunk size in bytes to use when reading an stream of data .
26+ * Enable or disable debug mode .
2627 *
27- * @var integer size of chunk.
28+ * @param boolean $debug If debug is enabled.
29+ *
30+ * @return void
2831 */
29- private $ chunkSize = 1500 ;
32+ public function setDebug ($ debug )
33+ {
34+ $ this ->debug = $ debug ;
35+ }
36+
37+ /**
38+ * Number of PINGs.
39+ *
40+ * @var integer number of pings.
41+ */
42+ private $ pings = 0 ;
3043
3144
3245 /**
@@ -39,6 +52,13 @@ public function pingsCount()
3952 return $ this ->pings ;
4053 }
4154
55+ /**
56+ * Chunk size in bytes to use when reading an stream of data.
57+ *
58+ * @var integer size of chunk.
59+ */
60+ private $ chunkSize = 1500 ;
61+
4262 /**
4363 * Number of messages published.
4464 *
@@ -319,6 +339,10 @@ private function send($payload)
319339 break ;
320340 }
321341 }
342+
343+ if ($ this ->debug === true ) {
344+ printf ('>>>> %s ' , $ msg );
345+ }
322346 }
323347
324348 /**
@@ -348,6 +372,10 @@ private function receive($len = 0)
348372 $ line = fgets ($ this ->streamSocket );
349373 }
350374
375+ if ($ this ->debug === true ) {
376+ printf ('<<<< %s\r\n ' , $ line );
377+ }
378+
351379 return $ line ;
352380 }
353381
@@ -460,14 +488,12 @@ public function ping()
460488 public function request ($ subject , $ payload , \Closure $ callback )
461489 {
462490 $ inbox = uniqid ('_INBOX. ' );
463- $ this ->subscribe (
491+ $ sid = $ this ->subscribe (
464492 $ inbox ,
465493 $ callback
466494 );
467- $ msg = 'PUB ' .$ subject .' ' .$ inbox .' ' .strlen ($ payload );
468- $ this ->send ($ msg ."\r\n" .$ payload );
469- $ this ->pubs += 1 ;
470-
495+ $ this ->unsubscribe ($ sid , 1 );
496+ $ this ->publish ($ subject , $ payload , $ inbox );
471497 $ this ->wait (1 );
472498 }
473499
@@ -509,30 +535,43 @@ public function queueSubscribe($subject, $queue, \Closure $callback)
509535 /**
510536 * Unsubscribe from a event given a subject.
511537 *
512- * @param string $sid Subscription ID.
538+ * @param string $sid Subscription ID.
539+ * @param integer $quantity Quantity of messages.
513540 *
514541 * @return void
515542 */
516- public function unsubscribe ($ sid )
543+ public function unsubscribe ($ sid, $ quantity = null )
517544 {
518545 $ msg = 'UNSUB ' .$ sid ;
546+ if ($ quantity !== null ) {
547+ $ msg = $ msg .' ' .$ quantity ;
548+ }
549+
519550 $ this ->send ($ msg );
520- unset($ this ->subscriptions [$ sid ]);
551+ if ($ quantity === null ) {
552+ unset($ this ->subscriptions [$ sid ]);
553+ }
521554 }
522555
523556 /**
524557 * Publish publishes the data argument to the given subject.
525558 *
526559 * @param string $subject Message topic.
527560 * @param string $payload Message data.
561+ * @param string $inbox Message inbox.
528562 *
529563 * @return void
530-
564+ *
531565 * @throws Exception If subscription not found.
532566 */
533- public function publish ($ subject , $ payload = null )
567+ public function publish ($ subject , $ payload = null , $ inbox = null )
534568 {
535- $ msg = 'PUB ' .$ subject .' ' .strlen ($ payload );
569+ $ msg = 'PUB ' .$ subject ;
570+ if ($ inbox !== null ) {
571+ $ msg = $ msg .' ' .$ inbox ;
572+ }
573+
574+ $ msg = $ msg .' ' .strlen ($ payload );
536575 $ this ->send ($ msg ."\r\n" .$ payload );
537576 $ this ->pubs += 1 ;
538577 }
0 commit comments