Skip to content

Commit 20287ed

Browse files
authored
match job kind and queue name by substring, not just prefix (#1018)
* match job kind and queue name by substring, not just prefix If you have long segmented job kinds, it's frustrating to have to type out a long shared prefix to get to the unique part. * golangci-lint v2.4.0
1 parent 9c54d2b commit 20287ed

18 files changed

Lines changed: 137 additions & 95 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ jobs:
251251
name: lint
252252
runs-on: ubuntu-latest
253253
env:
254-
GOLANGCI_LINT_VERSION: v2.3.0
254+
GOLANGCI_LINT_VERSION: v2.4.0
255255
permissions:
256256
contents: read
257257
# allow read access to pull request. Use with `only-new-issues` option.

client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ func (c *Config) validate() error {
503503
kind := workerInfo.jobArgs.Kind()
504504
if !rivercommon.UserSpecifiedIDOrKindRE.MatchString(kind) {
505505
if c.SkipJobKindValidation {
506-
c.Logger.Warn("job kind should match regex; this will be an error in future versions", //nolint:noctx
506+
c.Logger.Warn("job kind should match regex; this will be an error in future versions",
507507
slog.String("kind", kind),
508508
slog.String("regex", rivercommon.UserSpecifiedIDOrKindRE.String()),
509509
)
@@ -829,7 +829,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
829829
client.services = append(client.services, client.notifier)
830830
}
831831
} else {
832-
config.Logger.Info("Driver does not support listener; entering poll only mode") //nolint:noctx
832+
config.Logger.Info("Driver does not support listener; entering poll only mode")
833833
}
834834

835835
client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{

producer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,9 +260,9 @@ func (p *producer) Start(ctx context.Context) error {
260260
}
261261

262262
func (p *producer) Stop() {
263-
p.Logger.Debug(p.Name+": Stopping", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load())) //nolint:noctx
263+
p.Logger.Debug(p.Name+": Stopping", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load()))
264264
p.BaseStartStop.Stop()
265-
p.Logger.Debug(p.Name+": Stop returned", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load())) //nolint:noctx
265+
p.Logger.Debug(p.Name+": Stop returned", slog.String("queue", p.config.Queue), slog.Int64("id", p.id.Load()))
266266
}
267267

268268
// Start starts the producer. It backgrounds a goroutine which is stopped when

riverdriver/river_driver_interface.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ type Executor interface {
215215
JobInsertFastManyNoReturning(ctx context.Context, params *JobInsertFastManyParams) (int, error)
216216
JobInsertFull(ctx context.Context, params *JobInsertFullParams) (*rivertype.JobRow, error)
217217
JobInsertFullMany(ctx context.Context, jobs *JobInsertFullManyParams) ([]*rivertype.JobRow, error)
218-
JobKindListByPrefix(ctx context.Context, params *JobKindListByPrefixParams) ([]string, error)
218+
JobKindList(ctx context.Context, params *JobKindListParams) ([]string, error)
219219
JobList(ctx context.Context, params *JobListParams) ([]*rivertype.JobRow, error)
220220
JobRescueMany(ctx context.Context, params *JobRescueManyParams) (*struct{}, error)
221221
JobRetry(ctx context.Context, params *JobRetryParams) (*rivertype.JobRow, error)
@@ -261,7 +261,7 @@ type Executor interface {
261261
QueueDeleteExpired(ctx context.Context, params *QueueDeleteExpiredParams) ([]string, error)
262262
QueueGet(ctx context.Context, params *QueueGetParams) (*rivertype.Queue, error)
263263
QueueList(ctx context.Context, params *QueueListParams) ([]*rivertype.Queue, error)
264-
QueueNameListByPrefix(ctx context.Context, params *QueueNameListByPrefixParams) ([]string, error)
264+
QueueNameList(ctx context.Context, params *QueueNameListParams) ([]string, error)
265265
QueuePause(ctx context.Context, params *QueuePauseParams) error
266266
QueueResume(ctx context.Context, params *QueueResumeParams) error
267267
QueueUpdate(ctx context.Context, params *QueueUpdateParams) (*rivertype.Queue, error)
@@ -481,11 +481,11 @@ type JobInsertFullManyParams struct {
481481
Schema string
482482
}
483483

484-
type JobKindListByPrefixParams struct {
484+
type JobKindListParams struct {
485485
After string
486486
Exclude []string
487+
Match string
487488
Max int
488-
Prefix string
489489
Schema string
490490
}
491491

@@ -782,11 +782,11 @@ type QueueListParams struct {
782782
Schema string
783783
}
784784

785-
type QueueNameListByPrefixParams struct {
785+
type QueueNameListParams struct {
786786
After string
787787
Exclude []string
788+
Match string
788789
Max int
789-
Prefix string
790790
Schema string
791791
}
792792

riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

riverdriver/riverdatabasesql/internal/dbsqlc/river_queue.sql.go

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

riverdriver/riverdatabasesql/river_database_sql_driver.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -541,12 +541,12 @@ func (e *Executor) JobInsertFullMany(ctx context.Context, params *riverdriver.Jo
541541
return sliceutil.MapError(items, jobRowFromInternal)
542542
}
543543

544-
func (e *Executor) JobKindListByPrefix(ctx context.Context, params *riverdriver.JobKindListByPrefixParams) ([]string, error) {
545-
kinds, err := dbsqlc.New().JobKindListByPrefix(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobKindListByPrefixParams{
544+
func (e *Executor) JobKindList(ctx context.Context, params *riverdriver.JobKindListParams) ([]string, error) {
545+
kinds, err := dbsqlc.New().JobKindList(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.JobKindListParams{
546546
After: params.After,
547547
Exclude: params.Exclude,
548+
Match: params.Match,
548549
Max: int32(params.Max), //nolint:gosec
549-
Prefix: params.Prefix,
550550
})
551551
if err != nil {
552552
return nil, interpretError(err)
@@ -883,12 +883,12 @@ func (e *Executor) QueueList(ctx context.Context, params *riverdriver.QueueListP
883883
return sliceutil.Map(queues, queueFromInternal), nil
884884
}
885885

886-
func (e *Executor) QueueNameListByPrefix(ctx context.Context, params *riverdriver.QueueNameListByPrefixParams) ([]string, error) {
887-
queueNames, err := dbsqlc.New().QueueNameListByPrefix(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.QueueNameListByPrefixParams{
886+
func (e *Executor) QueueNameList(ctx context.Context, params *riverdriver.QueueNameListParams) ([]string, error) {
887+
queueNames, err := dbsqlc.New().QueueNameList(schemaTemplateParam(ctx, params.Schema), e.dbtx, &dbsqlc.QueueNameListParams{
888888
After: params.After,
889889
Exclude: params.Exclude,
890+
Match: params.Match,
890891
Max: int32(min(params.Max, math.MaxInt32)), //nolint:gosec
891-
Prefix: params.Prefix,
892892
})
893893
if err != nil {
894894
return nil, interpretError(err)

riverdriver/riverdrivertest/riverdrivertest.go

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2356,10 +2356,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
23562356
assertJobEqualsInput(t, results[1], jobParams2)
23572357
})
23582358

2359-
t.Run("JobKindListByPrefix", func(t *testing.T) { //nolint:dupl
2359+
t.Run("JobKindList", func(t *testing.T) {
23602360
t.Parallel()
23612361

2362-
t.Run("ListsJobKindsInOrderWithMaxLimit", func(t *testing.T) {
2362+
t.Run("ListsJobKindsInOrderWithMaxLimit", func(t *testing.T) { //nolint:dupl
23632363
t.Parallel()
23642364

23652365
exec, _ := setup(ctx, t)
@@ -2369,11 +2369,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
23692369
job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Kind: ptrutil.Ptr("job_bbb")})
23702370
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Kind: ptrutil.Ptr("different_prefix_job")})
23712371

2372-
jobKinds, err := exec.JobKindListByPrefix(ctx, &riverdriver.JobKindListByPrefixParams{
2372+
jobKinds, err := exec.JobKindList(ctx, &riverdriver.JobKindListParams{
23732373
After: "job2",
23742374
Exclude: nil,
2375+
Match: "job",
23752376
Max: 2,
2376-
Prefix: "job",
23772377
Schema: "",
23782378
})
23792379
require.NoError(t, err)
@@ -2389,16 +2389,37 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
23892389
job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Kind: ptrutil.Ptr("job_aaa")})
23902390
job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Kind: ptrutil.Ptr("job_bbb")})
23912391

2392-
jobKinds, err := exec.JobKindListByPrefix(ctx, &riverdriver.JobKindListByPrefixParams{
2392+
jobKinds, err := exec.JobKindList(ctx, &riverdriver.JobKindListParams{
23932393
After: "job2",
23942394
Exclude: []string{job2.Kind},
23952395
Max: 2,
2396-
Prefix: "job",
2396+
Match: "job",
23972397
Schema: "",
23982398
})
23992399
require.NoError(t, err)
24002400
require.Equal(t, []string{job3.Kind, job1.Kind}, jobKinds)
24012401
})
2402+
2403+
t.Run("ListsJobKindsWithSubstringMatch", func(t *testing.T) { //nolint:dupl
2404+
t.Parallel()
2405+
2406+
exec, _ := setup(ctx, t)
2407+
2408+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Kind: ptrutil.Ptr("mid_job_kind")})
2409+
job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Kind: ptrutil.Ptr("prefix_job")})
2410+
job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Kind: ptrutil.Ptr("suffix_job")})
2411+
_ = testfactory.Job(ctx, t, exec, &testfactory.JobOpts{Kind: ptrutil.Ptr("nojobhere")})
2412+
2413+
jobKinds, err := exec.JobKindList(ctx, &riverdriver.JobKindListParams{
2414+
After: "",
2415+
Exclude: nil,
2416+
Match: "fix",
2417+
Max: 3,
2418+
Schema: "",
2419+
})
2420+
require.NoError(t, err)
2421+
require.Equal(t, []string{job2.Kind, job3.Kind}, jobKinds)
2422+
})
24022423
})
24032424

24042425
t.Run("JobList", func(t *testing.T) {
@@ -4155,10 +4176,10 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
41554176
requireQueuesEqual(t, queue3, queues[2])
41564177
})
41574178

4158-
t.Run("QueueNameListByPrefix", func(t *testing.T) { //nolint:dupl
4179+
t.Run("QueueNameList", func(t *testing.T) {
41594180
t.Parallel()
41604181

4161-
t.Run("ListsQueuesInOrderWithMaxLimit", func(t *testing.T) {
4182+
t.Run("ListsQueuesInOrderWithMaxLimit", func(t *testing.T) { //nolint:dupl
41624183
t.Parallel()
41634184

41644185
exec, _ := setup(ctx, t)
@@ -4168,11 +4189,11 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
41684189
queue3 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("queue_bbb")})
41694190
_ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("different_prefix_queue")})
41704191

4171-
queueNames, err := exec.QueueNameListByPrefix(ctx, &riverdriver.QueueNameListByPrefixParams{
4192+
queueNames, err := exec.QueueNameList(ctx, &riverdriver.QueueNameListParams{
41724193
After: "queue2",
41734194
Exclude: nil,
4195+
Match: "queue",
41744196
Max: 2,
4175-
Prefix: "queue",
41764197
Schema: "",
41774198
})
41784199
require.NoError(t, err)
@@ -4188,16 +4209,36 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
41884209
queue2 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("queue_aaa")})
41894210
queue3 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("queue_bbb")})
41904211

4191-
queueNames, err := exec.QueueNameListByPrefix(ctx, &riverdriver.QueueNameListByPrefixParams{
4212+
queueNames, err := exec.QueueNameList(ctx, &riverdriver.QueueNameListParams{
41924213
After: "queue2",
41934214
Exclude: []string{queue2.Name},
4215+
Match: "queue",
41944216
Max: 2,
4195-
Prefix: "queue",
41964217
Schema: "",
41974218
})
41984219
require.NoError(t, err)
41994220
require.Equal(t, []string{queue3.Name, queue1.Name}, queueNames)
42004221
})
4222+
4223+
t.Run("ListsQueuesWithSubstringMatch", func(t *testing.T) {
4224+
t.Parallel()
4225+
4226+
exec, _ := setup(ctx, t)
4227+
4228+
queue1 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("prefix_queue")})
4229+
_ = testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("another_queue")})
4230+
queue3 := testfactory.Queue(ctx, t, exec, &testfactory.QueueOpts{Name: ptrutil.Ptr("suffix_queue")})
4231+
4232+
queueNames, err := exec.QueueNameList(ctx, &riverdriver.QueueNameListParams{
4233+
After: "",
4234+
Exclude: nil,
4235+
Match: "fix",
4236+
Max: 3,
4237+
Schema: "",
4238+
})
4239+
require.NoError(t, err)
4240+
require.Equal(t, []string{queue1.Name, queue3.Name}, queueNames)
4241+
})
42014242
})
42024243

42034244
t.Run("QueuePause", func(t *testing.T) {

riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -458,10 +458,10 @@ SELECT
458458
FROM raw_job_data
459459
RETURNING *;
460460

461-
-- name: JobKindListByPrefix :many
461+
-- name: JobKindList :many
462462
SELECT DISTINCT ON (kind) kind
463463
FROM /* TEMPLATE: schema */river_job
464-
WHERE (@prefix = '' OR kind ILIKE @prefix || '%')
464+
WHERE (@match = '' OR kind ILIKE '%' || @match || '%')
465465
AND (@after = '' OR kind > @after)
466466
AND (@exclude::text[] IS NULL OR kind != ALL(@exclude))
467467
ORDER BY kind ASC

riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)