Skip to content

Commit 27a4c30

Browse files
authored
Buffered requestResignChan (#1207)
Aims to fix another intermittent test failure: https://github.com/riverqueue/river/actions/runs/24312716933/job/70985012980?pr=1205 --- FAIL: TestElector_WithNotifier (0.00s) --- FAIL: TestElector_WithNotifier/RequestResignStress (20.41s) elector_test.go:350: Generated postgres schema "leadership_2026_04_12t17_52_43_schema_04" with migrations [1 2 3 4 5 6] on line "main" in 229.626141ms [4 generated] [0 reused] elector_test.go:352: Starting test_client_id elector_test.go:375: Requesting leadership resign elector_test.go:375: Requesting leadership resign elector_test.go:375: Requesting leadership resign elector_test.go:375: Requesting leadership resign elector_test.go:375: Requesting leadership resign test_signal.go:95: timed out waiting on test signal after 10s test_signal.go:95: timed out waiting on test signal after 10s riverdbtest.go:293: Checked in postgres schema "leadership_2026_04_12t17_52_43_schema_04"; 1 idle schema(s) [5 generated] [3 reused] FAIL FAIL github.com/riverqueue/river/internal/leadership 20.948s The problem is that when `requestResignChan` is unbuffered, if `keepLeadershipLoop` hasn't yet entered its `select`, then the `default` statement on the `select` below will cause all senders (we have 5 competing senders in the `RequestResignStress` test case) to fall through without sending anything: select { case <-ctx.Done(): case e.requestResignChan <- struct{}{}: default: // if context is not done and requestResignChan has an item in it // already, do nothing } By making the channel buffered, we guarantee at least one sender gets a message through, and we don't end up hanging the test.
1 parent 3f55dd5 commit 27a4c30

2 files changed

Lines changed: 28 additions & 1 deletion

File tree

internal/leadership/elector.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ func (e *Elector) Start(ctx context.Context) error {
139139
// We'll send to this channel anytime a leader resigns on the key with `name`
140140
e.leaderResignedChan = make(chan struct{})
141141

142-
e.requestResignChan = make(chan struct{})
142+
// Buffered to 1 so a send from handleLeadershipNotification doesn't block
143+
// if keepLeadershipLoop hasn't entered its select yet.
144+
e.requestResignChan = make(chan struct{}, 1)
143145

144146
var sub *notifier.Subscription
145147
if e.notifier == nil {

internal/leadership/elector_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,31 @@ func testElector[TElectorBundle any](
344344
startstoptest.Stress(ctx, t, elector)
345345
})
346346

347+
t.Run("RequestResignImmediatelyAfterElection", func(t *testing.T) {
348+
t.Parallel()
349+
350+
elector, _ := setup(t, nil)
351+
352+
startElector(ctx, t, elector)
353+
354+
elector.testSignals.GainedLeadership.WaitOrTimeout()
355+
356+
// Send a resign request immediately after gaining leadership.
357+
// GainedLeadership is signaled _before_ keepLeadershipLoop is
358+
// entered, so the resign request arrives before the loop's
359+
// select. This only works if requestResignChan is buffered;
360+
// with an unbuffered channel the send would be dropped by the
361+
// default case since nobody is receiving yet.
362+
payload, err := json.Marshal(DBNotification{
363+
Action: DBNotificationKindRequestResign,
364+
})
365+
require.NoError(t, err)
366+
elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(payload))
367+
368+
elector.testSignals.ResignedLeadership.WaitOrTimeout()
369+
elector.testSignals.GainedLeadership.WaitOrTimeout()
370+
})
371+
347372
t.Run("RequestResignStress", func(t *testing.T) {
348373
t.Parallel()
349374

0 commit comments

Comments
 (0)