Skip to content
This repository was archived by the owner on Oct 30, 2024. It is now read-only.

Commit 59f7fdc

Browse files
committed
Fixed request <-> reply bug and correctly getting the response
1 parent e9bb6da commit 59f7fdc

3 files changed

Lines changed: 62 additions & 22 deletions

File tree

src/Nats/Connection.php

Lines changed: 57 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use RandomLib\Factory;
55
use RandomLib\Generator;
6+
use Symfony\Component\Validator\Constraints\True;
67

78
/**
89
* Connection Class.
@@ -15,18 +16,31 @@ class Connection
1516
{
1617

1718
/**
18-
* Number of PINGs.
19+
* Show DEBUG info?
1920
*
20-
* @var integer number of pings.
21+
* @var boolean $debug If debug is enabled.
2122
*/
22-
private $pings = 0;
23+
private $debug = false;
24+
2325

2426
/**
25-
* Chunk size in bytes to use when reading an stream of data.
27+
* Enable or disable debug mode.
2628
*
27-
* @var integer size of chunk.
29+
* @param boolean $debug If debug is enabled.
30+
*
31+
* @return void
2832
*/
29-
private $chunkSize = 1500;
33+
public function setDebug($debug)
34+
{
35+
$this->debug = $debug;
36+
}
37+
38+
/**
39+
* Number of PINGs.
40+
*
41+
* @var integer number of pings.
42+
*/
43+
private $pings = 0;
3044

3145

3246
/**
@@ -39,6 +53,13 @@ public function pingsCount()
3953
return $this->pings;
4054
}
4155

56+
/**
57+
* Chunk size in bytes to use when reading an stream of data.
58+
*
59+
* @var integer size of chunk.
60+
*/
61+
private $chunkSize = 1500;
62+
4263
/**
4364
* Number of messages published.
4465
*
@@ -319,6 +340,10 @@ private function send($payload)
319340
break;
320341
}
321342
}
343+
344+
if ($this->debug === true) {
345+
printf('>>>> %s', $msg);
346+
}
322347
}
323348

324349
/**
@@ -348,6 +373,10 @@ private function receive($len = 0)
348373
$line = fgets($this->streamSocket);
349374
}
350375

376+
if ($this->debug === true) {
377+
printf('<<<< %s\r\n', $line);
378+
}
379+
351380
return $line;
352381
}
353382

@@ -460,15 +489,12 @@ public function ping()
460489
public function request($subject, $payload, \Closure $callback)
461490
{
462491
$inbox = uniqid('_INBOX.');
463-
$this->subscribe(
492+
$sid = $this->subscribe(
464493
$inbox,
465494
$callback
466495
);
467-
468-
$msg = 'PUB '.$subject.' '.$inbox.' '.strlen($payload);
469-
$this->send($msg."\r\n".$payload);
470-
$this->pubs += 1;
471-
496+
$this->unsubscribe($sid, 1);
497+
$this->publish($subject, $payload, $inbox);
472498
$this->wait(1);
473499
}
474500

@@ -510,30 +536,43 @@ public function queueSubscribe($subject, $queue, \Closure $callback)
510536
/**
511537
* Unsubscribe from a event given a subject.
512538
*
513-
* @param string $sid Subscription ID.
539+
* @param string $sid Subscription ID.
540+
* @param integer $quantity Quantity of messages.
514541
*
515542
* @return void
516543
*/
517-
public function unsubscribe($sid)
544+
public function unsubscribe($sid, $quantity = null)
518545
{
519546
$msg = 'UNSUB '.$sid;
547+
if ($quantity !== null) {
548+
$msg = $msg.' '.$quantity;
549+
}
550+
520551
$this->send($msg);
521-
unset($this->subscriptions[$sid]);
552+
if ($quantity === null) {
553+
unset($this->subscriptions[$sid]);
554+
}
522555
}
523556

524557
/**
525558
* Publish publishes the data argument to the given subject.
526559
*
527560
* @param string $subject Message topic.
528561
* @param string $payload Message data.
562+
* @param string $inbox Message inbox.
529563
*
530564
* @return void
531-
565+
*
532566
* @throws Exception If subscription not found.
533567
*/
534-
public function publish($subject, $payload = null)
568+
public function publish($subject, $payload = null, $inbox = null)
535569
{
536-
$msg = 'PUB '.$subject.' '.strlen($payload);
570+
$msg = 'PUB '.$subject;
571+
if ($inbox !== null) {
572+
$msg = $msg.' '.$inbox;
573+
}
574+
575+
$msg = $msg.' '.strlen($payload);
537576
$this->send($msg."\r\n".$payload);
538577
$this->pubs += 1;
539578
}

src/Nats/EncodedConnection.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,14 @@ public function request($subject, $payload, \Closure $callback)
5151
*
5252
* @param string $subject Message topic.
5353
* @param string $payload Message data.
54+
* @param string $inbox Message inbox.
5455
*
5556
* @return void
5657
*/
57-
public function publish($subject, $payload = null)
58+
public function publish($subject, $payload = null, $inbox = null)
5859
{
5960
$payload = $this->encoder->encode($payload);
60-
parent::publish($subject, $payload);
61+
parent::publish($subject, $payload, $inbox);
6162
}
6263

6364
/**

src/Nats/Message.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,8 @@ public function getConn()
176176
*/
177177
public function reply($body)
178178
{
179-
$this->getConn()->publish(
180-
$this->getSubject(),
179+
$this->conn->publish(
180+
$this->subject,
181181
$body
182182
);
183183
}

0 commit comments

Comments
 (0)