Skip to content
This repository was archived by the owner on Oct 30, 2024. It is now read-only.

Commit c56df00

Browse files
committed
- Ability to resubscribe on reconnect
1 parent b430f95 commit c56df00

3 files changed

Lines changed: 13 additions & 4 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ Now let's tell composer about our project's dependancies, in this case, PHPNats.
3030
```
3131
{
3232
"require": {
33-
"workfront/nats": "2.2.0"
33+
"workfront/nats": "2.2.1"
3434
}
3535
}
3636
```

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"type": "library",
55
"minimum-stability": "dev",
66
"license": "MIT",
7-
"version": "2.2.0",
7+
"version": "2.2.1",
88
"require": {
99
"php": "^7.1"
1010
},

src/Nats/Connection.php

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,7 @@ public function subscribe($subject, \Closure $callback)
488488
$msg = 'SUB '.$subject.' '.$sid;
489489
$this->send($msg);
490490
$this->subscriptions[$sid] = $callback;
491+
$this->registeredSubscriptions[$subject] = [null, $callback];
491492
return $sid;
492493
}
493494

@@ -505,6 +506,7 @@ public function queueSubscribe($subject, $queue, \Closure $callback)
505506
$msg = 'SUB '.$subject.' '.$queue.' '.$sid;
506507
$this->send($msg);
507508
$this->subscriptions[$sid] = $callback;
509+
$this->registeredSubscriptions[$subject] = [$queue, $callback];
508510
return $sid;
509511
}
510512

@@ -596,16 +598,23 @@ public function wait($quantity = 0)
596598
*/
597599
public function reconnect($resubscribe = false)
598600
{
601+
$sids = [];
599602
$this->reconnects += 1;
600603
$this->close();
601604
$this->connect($this->timeout);
602605
if ($resubscribe) {
603606
if ($this->isConnected()) {
604-
foreach ($this->registeredSubscriptions as $subject => $callback) {
605-
$this->subscribe($subject, $callback);
607+
foreach ($this->registeredSubscriptions as $subject => $info) {
608+
if ($info[0] === null) {
609+
$sids[$subject] = $this->subscribe($subject, $info[1]);
610+
}
611+
else {
612+
$sids[$subject] = $this->queueSubscribe($subject, $info[0], $info[1]);
613+
}
606614
}
607615
}
608616
}
617+
return $sids;
609618
}
610619

611620
/**

0 commit comments

Comments
 (0)