Skip to content

Commit cacee1e

Browse files
committed
fix: make snapshot dispatch limit configurable via COMPUTE_SNAPSHOT_DISPATCH_LIMIT
1 parent 05a6721 commit cacee1e

4 files changed

Lines changed: 11 additions & 3 deletions

File tree

apps/supervisor/src/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ const Env = z
8686
COMPUTE_TRACE_SPANS_ENABLED: BoolEnv.default(true),
8787
COMPUTE_TRACE_OTLP_ENDPOINT: z.string().url().optional(), // Override for span export (derived from TRIGGER_API_URL if unset)
8888
COMPUTE_SNAPSHOT_DELAY_MS: z.coerce.number().int().min(0).max(60_000).default(5_000),
89+
COMPUTE_SNAPSHOT_DISPATCH_LIMIT: z.coerce.number().int().min(1).max(100).default(10),
8990

9091
// Kubernetes settings
9192
KUBERNETES_FORCE_ENABLED: BoolEnv.default(false),

apps/supervisor/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ class ManagedSupervisor {
110110
snapshots: {
111111
enabled: env.COMPUTE_SNAPSHOTS_ENABLED,
112112
delayMs: env.COMPUTE_SNAPSHOT_DELAY_MS,
113+
dispatchLimit: env.COMPUTE_SNAPSHOT_DISPATCH_LIMIT,
113114
callbackUrl,
114115
},
115116
tracing: this.tracing,

apps/supervisor/src/workloadManager/compute.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type ComputeWorkloadManagerOptions = WorkloadManagerOptions & {
2020
snapshots: {
2121
enabled: boolean;
2222
delayMs: number;
23+
dispatchLimit: number;
2324
callbackUrl: string;
2425
};
2526
tracing?: OtlpTraceService;
@@ -56,6 +57,10 @@ export class ComputeWorkloadManager implements WorkloadManager {
5657
return this.opts.snapshots.delayMs;
5758
}
5859

60+
get snapshotDispatchLimit(): number {
61+
return this.opts.snapshots.dispatchLimit;
62+
}
63+
5964
get traceSpansEnabled(): boolean {
6065
return !!this.opts.tracing;
6166
}

apps/supervisor/src/workloadServer/index.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,9 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
120120
// hours later after a checkpoint/restore cycle. Using a capped map avoids unbounded
121121
// growth while keeping recent contexts available. Oldest entries are evicted first.
122122
private static readonly MAX_TRACE_CONTEXTS = 10_000;
123-
private static readonly SNAPSHOT_CONCURRENCY = 10;
124123
private readonly runTraceContexts = new Map<string, RunTraceContext>();
125124
private readonly snapshotDelayWheel?: TimerWheel<DelayedSnapshot>;
126-
private readonly snapshotLimit = pLimit(WorkloadServer.SNAPSHOT_CONCURRENCY);
125+
private readonly snapshotLimit?: ReturnType<typeof pLimit>;
127126

128127
constructor(opts: WorkloadServerOptions) {
129128
super();
@@ -137,10 +136,12 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
137136
this.tracing = opts.tracing;
138137

139138
if (this.computeManager?.snapshotsEnabled) {
139+
const snapshotLimit = pLimit(this.computeManager.snapshotDispatchLimit);
140+
this.snapshotLimit = snapshotLimit;
140141
this.snapshotDelayWheel = new TimerWheel<DelayedSnapshot>({
141142
delayMs: this.computeManager.snapshotDelayMs,
142143
onExpire: (item) => {
143-
this.snapshotLimit(() => this.dispatchComputeSnapshot(item.data)).catch((error) => {
144+
snapshotLimit(() => this.dispatchComputeSnapshot(item.data)).catch((error) => {
144145
this.logger.error("Compute snapshot dispatch failed", {
145146
runId: item.data.runFriendlyId,
146147
runnerId: item.data.runnerId,

0 commit comments

Comments
 (0)