Skip to content

Commit 9d72ae2

Browse files
committed
refactor: remove env import from compute workload manager
1 parent 8711f5b commit 9d72ae2

2 files changed

Lines changed: 202 additions & 163 deletions

File tree

apps/supervisor/src/index.ts

Lines changed: 152 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import { register } from "./metrics.js";
2626
import { PodCleaner } from "./services/podCleaner.js";
2727
import { FailedPodHandler } from "./services/failedPodHandler.js";
2828
import { getWorkerToken } from "./workerToken.js";
29+
import { OtlpTraceService } from "./services/otlpTraceService.js";
2930

3031
if (env.METRICS_COLLECT_DEFAULTS) {
3132
collectDefaultMetrics({ register });
@@ -40,7 +41,6 @@ class ManagedSupervisor {
4041
private readonly logger = new SimpleStructuredLogger("managed-supervisor");
4142
private readonly resourceMonitor: ResourceMonitor;
4243
private readonly checkpointClient?: CheckpointClient;
43-
private readonly isComputeMode: boolean;
4444

4545
private readonly podCleaner?: PodCleaner;
4646
private readonly failedPodHandler?: FailedPodHandler;
@@ -85,14 +85,33 @@ class ManagedSupervisor {
8585
: new DockerResourceMonitor(new Docker())
8686
: new NoopResourceMonitor();
8787

88-
this.isComputeMode = !!env.COMPUTE_GATEWAY_URL;
89-
9088
if (env.COMPUTE_GATEWAY_URL) {
89+
if (!env.TRIGGER_WORKLOAD_API_DOMAIN) {
90+
throw new Error("TRIGGER_WORKLOAD_API_DOMAIN is not set, cannot create compute manager");
91+
}
92+
93+
const callbackUrl = `${env.TRIGGER_WORKLOAD_API_PROTOCOL}://${env.TRIGGER_WORKLOAD_API_DOMAIN}:${env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL}/api/v1/compute/snapshot-complete`;
94+
9195
const computeManager = new ComputeWorkloadManager({
9296
...workloadManagerOptions,
93-
gatewayUrl: env.COMPUTE_GATEWAY_URL,
94-
gatewayAuthToken: env.COMPUTE_GATEWAY_AUTH_TOKEN,
95-
gatewayTimeoutMs: env.COMPUTE_GATEWAY_TIMEOUT_MS,
97+
gateway: {
98+
url: env.COMPUTE_GATEWAY_URL,
99+
authToken: env.COMPUTE_GATEWAY_AUTH_TOKEN,
100+
timeoutMs: env.COMPUTE_GATEWAY_TIMEOUT_MS,
101+
},
102+
snapshots: {
103+
enabled: env.COMPUTE_SNAPSHOTS_ENABLED,
104+
delayMs: env.COMPUTE_SNAPSHOT_DELAY_MS,
105+
callbackUrl,
106+
},
107+
tracing: env.COMPUTE_TRACE_SPANS_ENABLED
108+
? new OtlpTraceService({ endpointUrl: env.OTEL_EXPORTER_OTLP_ENDPOINT })
109+
: undefined,
110+
runner: {
111+
instanceName: env.TRIGGER_WORKER_INSTANCE_NAME,
112+
otelEndpoint: env.OTEL_EXPORTER_OTLP_ENDPOINT,
113+
prettyLogs: env.RUNNER_PRETTY_LOGS,
114+
},
96115
});
97116
this.computeManager = computeManager;
98117
this.workloadManager = computeManager;
@@ -208,158 +227,164 @@ class ManagedSupervisor {
208227
this.workloadServer.notifyRun({ run });
209228
});
210229

211-
this.workerSession.on("runQueueMessage", async ({ time, message, dequeueResponseMs, pollingIntervalMs }) => {
212-
this.logger.verbose(`Received message with timestamp ${time.toLocaleString()}`, message);
230+
this.workerSession.on(
231+
"runQueueMessage",
232+
async ({ time, message, dequeueResponseMs, pollingIntervalMs }) => {
233+
this.logger.verbose(`Received message with timestamp ${time.toLocaleString()}`, message);
213234

214-
if (message.completedWaitpoints.length > 0) {
215-
this.logger.debug("Run has completed waitpoints", {
216-
runId: message.run.id,
217-
completedWaitpoints: message.completedWaitpoints.length,
218-
});
219-
}
235+
if (message.completedWaitpoints.length > 0) {
236+
this.logger.debug("Run has completed waitpoints", {
237+
runId: message.run.id,
238+
completedWaitpoints: message.completedWaitpoints.length,
239+
});
240+
}
220241

221-
if (!message.image) {
222-
this.logger.error("Run has no image", { runId: message.run.id });
223-
return;
224-
}
242+
if (!message.image) {
243+
this.logger.error("Run has no image", { runId: message.run.id });
244+
return;
245+
}
225246

226-
const { checkpoint, ...rest } = message;
227-
228-
// Register trace context early so snapshot spans work for all paths
229-
// (cold create, restore, warm start). Re-registration on restore is safe
230-
// since dequeue always provides fresh context.
231-
if (this.isComputeMode && env.COMPUTE_TRACE_SPANS_ENABLED) {
232-
const traceparent =
233-
message.run.traceContext &&
234-
"traceparent" in message.run.traceContext &&
235-
typeof message.run.traceContext.traceparent === "string"
236-
? message.run.traceContext.traceparent
237-
: undefined;
238-
239-
if (traceparent) {
240-
this.workloadServer.registerRunTraceContext(message.run.friendlyId, {
241-
traceparent,
242-
envId: message.environment.id,
243-
orgId: message.organization.id,
244-
projectId: message.project.id,
245-
});
247+
const { checkpoint, ...rest } = message;
248+
249+
// Register trace context early so snapshot spans work for all paths
250+
// (cold create, restore, warm start). Re-registration on restore is safe
251+
// since dequeue always provides fresh context.
252+
if (this.computeManager?.traceSpansEnabled) {
253+
const traceparent =
254+
message.run.traceContext &&
255+
"traceparent" in message.run.traceContext &&
256+
typeof message.run.traceContext.traceparent === "string"
257+
? message.run.traceContext.traceparent
258+
: undefined;
259+
260+
if (traceparent) {
261+
this.workloadServer.registerRunTraceContext(message.run.friendlyId, {
262+
traceparent,
263+
envId: message.environment.id,
264+
orgId: message.organization.id,
265+
projectId: message.project.id,
266+
});
267+
}
246268
}
247-
}
248269

249-
if (checkpoint) {
250-
this.logger.debug("Restoring run", { runId: message.run.id });
270+
if (checkpoint) {
271+
this.logger.debug("Restoring run", { runId: message.run.id });
272+
273+
if (this.computeManager) {
274+
try {
275+
// Derive runnerId unique per restore cycle (matches iceman's pattern)
276+
const runIdShort = message.run.friendlyId.replace("run_", "");
277+
const checkpointSuffix = checkpoint.id.slice(-8);
278+
const runnerId = `runner-${runIdShort}-${checkpointSuffix}`;
279+
280+
const didRestore = await this.computeManager.restore({
281+
snapshotId: checkpoint.location,
282+
runnerId,
283+
runFriendlyId: message.run.friendlyId,
284+
snapshotFriendlyId: message.snapshot.friendlyId,
285+
machine: message.run.machine,
286+
traceContext: message.run.traceContext,
287+
envId: message.environment.id,
288+
orgId: message.organization.id,
289+
projectId: message.project.id,
290+
dequeuedAt: message.dequeuedAt,
291+
});
292+
293+
if (didRestore) {
294+
this.logger.debug("Compute restore successful", {
295+
runId: message.run.id,
296+
runnerId,
297+
});
298+
} else {
299+
this.logger.error("Compute restore failed", { runId: message.run.id, runnerId });
300+
}
301+
} catch (error) {
302+
this.logger.error("Failed to restore run (compute)", { error });
303+
}
304+
305+
return;
306+
}
307+
308+
if (!this.checkpointClient) {
309+
this.logger.error("No checkpoint client", { runId: message.run.id });
310+
return;
311+
}
251312

252-
if (this.isComputeMode && this.computeManager && env.COMPUTE_SNAPSHOTS_ENABLED) {
253313
try {
254-
// Derive runnerId unique per restore cycle (matches iceman's pattern)
255-
const runIdShort = message.run.friendlyId.replace("run_", "");
256-
const checkpointSuffix = checkpoint.id.slice(-8);
257-
const runnerId = `runner-${runIdShort}-${checkpointSuffix}`;
258-
259-
const didRestore = await this.computeManager.restore({
260-
snapshotId: checkpoint.location,
261-
runnerId,
314+
const didRestore = await this.checkpointClient.restoreRun({
262315
runFriendlyId: message.run.friendlyId,
263316
snapshotFriendlyId: message.snapshot.friendlyId,
264-
machine: message.run.machine,
265-
traceContext: message.run.traceContext,
266-
envId: message.environment.id,
267-
orgId: message.organization.id,
268-
projectId: message.project.id,
269-
dequeuedAt: message.dequeuedAt,
317+
body: {
318+
...rest,
319+
checkpoint,
320+
},
270321
});
271322

272323
if (didRestore) {
273-
this.logger.debug("Compute restore successful", { runId: message.run.id, runnerId });
324+
this.logger.debug("Restore successful", { runId: message.run.id });
274325
} else {
275-
this.logger.error("Compute restore failed", { runId: message.run.id, runnerId });
326+
this.logger.error("Restore failed", { runId: message.run.id });
276327
}
277328
} catch (error) {
278-
this.logger.error("Failed to restore run (compute)", { error });
329+
this.logger.error("Failed to restore run", { error });
279330
}
280331

281332
return;
282333
}
283334

284-
if (!this.checkpointClient) {
285-
this.logger.error("No checkpoint client", { runId: message.run.id });
335+
this.logger.debug("Scheduling run", { runId: message.run.id });
336+
337+
const warmStartStart = performance.now();
338+
const didWarmStart = await this.tryWarmStart(message);
339+
const warmStartCheckMs = Math.round(performance.now() - warmStartStart);
340+
341+
if (didWarmStart) {
342+
this.logger.debug("Warm start successful", { runId: message.run.id });
286343
return;
287344
}
288345

289346
try {
290-
const didRestore = await this.checkpointClient.restoreRun({
347+
if (!message.deployment.friendlyId) {
348+
// mostly a type guard, deployments always exists for deployed environments
349+
// a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments.
350+
throw new Error("Deployment is missing");
351+
}
352+
353+
await this.workloadManager.create({
354+
dequeuedAt: message.dequeuedAt,
355+
dequeueResponseMs,
356+
pollingIntervalMs,
357+
warmStartCheckMs,
358+
envId: message.environment.id,
359+
envType: message.environment.type,
360+
image: message.image,
361+
machine: message.run.machine,
362+
orgId: message.organization.id,
363+
projectId: message.project.id,
364+
deploymentFriendlyId: message.deployment.friendlyId,
365+
deploymentVersion: message.backgroundWorker.version,
366+
runId: message.run.id,
291367
runFriendlyId: message.run.friendlyId,
368+
version: message.version,
369+
nextAttemptNumber: message.run.attemptNumber,
370+
snapshotId: message.snapshot.id,
292371
snapshotFriendlyId: message.snapshot.friendlyId,
293-
body: {
294-
...rest,
295-
checkpoint,
296-
},
372+
placementTags: message.placementTags,
373+
traceContext: message.run.traceContext,
374+
annotations: message.run.annotations,
375+
hasPrivateLink: message.organization.hasPrivateLink,
297376
});
298377

299-
if (didRestore) {
300-
this.logger.debug("Restore successful", { runId: message.run.id });
301-
} else {
302-
this.logger.error("Restore failed", { runId: message.run.id });
303-
}
378+
// Disabled for now
379+
// this.resourceMonitor.blockResources({
380+
// cpu: message.run.machine.cpu,
381+
// memory: message.run.machine.memory,
382+
// });
304383
} catch (error) {
305-
this.logger.error("Failed to restore run", { error });
384+
this.logger.error("Failed to create workload", { error });
306385
}
307-
308-
return;
309386
}
310-
311-
this.logger.debug("Scheduling run", { runId: message.run.id });
312-
313-
const warmStartStart = performance.now();
314-
const didWarmStart = await this.tryWarmStart(message);
315-
const warmStartCheckMs = Math.round(performance.now() - warmStartStart);
316-
317-
if (didWarmStart) {
318-
this.logger.debug("Warm start successful", { runId: message.run.id });
319-
return;
320-
}
321-
322-
try {
323-
if (!message.deployment.friendlyId) {
324-
// mostly a type guard, deployments always exists for deployed environments
325-
// a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments.
326-
throw new Error("Deployment is missing");
327-
}
328-
329-
await this.workloadManager.create({
330-
dequeuedAt: message.dequeuedAt,
331-
dequeueResponseMs,
332-
pollingIntervalMs,
333-
warmStartCheckMs,
334-
envId: message.environment.id,
335-
envType: message.environment.type,
336-
image: message.image,
337-
machine: message.run.machine,
338-
orgId: message.organization.id,
339-
projectId: message.project.id,
340-
deploymentFriendlyId: message.deployment.friendlyId,
341-
deploymentVersion: message.backgroundWorker.version,
342-
runId: message.run.id,
343-
runFriendlyId: message.run.friendlyId,
344-
version: message.version,
345-
nextAttemptNumber: message.run.attemptNumber,
346-
snapshotId: message.snapshot.id,
347-
snapshotFriendlyId: message.snapshot.friendlyId,
348-
placementTags: message.placementTags,
349-
traceContext: message.run.traceContext,
350-
annotations: message.run.annotations,
351-
hasPrivateLink: message.organization.hasPrivateLink,
352-
});
353-
354-
// Disabled for now
355-
// this.resourceMonitor.blockResources({
356-
// cpu: message.run.machine.cpu,
357-
// memory: message.run.machine.memory,
358-
// });
359-
} catch (error) {
360-
this.logger.error("Failed to create workload", { error });
361-
}
362-
});
387+
);
363388

364389
if (env.METRICS_ENABLED) {
365390
this.metricsServer = new HttpServer({

0 commit comments

Comments
 (0)