Skip to content

Commit 680f156

Browse files
committed
refactor: extract ComputeSnapshotService from workload server, fix zod version in compute package
1 parent cacee1e commit 680f156

6 files changed

Lines changed: 290 additions & 229 deletions

File tree

Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
import pLimit from "p-limit";
2+
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
3+
import { parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
4+
import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers";
5+
import {
6+
SnapshotCallbackPayloadSchema,
7+
type SnapshotCallbackPayload,
8+
} from "@internal/compute";
9+
import type { ComputeWorkloadManager } from "../workloadManager/compute.js";
10+
import { TimerWheel } from "./timerWheel.js";
11+
import type { OtlpTraceService } from "./otlpTraceService.js";
12+
13+
export { SnapshotCallbackPayloadSchema, type SnapshotCallbackPayload };
14+
15+
type DelayedSnapshot = {
16+
runnerId: string;
17+
runFriendlyId: string;
18+
snapshotFriendlyId: string;
19+
};
20+
21+
export type RunTraceContext = {
22+
traceparent: string;
23+
envId: string;
24+
orgId: string;
25+
projectId: string;
26+
};
27+
28+
export type ComputeSnapshotServiceOptions = {
29+
computeManager: ComputeWorkloadManager;
30+
workerClient: SupervisorHttpClient;
31+
tracing?: OtlpTraceService;
32+
};
33+
34+
export class ComputeSnapshotService {
35+
private readonly logger = new SimpleStructuredLogger("compute-snapshot-service");
36+
37+
private static readonly MAX_TRACE_CONTEXTS = 10_000;
38+
private readonly runTraceContexts = new Map<string, RunTraceContext>();
39+
private readonly timerWheel: TimerWheel<DelayedSnapshot>;
40+
private readonly dispatchLimit: ReturnType<typeof pLimit>;
41+
42+
private readonly computeManager: ComputeWorkloadManager;
43+
private readonly workerClient: SupervisorHttpClient;
44+
private readonly tracing?: OtlpTraceService;
45+
46+
constructor(opts: ComputeSnapshotServiceOptions) {
47+
this.computeManager = opts.computeManager;
48+
this.workerClient = opts.workerClient;
49+
this.tracing = opts.tracing;
50+
51+
this.dispatchLimit = pLimit(this.computeManager.snapshotDispatchLimit);
52+
this.timerWheel = new TimerWheel<DelayedSnapshot>({
53+
delayMs: this.computeManager.snapshotDelayMs,
54+
onExpire: (item) => {
55+
this.dispatchLimit(() => this.dispatch(item.data)).catch((error) => {
56+
this.logger.error("Snapshot dispatch failed", {
57+
runId: item.data.runFriendlyId,
58+
runnerId: item.data.runnerId,
59+
error,
60+
});
61+
});
62+
},
63+
});
64+
this.timerWheel.start();
65+
}
66+
67+
/** Schedule a delayed snapshot for a run. Replaces any pending snapshot for the same run. */
68+
schedule(runFriendlyId: string, data: DelayedSnapshot) {
69+
this.timerWheel.submit(runFriendlyId, data);
70+
this.logger.debug("Snapshot scheduled", {
71+
runFriendlyId,
72+
snapshotFriendlyId: data.snapshotFriendlyId,
73+
delayMs: this.computeManager.snapshotDelayMs,
74+
});
75+
}
76+
77+
/** Cancel a pending delayed snapshot. Returns true if one was cancelled. */
78+
cancel(runFriendlyId: string): boolean {
79+
const cancelled = this.timerWheel.cancel(runFriendlyId);
80+
if (cancelled) {
81+
this.logger.debug("Snapshot cancelled", { runFriendlyId });
82+
}
83+
return cancelled;
84+
}
85+
86+
/** Handle the callback from the gateway after a snapshot completes or fails. */
87+
async handleCallback(body: SnapshotCallbackPayload) {
88+
this.logger.debug("Snapshot callback", {
89+
snapshotId: body.snapshot_id,
90+
instanceId: body.instance_id,
91+
status: body.status,
92+
error: body.error,
93+
metadata: body.metadata,
94+
durationMs: body.duration_ms,
95+
});
96+
97+
const runId = body.metadata?.runId;
98+
const snapshotFriendlyId = body.metadata?.snapshotFriendlyId;
99+
100+
if (!runId || !snapshotFriendlyId) {
101+
this.logger.error("Snapshot callback missing metadata", { body });
102+
return { ok: false as const, status: 400 };
103+
}
104+
105+
this.#emitSnapshotSpan(runId, body.duration_ms, body.snapshot_id);
106+
107+
if (body.status === "completed") {
108+
const result = await this.workerClient.submitSuspendCompletion({
109+
runId,
110+
snapshotId: snapshotFriendlyId,
111+
body: {
112+
success: true,
113+
checkpoint: {
114+
type: "COMPUTE",
115+
location: body.snapshot_id,
116+
},
117+
},
118+
});
119+
120+
if (result.success) {
121+
this.logger.debug("Suspend completion submitted", {
122+
runId,
123+
instanceId: body.instance_id,
124+
snapshotId: body.snapshot_id,
125+
});
126+
} else {
127+
this.logger.error("Failed to submit suspend completion", {
128+
runId,
129+
snapshotFriendlyId,
130+
error: result.error,
131+
});
132+
}
133+
} else {
134+
const result = await this.workerClient.submitSuspendCompletion({
135+
runId,
136+
snapshotId: snapshotFriendlyId,
137+
body: {
138+
success: false,
139+
error: body.error ?? "Snapshot failed",
140+
},
141+
});
142+
143+
if (!result.success) {
144+
this.logger.error("Failed to submit suspend failure", {
145+
runId,
146+
snapshotFriendlyId,
147+
error: result.error,
148+
});
149+
}
150+
}
151+
152+
return { ok: true as const, status: 200 };
153+
}
154+
155+
registerTraceContext(runFriendlyId: string, ctx: RunTraceContext) {
156+
// Evict oldest entries if we've hit the cap. This is best-effort: on a busy
157+
// supervisor, entries for long-lived runs may be evicted before their snapshot
158+
// callback arrives, causing those snapshot spans to be silently dropped.
159+
// That's acceptable - trace spans are observability sugar, not correctness.
160+
if (this.runTraceContexts.size >= ComputeSnapshotService.MAX_TRACE_CONTEXTS) {
161+
const firstKey = this.runTraceContexts.keys().next().value;
162+
if (firstKey) {
163+
this.runTraceContexts.delete(firstKey);
164+
}
165+
}
166+
167+
this.runTraceContexts.set(runFriendlyId, ctx);
168+
}
169+
170+
/** Stop the timer wheel, dropping pending snapshots. */
171+
stop(): string[] {
172+
// Intentionally drop pending snapshots rather than dispatching them. The supervisor
173+
// is shutting down, so our callback URL will be dead by the time the gateway responds.
174+
// Runners detect the supervisor is gone and reconnect to a new instance, which
175+
// re-triggers the snapshot workflow. Snapshots are an optimization, not a correctness
176+
// requirement - runs continue fine without them.
177+
const remaining = this.timerWheel.stop();
178+
const droppedRuns = remaining.map((item) => item.key);
179+
180+
if (droppedRuns.length > 0) {
181+
this.logger.info("Stopped, dropped pending snapshots", { count: droppedRuns.length });
182+
this.logger.debug("Dropped snapshot details", { runs: droppedRuns });
183+
}
184+
185+
return droppedRuns;
186+
}
187+
188+
/** Dispatch a snapshot request to the gateway. */
189+
private async dispatch(snapshot: DelayedSnapshot): Promise<void> {
190+
const result = await this.computeManager.snapshot({
191+
runnerId: snapshot.runnerId,
192+
metadata: {
193+
runId: snapshot.runFriendlyId,
194+
snapshotFriendlyId: snapshot.snapshotFriendlyId,
195+
},
196+
});
197+
198+
if (!result) {
199+
this.logger.error("Failed to request snapshot", {
200+
runId: snapshot.runFriendlyId,
201+
runnerId: snapshot.runnerId,
202+
});
203+
}
204+
}
205+
206+
#emitSnapshotSpan(runFriendlyId: string, durationMs?: number, snapshotId?: string) {
207+
if (!this.tracing) return;
208+
209+
const ctx = this.runTraceContexts.get(runFriendlyId);
210+
if (!ctx) return;
211+
212+
const parsed = parseTraceparent(ctx.traceparent);
213+
if (!parsed) return;
214+
215+
const endEpochMs = Date.now();
216+
const startEpochMs = durationMs ? endEpochMs - durationMs : endEpochMs;
217+
218+
const spanAttributes: Record<string, string | number | boolean> = {
219+
"compute.type": "snapshot",
220+
};
221+
222+
if (durationMs !== undefined) {
223+
spanAttributes["compute.total_ms"] = durationMs;
224+
}
225+
226+
if (snapshotId) {
227+
spanAttributes["compute.snapshot_id"] = snapshotId;
228+
}
229+
230+
this.tracing.emit({
231+
traceId: parsed.traceId,
232+
parentSpanId: parsed.spanId,
233+
spanName: "compute.snapshot",
234+
startTimeMs: startEpochMs,
235+
endTimeMs: endEpochMs,
236+
resourceAttributes: {
237+
"ctx.environment.id": ctx.envId,
238+
"ctx.organization.id": ctx.orgId,
239+
"ctx.project.id": ctx.projectId,
240+
"ctx.run.id": runFriendlyId,
241+
},
242+
spanAttributes,
243+
});
244+
}
245+
}

0 commit comments

Comments
 (0)