@@ -157,17 +157,13 @@ private function send($payload)
157157 */
158158 private function receive ($ len = null )
159159 {
160+
160161 if ($ len ) {
161- $ line = fgets ($ this ->streamSocket , $ len + 1 );
162+ $ line = fread ($ this ->streamSocket , $ len );
162163 } else {
163164 $ line = fgets ($ this ->streamSocket );
164165 }
165-
166- if ($ line === false ) {
167- return $ line ;
168- } else {
169- return trim ($ line );
170- }
166+ return $ line ;
171167 }
172168
173169 /**
@@ -300,6 +296,25 @@ public function subscribe($subject, \Closure $callback)
300296 return $ sid ;
301297 }
302298
299+ /**
300+ * Subscribes to an specific event given a subject and a queue.
301+ *
302+ * @param string $subject Message topic.
303+ * @param string $queue Queue name.
304+ * @param \Closure $callback Closure to be executed as callback.
305+ *
306+ * @return string
307+ */
308+ public function queueSubscribe ($ subject , $ queue , \Closure $ callback )
309+ {
310+ $ sid = uniqid ();
311+ $ msg = 'SUB ' .$ subject .' ' .$ queue .' ' . $ sid ;
312+ $ this ->send ($ msg );
313+ $ this ->subscriptions [$ sid ] = $ callback ;
314+
315+ return $ sid ;
316+ }
317+
303318 /**
304319 * Unsubscribe from a event given a subject.
305320 *
@@ -337,14 +352,14 @@ private function handleMSG($line)
337352 {
338353 $ parts = explode (' ' , $ line );
339354 $ subject = null ;
340- $ length = $ parts [3 ];
355+ $ length = trim ( $ parts [3 ]) ;
341356 $ sid = $ parts [2 ];
342357
343358 if (count ($ parts ) == 5 ) {
344- $ length = $ parts [4 ];
359+ $ length = trim ( $ parts [4 ]) ;
345360 $ subject = $ parts [3 ];
346361 } elseif (count ($ parts ) == 4 ) {
347- $ length = $ parts [3 ];
362+ $ length = trim ( $ parts [3 ]) ;
348363 $ subject = $ parts [1 ];
349364 }
350365
@@ -373,6 +388,7 @@ public function wait($quantity = 0)
373388 $ count = 0 ;
374389 while (!feof ($ this ->streamSocket )) {
375390 $ line = $ this ->receive ();
391+
376392 if ($ line === false ) {
377393 return null ;
378394 }
0 commit comments