
Commit 7bf8cf2

Try to start the queue maintainer multiple times with backoff (#1184)
This one's aimed at addressing #1161. `HookPeriodicJobsStart.Start` may return an error that causes the queue maintainer not to start, and a few other intermittent errors (say, a transient DB problem) can have the same effect. When that happens, the client currently just logs an error and attempts no further remediation, which could leave the queue maintainer offline for extended periods. Here, address this broadly by giving the queue maintainer a few attempts at starting, with our standard exponential backoff (1s, 2s, 4s, 8s, etc.). If the queue maintainer fails to start completely, the client requests resignation and hands leadership off to another client to see if it can start successfully. I think this is an okay compromise because in the case of a non-transient, fundamental error (say `HookPeriodicJobsStart.Start` always returns an error), we don't go into a hot loop that hammers things. Instead, we get a reasonably responsible slow backoff that gives things a chance to recover, and which should be very visible in logs. Fixes #1161.
1 parent a0d45b0 commit 7bf8cf2

5 files changed

Lines changed: 199 additions & 48 deletions
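
The retry delays follow the exponential schedule described in the commit message. As a point of reference only, here's a minimal sketch of that timing; the client's actual delays come from `serviceutil.ExponentialBackoff` (used in the diff below) and may differ in detail, for example by adding jitter:

```go
package main

import (
	"fmt"
	"time"
)

// backoffForAttempt approximates the schedule from the commit message:
// 1s after the first failed attempt, then 2s, 4s, 8s, and so on.
func backoffForAttempt(attempt int) time.Duration {
	return time.Duration(1<<(attempt-1)) * time.Second
}

func main() {
	for attempt := 1; attempt <= 4; attempt++ {
		fmt.Printf("attempt %d failed: sleeping %v before retrying\n", attempt, backoffForAttempt(attempt))
	}
}
```

With `maxStartAttempts = 3` as in this change, only the first two delays are ever slept before the client gives up and requests resignation.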


CHANGELOG.md

Lines changed: 4 additions & 0 deletions
```diff
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## [Unreleased]

+### Fixed
+
+- Upon a client gaining leadership, its queue maintainer is given more than one opportunity to start. [PR #1184](https://github.com/riverqueue/river/pull/1184).
+
 ## [0.33.0] - 2026-04-03

 ### Changed
```

client.go

Lines changed: 136 additions & 27 deletions
```diff
@@ -33,6 +33,7 @@ import (
     "github.com/riverqueue/river/rivershared/testsignal"
     "github.com/riverqueue/river/rivershared/util/dbutil"
     "github.com/riverqueue/river/rivershared/util/maputil"
+    "github.com/riverqueue/river/rivershared/util/serviceutil"
     "github.com/riverqueue/river/rivershared/util/sliceutil"
     "github.com/riverqueue/river/rivershared/util/testutil"
     "github.com/riverqueue/river/rivershared/util/valutil"
@@ -606,11 +607,20 @@ type Client[TTx any] struct {
     pilot                riverpilot.Pilot
     producersByQueueName map[string]*producer
     queueMaintainer      *maintenance.QueueMaintainer
-    queues               *QueueBundle
-    services             []startstop.Service
-    stopped              <-chan struct{}
-    subscriptionManager  *subscriptionManager
-    testSignals          clientTestSignals
+
+    // queueMaintainerEpoch is incremented each time leadership is gained,
+    // giving each tryStartQueueMaintainer goroutine a term number.
+    // queueMaintainerMu serializes epoch checks with Stop calls so that a
+    // stale goroutine from an older term cannot tear down a maintainer
+    // started by a newer term.
+    queueMaintainerEpoch int64
+    queueMaintainerMu    sync.Mutex
+
+    queues              *QueueBundle
+    services            []startstop.Service
+    stopped             <-chan struct{}
+    subscriptionManager *subscriptionManager
+    testSignals         clientTestSignals

     // workCancel cancels the context used for all work goroutines. Normal Stop
     // does not cancel that context.
@@ -619,7 +629,9 @@ type Client[TTx any] struct {

 // Test-only signals.
 type clientTestSignals struct {
-    electedLeader testsignal.TestSignal[struct{}] // notifies when elected leader
+    electedLeader                        testsignal.TestSignal[struct{}] // notifies when elected leader
+    queueMaintainerStartError            testsignal.TestSignal[error]    // notifies on each failed queue maintainer start attempt
+    queueMaintainerStartRetriesExhausted testsignal.TestSignal[struct{}] // notifies when leader resignation is requested after all queue maintainer start retries have been exhausted

     jobCleaner *maintenance.JobCleanerTestSignals
     jobRescuer *maintenance.JobRescuerTestSignals
@@ -631,6 +643,8 @@ type clientTestSignals struct {

 func (ts *clientTestSignals) Init(tb testutil.TestingTB) {
     ts.electedLeader.Init(tb)
+    ts.queueMaintainerStartError.Init(tb)
+    ts.queueMaintainerStartRetriesExhausted.Init(tb)

     if ts.jobCleaner != nil {
         ts.jobCleaner.Init(tb)
@@ -1279,26 +1293,6 @@ func (c *Client[TTx]) logStatsLoop(ctx context.Context, shouldStart bool, starte
 }

 func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStart bool, started, stopped func()) error {
-    handleLeadershipChange := func(ctx context.Context, notification *leadership.Notification) {
-        c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received",
-            slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader))
-
-        switch {
-        case notification.IsLeader:
-            // Starting the queue maintainer can take a little time so send to
-            // this test signal _first_ so tests waiting on it can finish,
-            // cancel the queue maintainer start, and overall run much faster.
-            c.testSignals.electedLeader.Signal(struct{}{})
-
-            if err := c.queueMaintainer.Start(ctx); err != nil {
-                c.baseService.Logger.ErrorContext(ctx, "Error starting queue maintainer", slog.String("err", err.Error()))
-            }
-
-        default:
-            c.queueMaintainer.Stop()
-        }
-    }
-
     if !shouldStart {
         return nil
     }
@@ -1310,20 +1304,135 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar
         sub := c.elector.Listen()
         defer sub.Unlisten()

+        // Cancel function for an in-progress tryStartQueueMaintainer. If
+        // leadership is lost while the start process is still retrying, used to
+        // abort it promptly instead of waiting for retries to finish.
+        var cancelQueueMaintainerStart context.CancelCauseFunc = func(_ error) {}
+
         for {
             select {
             case <-ctx.Done():
+                cancelQueueMaintainerStart(context.Cause(ctx))
                 return

             case notification := <-sub.C():
-                handleLeadershipChange(ctx, notification)
+                c.baseService.Logger.DebugContext(ctx, c.baseService.Name+": Election change received",
+                    slog.String("client_id", c.config.ID), slog.Bool("is_leader", notification.IsLeader))
+
+                switch {
+                case notification.IsLeader:
+                    // Starting the queue maintainer takes time, so send the
+                    // test signal first. Tests waiting on it can receive it,
+                    // cancel the queue maintainer start, and finish faster.
+                    c.testSignals.electedLeader.Signal(struct{}{})
+
+                    // Start the queue maintainer with retries and exponential
+                    // backoff in a separate goroutine so the leadership change
+                    // loop remains responsive to new notifications. startCtx is
+                    // used for cancellation in case leadership is lost while
+                    // retries are in progress.
+                    //
+                    // Epoch is incremented so stale tryStartQueueMaintainer
+                    // goroutines from a previous term cannot call Stop after a
+                    // new term has begun.
+                    var startCtx context.Context
+                    startCtx, cancelQueueMaintainerStart = context.WithCancelCause(ctx)
+
+                    c.queueMaintainerMu.Lock()
+                    c.queueMaintainerEpoch++
+                    epoch := c.queueMaintainerEpoch
+                    c.queueMaintainerMu.Unlock()
+
+                    go c.tryStartQueueMaintainer(startCtx, epoch)
+
+                default:
+                    // Cancel any in-progress start attempts before stopping.
+                    // Send a startstop.ErrStop to make sure services like
+                    // Reindexer run any specific cleanup code for stops.
+                    cancelQueueMaintainerStart(startstop.ErrStop)
+                    cancelQueueMaintainerStart = func(_ error) {}
+
+                    c.queueMaintainer.Stop()
+                }
             }
         }
     }()

     return nil
 }

+// Tries to start the queue maintainer after gaining leadership. We allow some
+// retries with exponential backoff in case of failure, and in case the queue
+// maintainer can't be started, we request resignation to allow another client
+// to try and take over.
+func (c *Client[TTx]) tryStartQueueMaintainer(ctx context.Context, epoch int64) {
+    const maxStartAttempts = 3
+
+    ctxCancelled := func() bool {
+        if ctx.Err() != nil {
+            c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Queue maintainer start cancelled")
+            return true
+        }
+        return false
+    }
+
+    // stopIfCurrentEpoch atomically checks whether this goroutine's epoch is
+    // still the active one and calls Stop only if it is. Combined with the
+    // epoch increment in handleLeadershipChangeLoop, this prevents a stale
+    // goroutine from stopping a maintainer started by a newer leadership term.
+    stopIfCurrentEpoch := func() bool {
+        c.queueMaintainerMu.Lock()
+        defer c.queueMaintainerMu.Unlock()
+
+        if c.queueMaintainerEpoch != epoch {
+            return false
+        }
+
+        c.queueMaintainer.Stop()
+        return true
+    }
+
+    var lastErr error
+    for attempt := 1; attempt <= maxStartAttempts; attempt++ {
+        if ctxCancelled() {
+            return
+        }
+
+        if lastErr = c.queueMaintainer.Start(ctx); lastErr == nil {
+            return
+        }
+
+        c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error starting queue maintainer",
+            slog.String("err", lastErr.Error()), slog.Int("attempt", attempt))
+
+        c.testSignals.queueMaintainerStartError.Signal(lastErr)
+
+        // Stop the queue maintainer to fully reset its state (and any
+        // sub-services) before retrying. The epoch check ensures a stale
+        // goroutine cannot stop a maintainer from a newer leadership term.
+        if !stopIfCurrentEpoch() {
+            return
+        }
+
+        if attempt < maxStartAttempts {
+            serviceutil.CancellableSleep(ctx, serviceutil.ExponentialBackoff(attempt, serviceutil.MaxAttemptsBeforeResetDefault))
+        }
+    }
+
+    if ctxCancelled() {
+        return
+    }
+
+    c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Queue maintainer failed to start after all attempts, requesting leader resignation",
+        slog.String("err", lastErr.Error()))
+
+    c.testSignals.queueMaintainerStartRetriesExhausted.Signal(struct{}{})
+
+    if err := c.clientNotifyBundle.RequestResign(ctx); err != nil {
+        c.baseService.Logger.ErrorContext(ctx, c.baseService.Name+": Error requesting leader resignation", slog.String("err", err.Error()))
+    }
+}
+
 // Driver exposes the underlying driver used by the client.
 //
 // API is not stable. DO NOT USE.
```
client_test.go

Lines changed: 25 additions & 0 deletions
```diff
@@ -5145,6 +5145,31 @@ func Test_Client_Maintenance(t *testing.T) {
         require.True(t, svc.RemoveByID("new_periodic_job"))
     })

+    t.Run("QueueMaintainerStartRetriesAndResigns", func(t *testing.T) {
+        t.Parallel()
+
+        config := newTestConfig(t, "")
+        config.Hooks = []rivertype.Hook{
+            HookPeriodicJobsStartFunc(func(ctx context.Context, params *rivertype.HookPeriodicJobsStartParams) error {
+                return errors.New("hook start error")
+            }),
+        }
+
+        client, _ := setup(t, config)
+
+        startClient(ctx, t, client)
+        client.testSignals.electedLeader.WaitOrTimeout()
+
+        // Wait for all 3 retry attempts to fail.
+        for range 3 {
+            err := client.testSignals.queueMaintainerStartError.WaitOrTimeout()
+            require.EqualError(t, err, "hook start error")
+        }
+
+        // After all retries exhausted, the client should request resignation.
+        client.testSignals.queueMaintainerStartRetriesExhausted.WaitOrTimeout()
+    })
+
     t.Run("PeriodicJobEnqueuerWithInsertOpts", func(t *testing.T) {
         t.Parallel()
```

internal/maintenance/periodic_job_enqueuer.go

Lines changed: 32 additions & 21 deletions
```diff
@@ -318,31 +318,42 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {

     s.StaggerStart(ctx)

-    initialPeriodicJobs, err := s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{
-        Schema: s.Config.Schema,
-    })
-    if err != nil {
-        return err
-    }
+    var (
+        initialPeriodicJobs []*riverpilot.PeriodicJob
+        subServices         []startstop.Service
+    )
+    if err := func() error {
+        var err error
+        initialPeriodicJobs, err = s.Config.Pilot.PeriodicJobGetAll(ctx, s.exec, &riverpilot.PeriodicJobGetAllParams{
+            Schema: s.Config.Schema,
+        })
+        if err != nil {
+            return err
+        }

-    for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) {
-        if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert
-            DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob {
-                return (*rivertype.DurablePeriodicJob)(job)
-            }),
-        }); err != nil {
+        for _, hook := range s.Config.HookLookupGlobal.ByHookKind(hooklookup.HookKindPeriodicJobsStart) {
+            if err := hook.(rivertype.HookPeriodicJobsStart).Start(ctx, &rivertype.HookPeriodicJobsStartParams{ //nolint:forcetypeassert
+                DurableJobs: sliceutil.Map(initialPeriodicJobs, func(job *riverpilot.PeriodicJob) *rivertype.DurablePeriodicJob {
+                    return (*rivertype.DurablePeriodicJob)(job)
+                }),
+            }); err != nil {
+                return err
+            }
+        }
+
+        subServices = []startstop.Service{
+            startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically),
+        }
+        stopServicesOnError := func() {
+            startstop.StopAllParallel(subServices...)
+        }
+        if err := startstop.StartAll(ctx, subServices...); err != nil {
+            stopServicesOnError()
             return err
         }
-    }

-    subServices := []startstop.Service{
-        startstop.StartStopFunc(s.periodicJobKeepAliveAndReapPeriodically),
-    }
-    stopServicesOnError := func() {
-        startstop.StopAllParallel(subServices...)
-    }
-    if err := startstop.StartAll(ctx, subServices...); err != nil {
-        stopServicesOnError()
+        return nil
+    }(); err != nil {
         stopped()
         return err
     }
```
internal/maintenance/queue_maintainer.go

Lines changed: 2 additions & 0 deletions
```diff
@@ -50,6 +50,8 @@ func (m *QueueMaintainer) Start(ctx context.Context) error {

     for _, service := range m.servicesByName {
         if err := service.Start(ctx); err != nil {
+            startstop.StopAllParallel(maputil.Values(m.servicesByName)...)
+            stopped()
             return err
         }
     }
```
