Skip to content

Commit 05a6721

Browse files
committed
fix: review fixes - COMPUTE checkpoint type, memory_gb standardization, OTLP endpoint, snapshot concurrency
1 parent 30df9e2 commit 05a6721

11 files changed

Lines changed: 53 additions & 26 deletions

File tree

apps/supervisor/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@
1414
},
1515
"dependencies": {
1616
"@aws-sdk/client-ecr": "^3.839.0",
17-
"@kubernetes/client-node": "^1.0.0",
1817
"@internal/compute": "workspace:*",
18+
"@kubernetes/client-node": "^1.0.0",
1919
"@trigger.dev/core": "workspace:*",
2020
"dockerode": "^4.0.6",
21+
"p-limit": "^6.2.0",
2122
"prom-client": "^15.1.0",
2223
"socket.io": "4.7.4",
2324
"std-env": "^3.8.0",

apps/supervisor/src/index.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import { PodCleaner } from "./services/podCleaner.js";
2727
import { FailedPodHandler } from "./services/failedPodHandler.js";
2828
import { getWorkerToken } from "./workerToken.js";
2929
import { OtlpTraceService } from "./services/otlpTraceService.js";
30-
import { extractTraceparent } from "./util.js";
30+
import { extractTraceparent, getRestoreRunnerId } from "./util.js";
3131

3232
if (env.METRICS_COLLECT_DEFAULTS) {
3333
collectDefaultMetrics({ register });
@@ -96,7 +96,7 @@ class ManagedSupervisor {
9696

9797
if (env.COMPUTE_TRACE_SPANS_ENABLED) {
9898
this.tracing = new OtlpTraceService({
99-
endpointUrl: env.OTEL_EXPORTER_OTLP_ENDPOINT,
99+
endpointUrl: env.COMPUTE_TRACE_OTLP_ENDPOINT,
100100
});
101101
}
102102

@@ -273,10 +273,7 @@ class ManagedSupervisor {
273273

274274
if (this.computeManager) {
275275
try {
276-
// Derive runnerId unique per restore cycle (matches iceman's pattern)
277-
const runIdShort = message.run.friendlyId.replace("run_", "");
278-
const checkpointSuffix = checkpoint.id.slice(-8);
279-
const runnerId = `runner-${runIdShort}-${checkpointSuffix}`;
276+
const runnerId = getRestoreRunnerId(message.run.friendlyId, checkpoint.id);
280277

281278
const didRestore = await this.computeManager.restore({
282279
snapshotId: checkpoint.location,

apps/supervisor/src/util.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,10 @@ export function getRunnerId(runId: string, attemptNumber?: number) {
3535

3636
return parts.join("-");
3737
}
38+
39+
/** Derive a unique runnerId for a restore cycle using the checkpoint suffix */
40+
export function getRestoreRunnerId(runFriendlyId: string, checkpointId: string) {
41+
const runIdShort = runFriendlyId.replace("run_", "");
42+
const checkpointSuffix = checkpointId.slice(-8);
43+
return `runner-${runIdShort}-${checkpointSuffix}`;
44+
}

apps/supervisor/src/workloadManager/compute.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ export class ComputeWorkloadManager implements WorkloadManager {
154154
event.error = error instanceof Error ? error.message : String(error);
155155
event.errorType =
156156
error instanceof DOMException && error.name === "TimeoutError" ? "timeout" : "fetch";
157+
// Intentional: errors are captured in the wide event, not thrown. This matches
158+
// the Docker/K8s managers. The run will eventually time out if scheduling fails.
157159
return;
158160
}
159161

@@ -293,7 +295,7 @@ export class ComputeWorkloadManager implements WorkloadManager {
293295
name: opts.runnerId,
294296
metadata,
295297
cpu: opts.machine.cpu,
296-
memory_mb: opts.machine.memory * 1024,
298+
memory_gb: opts.machine.memory,
297299
})
298300
);
299301

apps/supervisor/src/workloadServer/index.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { type Namespace, Server, type Socket } from "socket.io";
22
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
33
import EventEmitter from "node:events";
4+
import pLimit from "p-limit";
45
import { z } from "zod";
56
import {
67
type SupervisorHttpClient,
@@ -119,8 +120,10 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
119120
// hours later after a checkpoint/restore cycle. Using a capped map avoids unbounded
120121
// growth while keeping recent contexts available. Oldest entries are evicted first.
121122
private static readonly MAX_TRACE_CONTEXTS = 10_000;
123+
private static readonly SNAPSHOT_CONCURRENCY = 10;
122124
private readonly runTraceContexts = new Map<string, RunTraceContext>();
123125
private readonly snapshotDelayWheel?: TimerWheel<DelayedSnapshot>;
126+
private readonly snapshotLimit = pLimit(WorkloadServer.SNAPSHOT_CONCURRENCY);
124127

125128
constructor(opts: WorkloadServerOptions) {
126129
super();
@@ -137,7 +140,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
137140
this.snapshotDelayWheel = new TimerWheel<DelayedSnapshot>({
138141
delayMs: this.computeManager.snapshotDelayMs,
139142
onExpire: (item) => {
140-
this.dispatchComputeSnapshot(item.data).catch((error) => {
143+
this.snapshotLimit(() => this.dispatchComputeSnapshot(item.data)).catch((error) => {
141144
this.logger.error("Compute snapshot dispatch failed", {
142145
runId: item.data.runFriendlyId,
143146
runnerId: item.data.runnerId,
@@ -513,7 +516,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
513516
body: {
514517
success: true,
515518
checkpoint: {
516-
type: "KUBERNETES",
519+
type: "COMPUTE",
517520
location: body.snapshot_id,
518521
},
519522
},
@@ -820,7 +823,10 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
820823
}
821824

822825
registerRunTraceContext(runFriendlyId: string, ctx: RunTraceContext) {
823-
// Evict oldest entries if we've hit the cap
826+
// Evict oldest entries if we've hit the cap. This is best-effort: on a busy
827+
// supervisor, entries for long-lived runs may be evicted before their snapshot
828+
// callback arrives, causing those snapshot spans to be silently dropped.
829+
// That's acceptable - trace spans are observability sugar, not correctness.
824830
if (this.runTraceContexts.size >= WorkloadServer.MAX_TRACE_CONTEXTS) {
825831
const firstKey = this.runTraceContexts.keys().next().value;
826832
if (firstKey) {

apps/webapp/app/v3/services/computeTemplateCreation.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ export class ComputeTemplateCreationService {
169169
await this.client.templates.create({
170170
image: stripImageDigest(imageReference),
171171
cpu: machine.cpu,
172-
memory_mb: machine.memory * 1024,
172+
memory_gb: machine.memory,
173173
background: options?.background,
174174
});
175175
return { success: true };

internal-packages/compute/src/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { z } from "zod";
55
export const TemplateCreateRequestSchema = z.object({
66
image: z.string(),
77
cpu: z.number(),
8-
memory_mb: z.number(),
8+
memory_gb: z.number(),
99
background: z.boolean().optional(),
1010
callback: z
1111
.object({
@@ -58,6 +58,6 @@ export const SnapshotRestoreRequestSchema = z.object({
5858
name: z.string(),
5959
metadata: z.record(z.string()),
6060
cpu: z.number(),
61-
memory_mb: z.number(),
61+
memory_gb: z.number(),
6262
});
6363
export type SnapshotRestoreRequest = z.infer<typeof SnapshotRestoreRequestSchema>;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterEnum
2+
ALTER TYPE "TaskRunCheckpointType" ADD VALUE 'COMPUTE';

internal-packages/database/prisma/schema.prisma

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,6 +1100,7 @@ model TaskRunCheckpoint {
11001100
enum TaskRunCheckpointType {
11011101
DOCKER
11021102
KUBERNETES
1103+
COMPUTE
11031104
}
11041105

11051106
/// A Waitpoint blocks a run from continuing until it's completed

packages/core/src/v3/schemas/runEngine.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,8 @@ export type CompleteRunAttemptResult = z.infer<typeof CompleteRunAttemptResult>;
177177
export const CheckpointTypeEnum = {
178178
DOCKER: "DOCKER",
179179
KUBERNETES: "KUBERNETES",
180-
} satisfies Enum<DB_TYPES.CheckpointType>;
180+
COMPUTE: "COMPUTE",
181+
} satisfies Enum<DB_TYPES.TaskRunCheckpointType>;
181182
export type CheckpointTypeEnum = (typeof CheckpointTypeEnum)[keyof typeof CheckpointTypeEnum];
182183

183184
export const CheckpointType = z.enum(Object.values(CheckpointTypeEnum) as [CheckpointTypeEnum]);

0 commit comments

Comments
 (0)