Skip to content

Commit ae600c4

Browse files
committed
publish leader state before signaling
A newly elected leader could still look like a follower when `GainedLeadership` was observed. The elector did not publish `isLeader = true` until `keepLeadershipLoop`, so a `request_resign` notification arriving in that transition window could be dropped as if no leader existed. Move leadership publication to the point immediately after election succeeds and before the gained-leadership signal is emitted. That makes the signal truthful: once observers see it, subscriber state and `isLeader` already reflect leadership. Add a DB-backed regression that keeps the initial subscriber notification buffered and verifies `GainedLeadership` waits for leader state publication. The immediate-resign regression comments now document the intended ordering.
1 parent 7db0510 commit ae600c4

2 files changed

Lines changed: 37 additions & 8 deletions

File tree

internal/leadership/elector.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ func (e *Elector) Start(ctx context.Context) error {
196196
return
197197
}
198198

199+
e.notifySubscribers(true)
200+
199201
e.Logger.DebugContext(ctx, e.Name+": Gained leadership", "client_id", e.config.ClientID)
200202
e.testSignals.GainedLeadership.Signal(struct{}{})
201203

@@ -312,9 +314,6 @@ func (e *Elector) handleLeadershipNotification(ctx context.Context, topic notifi
312314
var errLostLeadershipReelection = errors.New("lost leadership with no error")
313315

314316
func (e *Elector) keepLeadershipLoop(ctx context.Context) error {
315-
// notify all subscribers that we're the leader
316-
e.notifySubscribers(true)
317-
318317
// On the way out clear any other item that may have been added to
319318
// requestResignChan. Having isLeader set to false will prevent additional
320319
// items from being queued after this one.

internal/leadership/elector_test.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,37 @@ func testElector[TElectorBundle any](
260260
require.False(t, notification.IsLeader)
261261
})
262262

263+
t.Run("GainedLeadershipWaitsForLeadershipStatePublication", func(t *testing.T) {
264+
t.Parallel()
265+
266+
elector, _ := setup(t, nil)
267+
268+
sub := elector.Listen()
269+
t.Cleanup(sub.Unlisten)
270+
271+
startElector(ctx, t, elector)
272+
273+
var gainedLeadershipTooEarly bool
274+
275+
select {
276+
case <-elector.testSignals.GainedLeadership.WaitC():
277+
gainedLeadershipTooEarly = true
278+
case <-time.After(250 * time.Millisecond):
279+
}
280+
281+
notification := riversharedtest.WaitOrTimeout(t, sub.ch)
282+
require.False(t, notification.IsLeader)
283+
284+
notification = riversharedtest.WaitOrTimeout(t, sub.ch)
285+
require.True(t, notification.IsLeader)
286+
287+
if gainedLeadershipTooEarly {
288+
require.FailNow(t, "expected gained leadership signal to wait for leadership state publication")
289+
}
290+
291+
elector.testSignals.GainedLeadership.WaitOrTimeout()
292+
})
293+
263294
t.Run("SustainsLeadership", func(t *testing.T) {
264295
t.Parallel()
265296

@@ -472,11 +503,10 @@ func testElector[TElectorBundle any](
472503
elector.testSignals.GainedLeadership.WaitOrTimeout()
473504

474505
// Send a resign request immediately after gaining leadership.
475-
// GainedLeadership is signaled _before_ keepLeadershipLoop is
476-
// entered, so the resign request arrives before the loop's
477-
// select. This only works if requestResignChan is buffered;
478-
// with an unbuffered channel the send would be dropped by the
479-
// default case since nobody is receiving yet.
506+
// This only works if the elector has already published its
507+
// leader state before signaling GainedLeadership, and if
508+
// requestResignChan is buffered while keepLeadershipLoop is
509+
// still entering its select.
480510
payload, err := json.Marshal(DBNotification{
481511
Action: DBNotificationKindRequestResign,
482512
})

0 commit comments

Comments
 (0)