Skip to content

Commit e6b35e6

Browse files
committed
isolate poll-only elector fixtures
The poll-only leadership suite was creating parallel subtests from fixtures that were supposed to be independent, but they still reused the same shared test schema. That meant separate bundles could contend on the same singleton `river_leader` row, turning one case's election work into another case's timeout. When that happened, subtests like `CompetingElectors`, `LosesLeadership`, and `RequestResignStress` failed with election errors such as `timeout: context deadline exceeded`, followed by cleanup errors like `failed to deallocate cached statement(s): conn closed`: https://github.com/riverqueue/river/actions/runs/24406465152 Add a regression that starts two independently created poll-only bundles side by side and verifies that each bundle sees its own elected leader. Then switch only the poll-only setup to build transactions with `TestTxPgxDriver` and `DisableSchemaSharing`. That keeps the shared transaction wrapper inside each subtest, so multiple electors in one case can still coordinate on a single transaction, while truly separate subtests stop contending on the same leader row.
1 parent 7354804 commit e6b35e6

1 file changed

Lines changed: 31 additions & 1 deletion

File tree

internal/leadership/elector_test.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ func TestElector_PollOnly(t *testing.T) {
6868
func(ctx context.Context, t *testing.T, stress bool) *electorBundle {
6969
t.Helper()
7070

71-
tx := riverdbtest.TestTxPgx(ctx, t)
71+
tx, _ := riverdbtest.TestTxPgxDriver(ctx, t, riverpgxv5.New(riversharedtest.DBPool(ctx, t)), &riverdbtest.TestTxOpts{
72+
DisableSchemaSharing: true,
73+
})
7274

7375
// We'll put multiple electors on one transaction. Make sure they can
7476
// live with each other in relative harmony.
@@ -365,6 +367,34 @@ func testElector[TElectorBundle any](
365367
require.ErrorIs(t, err, rivertype.ErrNotFound)
366368
})
367369

370+
t.Run("IndependentBundlesAreIsolated", func(t *testing.T) {
371+
t.Parallel()
372+
373+
elector1, bundle1 := setup(t, nil)
374+
elector1.config.ClientID = "elector1"
375+
376+
elector2, bundle2 := setup(t, nil)
377+
elector2.config.ClientID = "elector2"
378+
379+
startElector(ctx, t, elector1)
380+
startElector(ctx, t, elector2)
381+
382+
elector1.testSignals.GainedLeadership.WaitOrTimeout()
383+
elector2.testSignals.GainedLeadership.WaitOrTimeout()
384+
385+
leader1, err := bundle1.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{
386+
Schema: elector1.config.Schema,
387+
})
388+
require.NoError(t, err)
389+
require.Equal(t, elector1.config.ClientID, leader1.LeaderID)
390+
391+
leader2, err := bundle2.exec.LeaderGetElectedLeader(ctx, &riverdriver.LeaderGetElectedLeaderParams{
392+
Schema: elector2.config.Schema,
393+
})
394+
require.NoError(t, err)
395+
require.Equal(t, elector2.config.ClientID, leader2.LeaderID)
396+
})
397+
368398
t.Run("RetriesOncePerResignation", func(t *testing.T) {
369399
t.Parallel()
370400

0 commit comments

Comments
 (0)