Skip to content

Commit 7db0510

Browse files
committed
coalesce resigned wakeups
A follower receiving `resigned` notifications could block the shared notifier delivery goroutine if its wakeup channel was already full. That stalls notification delivery beyond leader election, because the notifier invokes callbacks synchronously on one goroutine. Treat follower resignation wakeups the same way as forced-resign wakeups: coalesced, buffered signals instead of preserved blocking sends. The elector still retries promptly on the pending wakeup, and its normal election timer remains the backstop if multiple resignations collapse together. Regression coverage now asserts the second `resigned` notification is coalesced instead of blocking, and that a full follower wakeup channel does not stall notification handling.
1 parent e6b35e6 commit 7db0510

2 files changed

Lines changed: 20 additions & 31 deletions

File tree

internal/leadership/elector.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -305,10 +305,7 @@ func (e *Elector) handleLeadershipNotification(ctx context.Context, topic notifi
305305
return
306306
}
307307

308-
select {
309-
case <-ctx.Done():
310-
case e.leaderResignedChan <- struct{}{}:
311-
}
308+
trySendWakeup(ctx, e.leaderResignedChan)
312309
}
313310
}
314311

internal/leadership/elector_test.go

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -395,19 +395,24 @@ func testElector[TElectorBundle any](
395395
require.Equal(t, elector2.config.ClientID, leader2.LeaderID)
396396
})
397397

398-
t.Run("RetriesOncePerResignation", func(t *testing.T) {
398+
t.Run("CoalescesResignedWakeups", func(t *testing.T) {
399399
t.Parallel()
400400

401401
elector, _ := setup(t, nil)
402-
elector.config.ElectInterval = time.Hour
402+
elector.config.ElectInterval = 100 * time.Millisecond
403+
elector.config.ElectIntervalJitter = 10 * time.Millisecond
403404
elector.leaderResignedChan = make(chan struct{}, 1)
404405

405-
var attempt int
406+
var (
407+
attempt int
408+
attemptTimes []time.Time
409+
)
406410

407411
elector.exec = &leaderAttemptScriptExecutorMock{
408412
Executor: elector.exec,
409413
LeaderAttemptElectFunc: func(ctx context.Context, params *riverdriver.LeaderElectParams) (bool, error) {
410414
attempt++
415+
attemptTimes = append(attemptTimes, time.Now())
411416

412417
switch attempt {
413418
case 1, 2:
@@ -431,22 +436,20 @@ func testElector[TElectorBundle any](
431436

432437
select {
433438
case <-secondNotificationDone:
434-
require.Fail(t, "expected second resignation notification to wait for wakeup channel capacity")
435439
case <-time.After(50 * time.Millisecond):
440+
require.Fail(t, "expected second resignation notification to be coalesced instead of blocking")
436441
}
437442

438-
attemptCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
443+
attemptCtx, cancel := context.WithTimeout(ctx, time.Second)
439444
defer cancel()
440445

446+
start := time.Now()
441447
require.NoError(t, elector.attemptGainLeadershipLoop(attemptCtx))
442-
443-
select {
444-
case <-secondNotificationDone:
445-
case <-time.After(100 * time.Millisecond):
446-
require.Fail(t, "expected second resignation notification to complete after wakeup channel was drained")
447-
}
448+
require.GreaterOrEqual(t, time.Since(start), 75*time.Millisecond)
448449

449450
require.Equal(t, 3, attempt)
451+
require.Len(t, attemptTimes, 3)
452+
require.GreaterOrEqual(t, attemptTimes[2].Sub(attemptTimes[1]), 75*time.Millisecond)
450453
})
451454

452455
t.Run("StartStopStress", func(t *testing.T) {
@@ -731,36 +734,25 @@ func TestElectorHandleLeadershipNotification(t *testing.T) {
731734
require.Empty(t, elector.leaderResignedChan)
732735
})
733736

734-
t.Run("SignalsLeadershipChangeWaitsForWakeupCapacity", func(t *testing.T) {
737+
t.Run("SignalsLeadershipChangeDoesNotBlockOnFullWakeup", func(t *testing.T) {
735738
t.Parallel()
736739

737740
elector, _ := setup(t)
738741
elector.leaderResignedChan <- struct{}{}
739742

740-
ctx, cancel := context.WithCancel(context.Background())
741-
defer cancel()
742-
743743
done := make(chan struct{})
744744

745745
go func() {
746746
defer close(done)
747-
elector.handleLeadershipNotification(ctx, notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, validLeadershipChange())))
747+
elector.handleLeadershipNotification(context.Background(), notifier.NotificationTopicLeadership, string(mustMarshalJSON(t, validLeadershipChange())))
748748
}()
749749

750-
select {
751-
case <-done:
752-
require.Fail(t, "expected leadership notification to wait for wakeup channel capacity")
753-
case <-time.After(50 * time.Millisecond):
754-
}
755-
756-
<-elector.leaderResignedChan
757-
758750
select {
759751
case <-done:
760752
case <-time.After(100 * time.Millisecond):
761-
cancel()
762-
<-done
763-
require.Fail(t, "expected leadership notification to proceed after wakeup channel was drained")
753+
require.Fail(t, "expected leadership notification to coalesce the wakeup instead of blocking")
764754
}
755+
756+
require.Len(t, elector.leaderResignedChan, 1)
765757
})
766758
}

0 commit comments

Comments
 (0)