Skip to content

Commit abd5143

Browse files
committed
add queue-state count regression benchmark
This adds a benchmark on top of #1203 to make the queue-state count query regression easy to reproduce and discuss. The query in that branch now groups by `(queue, state)` while filtering only on `queue`, which no longer lines up with the existing `(state, queue, priority, scheduled_at, id)` index on `river_job`. The new benchmark seeds a migrated `river_job` table and compares the current `JobCountByQueueAndState` implementation against the legacy query shape for a few queue-list sizes. That gives us a durable way to show the planner behavior and quantify the difference before deciding whether to reshape the SQL or add another index.
1 parent 3d3c21a commit abd5143

File tree

1 file changed

+180
-0
lines changed

1 file changed

+180
-0
lines changed
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
package riverpgxv5_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
8+
"github.com/jackc/pgx/v5/pgxpool"
9+
"github.com/stretchr/testify/require"
10+
11+
"github.com/riverqueue/river/riverdbtest"
12+
"github.com/riverqueue/river/riverdriver"
13+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
14+
"github.com/riverqueue/river/rivershared/riversharedtest"
15+
)
16+
17+
func BenchmarkJobCountByQueueAndState(b *testing.B) {
18+
ctx := context.Background()
19+
20+
dbPool := riversharedtest.DBPool(ctx, b)
21+
driver := riverpgxv5.New(dbPool)
22+
schema := riverdbtest.TestSchema(ctx, b, driver, nil)
23+
24+
seedQueueStateCountBenchmarkData(ctx, b, dbPool, schema)
25+
26+
queueNamesTwo := []string{"queue_001", "queue_002"}
27+
queueNamesTen := []string{
28+
"queue_001", "queue_002", "queue_003", "queue_004", "queue_005",
29+
"queue_006", "queue_007", "queue_008", "queue_009", "queue_010",
30+
}
31+
32+
for _, benchmarkCase := range []struct {
33+
name string
34+
queueNames []string
35+
}{
36+
{name: "TwoQueues", queueNames: queueNamesTwo},
37+
{name: "TenQueues", queueNames: queueNamesTen},
38+
} {
39+
b.Run("Current/"+benchmarkCase.name, func(b *testing.B) {
40+
b.ReportAllocs()
41+
42+
params := &riverdriver.JobCountByQueueAndStateParams{
43+
QueueNames: benchmarkCase.queueNames,
44+
Schema: schema,
45+
}
46+
47+
b.ResetTimer()
48+
for range b.N {
49+
results, err := driver.GetExecutor().JobCountByQueueAndState(ctx, params)
50+
require.NoError(b, err)
51+
require.NotEmpty(b, results)
52+
}
53+
})
54+
55+
b.Run("Legacy/"+benchmarkCase.name, func(b *testing.B) {
56+
b.ReportAllocs()
57+
58+
query := legacyJobCountByQueueAndStateQuery(schema)
59+
60+
b.ResetTimer()
61+
for range b.N {
62+
rows, err := dbPool.Query(ctx, query, benchmarkCase.queueNames)
63+
require.NoError(b, err)
64+
65+
var numRows int
66+
for rows.Next() {
67+
var (
68+
countAvailable int64
69+
countRunning int64
70+
queue string
71+
)
72+
73+
require.NoError(b, rows.Scan(&queue, &countAvailable, &countRunning))
74+
numRows++
75+
}
76+
77+
rows.Close()
78+
require.NoError(b, rows.Err())
79+
require.Equal(b, len(benchmarkCase.queueNames), numRows)
80+
}
81+
})
82+
}
83+
}
84+
85+
// legacyJobCountByQueueAndStateQuery renders the legacy count query, which
// filters and groups on queue alone inside per-state CTEs so that it lines
// up with the (state, queue, ...) index on river_job. The schema name is
// interpolated directly (trusted test schema, not user input); $1 is the
// text[] of queue names.
func legacyJobCountByQueueAndStateQuery(schema string) string {
	// The schema-qualified table is referenced twice; use an indexed
	// verb so it's passed to Sprintf only once.
	jobTable := schema + ".river_job"

	return fmt.Sprintf(`
WITH all_queues AS (
    SELECT DISTINCT unnest($1::text[])::text AS queue
),

running_job_counts AS (
    SELECT
        queue,
        COUNT(*) AS count
    FROM %[1]s
    WHERE queue = ANY($1::text[])
        AND state = 'running'
    GROUP BY queue
),

available_job_counts AS (
    SELECT
        queue,
        COUNT(*) AS count
    FROM %[1]s
    WHERE queue = ANY($1::text[])
        AND state = 'available'
    GROUP BY queue
)

SELECT
    all_queues.queue,
    COALESCE(available_job_counts.count, 0) AS count_available,
    COALESCE(running_job_counts.count, 0) AS count_running
FROM
    all_queues
LEFT JOIN
    running_job_counts ON all_queues.queue = running_job_counts.queue
LEFT JOIN
    available_job_counts ON all_queues.queue = available_job_counts.queue
ORDER BY all_queues.queue ASC
`, jobTable)
}
124+
125+
// seedQueueStateCountBenchmarkData inserts 200,000 synthetic rows into the
// migrated river_job table in schema, cycling rows through all eight job
// states (gs % 8) and 100 queues (queue_001..queue_100) with scheduled_at
// values spread over the preceding ~100,000 seconds, then runs ANALYZE so
// the planner has fresh statistics before the benchmark queries run.
func seedQueueStateCountBenchmarkData(ctx context.Context, b *testing.B, dbPool *pgxpool.Pool, schema string) {
	b.Helper()

	// %% escapes a literal % (SQL modulo) inside fmt.Sprintf. finalized_at
	// is populated only for finalized states (cancelled/completed/discarded),
	// mirroring river_job's finalized-state constraint.
	query := fmt.Sprintf(`
	WITH generated_jobs AS (
		SELECT
			CASE gs %% 8
				WHEN 0 THEN 'running'
				WHEN 1 THEN 'available'
				WHEN 2 THEN 'completed'
				WHEN 3 THEN 'cancelled'
				WHEN 4 THEN 'discarded'
				WHEN 5 THEN 'retryable'
				WHEN 6 THEN 'scheduled'
				ELSE 'pending'
			END AS state,
			now() - ((gs %% 100000)::text || ' seconds')::interval AS scheduled_at,
			'queue_' || lpad(((gs %% 100) + 1)::text, 3, '0') AS queue
		FROM generate_series(1, 200000) AS gs
	)
	INSERT INTO %s.river_job (
		args,
		finalized_at,
		kind,
		max_attempts,
		metadata,
		queue,
		scheduled_at,
		state
	)
	SELECT
		'{}'::jsonb,
		CASE
			WHEN state IN ('cancelled', 'completed', 'discarded') THEN scheduled_at + interval '1 second'
			ELSE NULL
		END AS finalized_at,
		'benchmark',
		25,
		'{}'::jsonb,
		queue,
		scheduled_at,
		state::%s.river_job_state
	FROM generated_jobs;

	ANALYZE %s.river_job;
	`, schema, schema, schema)

	// Single round trip for INSERT + ANALYZE; pgx executes a multi-statement
	// string via the simple query protocol when no arguments are passed —
	// NOTE(review): relies on no query args being added here later.
	_, err := dbPool.Exec(ctx, query)
	require.NoError(b, err)

	// Sanity check: the seed produced exactly the expected number of rows.
	row := dbPool.QueryRow(ctx, "SELECT count(*) FROM "+schema+".river_job")

	var numRows int
	require.NoError(b, row.Scan(&numRows))
	require.Equal(b, 200000, numRows)
}

0 commit comments

Comments
 (0)