Skip to content

Commit 74790ee

Browse files
committed
Reindexer: Mock reindex function to reduce parallel contention
This one's aimed at fixing an intermittently failing test case: https://github.com/riverqueue/river/actions/runs/24322049420/job/71009998068?pr=1208 --- FAIL: TestReindexer (0.00s) --- FAIL: TestReindexer/ReindexesConfiguredIndexes (10.07s) reindexer_test.go:219: Reusing idle postgres schema "maintenance_2026_04_13t01_54_14_schema_04" [user facing: "maintenance_2026_04_13t01_54_14_schema_04"] after cleaning in 26.436456ms [4 generated] [7 reused] test_signal.go:95: timed out waiting on test signal after 10s logger.go:256: time=2026-04-13T01:54:27.581Z level=INFO msg="maintenance.Reindexer: Signaled to stop during index build; attempting to clean up concurrent artifacts" riverdbtest.go:293: Checked in postgres schema "maintenance_2026_04_13t01_54_14_schema_04"; 1 idle schema(s) [4 generated] [10 reused] --- FAIL: TestReindexer/ReindexesMinimalSubsetofIndexes (10.14s) reindexer_test.go:183: Reusing idle postgres schema "maintenance_2026_04_13t01_54_14_schema_01" [user facing: "maintenance_2026_04_13t01_54_14_schema_01"] after cleaning in 28.042877ms [4 generated] [10 reused] test_signal.go:95: timed out waiting on test signal after 10s reindexer_test.go:211: Error Trace: /home/runner/work/river/river/internal/maintenance/reindexer_test.go:211 Error: Should be false Test: TestReindexer/ReindexesMinimalSubsetofIndexes logger.go:256: time=2026-04-13T01:54:28.444Z level=INFO msg="maintenance.Reindexer: Signaled to stop during index build; attempting to clean up concurrent artifacts" riverdbtest.go:293: Checked in postgres schema "maintenance_2026_04_13t01_54_14_schema_01"; 1 idle schema(s) [5 generated] [24 reused] FAIL FAIL github.com/riverqueue/river/internal/maintenance 18.764s I'm diagnosing with Claude's help here, but what appears to be happening is that although a reindex operation in Postgres is often fast, it is still a heavy operation, and can slow down even further when there's a lot of concurrent activity hammering a database. Many reindexer test cases run in parallel, and it appears that was happening here is that we got a reindex that exceeded our maximum timeout of 10x. We have some evidence this during the test run from the runtime of 10.07s and the line: Signaled to stop during index build; attempting to clean up concurrent artifacts Here, I'm putting forward a solution proposed by Claude, which is to mock out the reindex operation, especially where we have a number of reindexes going in parallel. The tests `ReindexOneSuccess` and `ReindexSkippedWithReindexArtifact` still put load on the real reindex operation, so we're not going full mock here, and should see our intermittency considerably reduced while still being confident that everything still works.
1 parent 9eaea33 commit 74790ee

1 file changed

Lines changed: 34 additions & 14 deletions

File tree

internal/maintenance/reindexer_test.go

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync"
78
"sync/atomic"
89
"testing"
910
"time"
@@ -24,13 +25,16 @@ type reindexerExecutorMock struct {
2425
indexesExistCalls atomic.Int32
2526
indexesExistFunc func(ctx context.Context, params *riverdriver.IndexesExistParams) (map[string]bool, error)
2627
indexesExistSignal chan struct{}
28+
29+
indexReindexFunc func(ctx context.Context, params *riverdriver.IndexReindexParams) error
2730
}
2831

2932
func newReindexerExecutorMock(exec riverdriver.Executor) *reindexerExecutorMock {
3033
return &reindexerExecutorMock{
3134
Executor: exec,
3235
indexesExistFunc: exec.IndexesExist,
3336
indexesExistSignal: make(chan struct{}, 10),
37+
indexReindexFunc: exec.IndexReindex,
3438
}
3539
}
3640

@@ -45,6 +49,10 @@ func (m *reindexerExecutorMock) IndexesExist(ctx context.Context, params *riverd
4549
return m.indexesExistFunc(ctx, params)
4650
}
4751

52+
func (m *reindexerExecutorMock) IndexReindex(ctx context.Context, params *riverdriver.IndexReindexParams) error {
53+
return m.indexReindexFunc(ctx, params)
54+
}
55+
4856
func TestReindexer(t *testing.T) {
4957
t.Parallel()
5058

@@ -182,6 +190,20 @@ func TestReindexer(t *testing.T) {
182190

183191
svc, bundle := setup(t)
184192

193+
// Mock IndexReindex so the test doesn't depend on the speed of real
194+
// REINDEX CONCURRENTLY operations on the shared CI database. Track
195+
// which indexes got reindexed so we can verify the expected set.
196+
mockExec := newReindexerExecutorMock(bundle.exec)
197+
var reindexedNames []string
198+
var reindexedMu sync.Mutex
199+
mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error {
200+
reindexedMu.Lock()
201+
defer reindexedMu.Unlock()
202+
reindexedNames = append(reindexedNames, params.Index)
203+
return nil
204+
}
205+
svc.exec = mockExec
206+
185207
svc.Config.IndexNames = []string{
186208
"river_job_kind",
187209
"river_job_prioritized_fetching_index",
@@ -198,25 +220,23 @@ func TestReindexer(t *testing.T) {
198220
case <-time.After(100 * time.Millisecond):
199221
}
200222

201-
// Make sure that no `CONCURRENTLY` artifacts exist after reindexing is
202-
// supposed to be done. Postgres creates a new index suffixed with
203-
// `_ccnew` before swapping it in as the new index. The existing index
204-
// is renamed `_ccold` before being dropped concurrently.
205-
//
206-
// https://www.postgresql.org/docs/current/sql-reindex.html#SQL-REINDEX-CONCURRENTLY
207-
for _, indexName := range svc.Config.IndexNames {
208-
for _, reindexArtifactName := range []string{indexName + "_ccnew", indexName + "_ccold"} {
209-
indexExists, err := bundle.exec.IndexExists(ctx, &riverdriver.IndexExistsParams{Index: reindexArtifactName, Schema: bundle.schema})
210-
require.NoError(t, err)
211-
require.False(t, indexExists)
212-
}
213-
}
223+
reindexedMu.Lock()
224+
require.ElementsMatch(t, svc.Config.IndexNames, reindexedNames)
225+
reindexedMu.Unlock()
214226
})
215227

216228
t.Run("ReindexesConfiguredIndexes", func(t *testing.T) {
217229
t.Parallel()
218230

219-
svc, _ := setup(t)
231+
svc, bundle := setup(t)
232+
233+
// Mock IndexReindex so the test doesn't depend on the speed of real
234+
// REINDEX CONCURRENTLY operations on the shared CI database.
235+
mockExec := newReindexerExecutorMock(bundle.exec)
236+
mockExec.indexReindexFunc = func(ctx context.Context, params *riverdriver.IndexReindexParams) error {
237+
return nil
238+
}
239+
svc.exec = mockExec
220240

221241
svc.Config.ScheduleFunc = runImmediatelyThenOnceAnHour()
222242

0 commit comments

Comments
 (0)