Skip to content

Commit 9bb0e45

Browse files
committed
Reindexer: Mock reindex function to reduce parallel contention
This one's aimed at fixing an intermittently failing test case: https://github.com/riverqueue/river/actions/runs/24322049420/job/71009998068?pr=1208 --- FAIL: TestReindexer (0.00s) --- FAIL: TestReindexer/ReindexesConfiguredIndexes (10.07s) reindexer_test.go:219: Reusing idle postgres schema "maintenance_2026_04_13t01_54_14_schema_04" [user facing: "maintenance_2026_04_13t01_54_14_schema_04"] after cleaning in 26.436456ms [4 generated] [7 reused] test_signal.go:95: timed out waiting on test signal after 10s logger.go:256: time=2026-04-13T01:54:27.581Z level=INFO msg="maintenance.Reindexer: Signaled to stop during index build; attempting to clean up concurrent artifacts" riverdbtest.go:293: Checked in postgres schema "maintenance_2026_04_13t01_54_14_schema_04"; 1 idle schema(s) [4 generated] [10 reused] --- FAIL: TestReindexer/ReindexesMinimalSubsetofIndexes (10.14s) reindexer_test.go:183: Reusing idle postgres schema "maintenance_2026_04_13t01_54_14_schema_01" [user facing: "maintenance_2026_04_13t01_54_14_schema_01"] after cleaning in 28.042877ms [4 generated] [10 reused] test_signal.go:95: timed out waiting on test signal after 10s reindexer_test.go:211: Error Trace: /home/runner/work/river/river/internal/maintenance/reindexer_test.go:211 Error: Should be false Test: TestReindexer/ReindexesMinimalSubsetofIndexes logger.go:256: time=2026-04-13T01:54:28.444Z level=INFO msg="maintenance.Reindexer: Signaled to stop during index build; attempting to clean up concurrent artifacts" riverdbtest.go:293: Checked in postgres schema "maintenance_2026_04_13t01_54_14_schema_01"; 1 idle schema(s) [5 generated] [24 reused] FAIL FAIL github.com/riverqueue/river/internal/maintenance 18.764s I'm diagnosing with Claude's help here, but what appears to be happening is that although a reindex operation in Postgres is often fast, it is still a heavy operation, and can slow down even further when there's a lot of concurrent activity hammering a database. Many reindexer test cases run in parallel, and it appears that was happening here is that we got a reindex that exceeded our maximum timeout of 10x. We have some evidence this during the test run from the runtime of 10.07s and the line: Signaled to stop during index build; attempting to clean up concurrent artifacts Here, I'm putting forward a solution proposed by Claude, which is to mock out the reindex operation, especially where we have a number of reindexes going in parallel. The tests `ReindexOneSuccess` and `ReindexSkippedWithReindexArtifact` still put load on the real reindex operation, so we're not going full mock here, and should see our intermittency considerably reduced while still being confident that everything still works.
1 parent 9eaea33 commit 9bb0e45

1 file changed

Lines changed: 33 additions & 14 deletions

File tree

internal/maintenance/reindexer_test.go

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync"
78
"sync/atomic"
89
"testing"
910
"time"
@@ -24,13 +25,15 @@ type reindexerExecutorMock struct {
2425
indexesExistCalls atomic.Int32
2526
indexesExistFunc func(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error)
2627
indexesExistSignal chan struct{}
28+
indexReindexFunc func(ctx context.Context, params *riverdriver.IndexReindexParams) error
2729
}
2830

2931
func newReindexerExecutorMock(exec riverdriver.Executor) *reindexerExecutorMock {
3032
return &reindexerExecutorMock{
3133
Executor: exec,
3234
indexesExistFunc: exec.IndexesExist,
3335
indexesExistSignal: make(chan struct{}, 10),
36+
indexReindexFunc: exec.IndexReindex,
3437
}
3538
}
3639

@@ -45,6 +48,10 @@ func (m *reindexerExecutorMock) IndexesExist(ctx context.Context, params *riverd
4548
return m.indexesExistFunc(ctx, params)
4649
}
4750

51+
func (m *reindexerExecutorMock) IndexReindex(ctx context.Context, params *riverdriver.IndexReindexParams) error {
52+
return m.indexReindexFunc(ctx, params)
53+
}
54+
4855
func TestReindexer(t *testing.T) {
4956
t.Parallel()
5057

@@ -182,6 +189,20 @@ func TestReindexer(t *testing.T) {
182189

183190
svc, bundle := setup(t)
184191

192+
// Mock IndexReindex so the test doesn't depend on the speed of real
193+
// REINDEX CONCURRENTLY operations on the shared CI database. Track
194+
// which indexes got reindexed so we can verify the expected set.
195+
mockExec := newReindexerExecutorMock(bundle.exec)
196+
var reindexedNames []string
197+
var reindexedMu sync.Mutex
198+
mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error {
199+
reindexedMu.Lock()
200+
defer reindexedMu.Unlock()
201+
reindexedNames = append(reindexedNames, params.Index)
202+
return nil
203+
}
204+
svc.exec = mockExec
205+
185206
svc.Config.IndexNames = []string{
186207
"river_job_kind",
187208
"river_job_prioritized_fetching_index",
@@ -198,25 +219,23 @@ func TestReindexer(t *testing.T) {
198219
case <-time.After(100 * time.Millisecond):
199220
}
200221

201-
// Make sure that no `CONCURRENTLY` artifacts exist after reindexing is
202-
// supposed to be done. Postgres creates a new index suffixed with
203-
// `_ccnew` before swapping it in as the new index. The existing index
204-
// is renamed `_ccold` before being dropped concurrently.
205-
//
206-
// https://www.postgresql.org/docs/current/sql-reindex.html#SQL-REINDEX-CONCURRENTLY
207-
for _, indexName := range svc.Config.IndexNames {
208-
for _, reindexArtifactName := range []string{indexName + "_ccnew", indexName + "_ccold"} {
209-
indexExists, err := bundle.exec.IndexExists(ctx, &riverdriver.IndexExistsParams{Index: reindexArtifactName, Schema: bundle.schema})
210-
require.NoError(t, err)
211-
require.False(t, indexExists)
212-
}
213-
}
222+
reindexedMu.Lock()
223+
require.ElementsMatch(t, svc.Config.IndexNames, reindexedNames)
224+
reindexedMu.Unlock()
214225
})
215226

216227
t.Run("ReindexesConfiguredIndexes", func(t *testing.T) {
217228
t.Parallel()
218229

219-
svc, _ := setup(t)
230+
svc, bundle := setup(t)
231+
232+
// Mock IndexReindex so the test doesn't depend on the speed of real
233+
// REINDEX CONCURRENTLY operations on the shared CI database.
234+
mockExec := newReindexerExecutorMock(bundle.exec)
235+
mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error {
236+
return nil
237+
}
238+
svc.exec = mockExec
220239

221240
svc.Config.ScheduleFunc = runImmediatelyThenOnceAnHour()
222241

0 commit comments

Comments
 (0)