Skip to content

Commit 18eb7bb

Browse files
committed
refactor: use OtlpTraceService in workload server
1 parent 9d72ae2 commit 18eb7bb

1 file changed

Lines changed: 23 additions & 31 deletions

File tree

  • apps/supervisor/src/workloadServer

apps/supervisor/src/workloadServer/index.ts

Lines changed: 23 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ import { env } from "../env.js";
2727
import type { ComputeWorkloadManager } from "../workloadManager/compute.js";
2828
import { TimerWheel } from "../services/timerWheel.js";
2929
import { parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
30-
import { buildOtlpTracePayload } from "../otlpPayload.js";
31-
import { sendOtlpTrace } from "../otlpTrace.js";
30+
import type { OtlpTraceService } from "../services/otlpTraceService.js";
3231

3332
// Use the official export when upgrading to socket.io@4.8.0
3433
interface DefaultEventsMap {
@@ -86,11 +85,13 @@ type WorkloadServerOptions = {
8685
workerClient: SupervisorHttpClient;
8786
checkpointClient?: CheckpointClient;
8887
computeManager?: ComputeWorkloadManager;
88+
tracing?: OtlpTraceService;
8989
};
9090

9191
export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
9292
private checkpointClient?: CheckpointClient;
9393
private computeManager?: ComputeWorkloadManager;
94+
private readonly tracing?: OtlpTraceService;
9495

9596
private readonly logger = new SimpleStructuredLogger("workload-server");
9697

@@ -130,10 +131,11 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
130131
this.workerClient = opts.workerClient;
131132
this.checkpointClient = opts.checkpointClient;
132133
this.computeManager = opts.computeManager;
134+
this.tracing = opts.tracing;
133135

134-
if (this.computeManager && env.COMPUTE_SNAPSHOTS_ENABLED) {
136+
if (this.computeManager?.snapshotsEnabled) {
135137
this.snapshotDelayWheel = new TimerWheel<DelayedSnapshot>({
136-
delayMs: env.COMPUTE_SNAPSHOT_DELAY_MS,
138+
delayMs: this.computeManager.snapshotDelayMs,
137139
onExpire: (item) => {
138140
this.dispatchComputeSnapshot(item.data).catch((error) => {
139141
this.logger.error("Compute snapshot dispatch failed", {
@@ -286,7 +288,12 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
286288
const deploymentVersion = this.deploymentVersionFromRequest(req);
287289
const projectRef = this.projectRefFromRequest(req);
288290

289-
this.logger.debug("Suspend request", { params, runnerId, deploymentVersion, projectRef });
291+
this.logger.debug("Suspend request", {
292+
params,
293+
runnerId,
294+
deploymentVersion,
295+
projectRef,
296+
});
290297

291298
if (!runnerId || !deploymentVersion || !projectRef) {
292299
this.logger.error("Invalid headers for suspend request", {
@@ -306,22 +313,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
306313
return;
307314
}
308315

309-
if (this.snapshotDelayWheel && this.computeManager && env.COMPUTE_SNAPSHOTS_ENABLED) {
310-
if (!env.TRIGGER_WORKLOAD_API_DOMAIN) {
311-
this.logger.error(
312-
"TRIGGER_WORKLOAD_API_DOMAIN is not set, cannot create snapshot callback URL"
313-
);
314-
reply.json(
315-
{
316-
ok: false,
317-
error: "Snapshot callbacks not configured",
318-
} satisfies WorkloadSuspendRunResponseBody,
319-
false,
320-
500
321-
);
322-
return;
323-
}
324-
316+
if (this.snapshotDelayWheel) {
325317
// Compute mode: delay snapshot to avoid wasted work on short-lived waitpoints.
326318
// If the run continues before the delay expires, the snapshot is cancelled.
327319
reply.json({ ok: true } satisfies WorkloadSuspendRunResponseBody, false, 202);
@@ -333,8 +325,9 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
333325
});
334326

335327
this.logger.debug("Snapshot delayed", {
336-
runId: params.runFriendlyId,
337-
delayMs: env.COMPUTE_SNAPSHOT_DELAY_MS,
328+
runFriendlyId: params.runFriendlyId,
329+
snapshotFriendlyId: params.snapshotFriendlyId,
330+
delayMs: this.computeManager?.snapshotDelayMs,
338331
});
339332

340333
return;
@@ -666,7 +659,11 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
666659
}
667660

668661
socket.on("disconnecting", (reason, description) => {
669-
socketLogger.verbose("Socket disconnecting", { ...getSocketMetadata(), reason, description });
662+
socketLogger.verbose("Socket disconnecting", {
663+
...getSocketMetadata(),
664+
reason,
665+
description,
666+
});
670667

671668
if (socket.data.runFriendlyId) {
672669
runDisconnected(socket.data.runFriendlyId);
@@ -766,11 +763,8 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
766763
private async dispatchComputeSnapshot(snapshot: DelayedSnapshot): Promise<void> {
767764
if (!this.computeManager) return;
768765

769-
const callbackUrl = `${env.TRIGGER_WORKLOAD_API_PROTOCOL}://${env.TRIGGER_WORKLOAD_API_DOMAIN}:${env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL}/api/v1/compute/snapshot-complete`;
770-
771766
const result = await this.computeManager.snapshot({
772767
runnerId: snapshot.runnerId,
773-
callbackUrl,
774768
metadata: {
775769
runId: snapshot.runFriendlyId,
776770
snapshotFriendlyId: snapshot.snapshotFriendlyId,
@@ -786,7 +780,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
786780
}
787781

788782
#emitSnapshotSpan(runFriendlyId: string, durationMs?: number, snapshotId?: string) {
789-
if (!env.COMPUTE_TRACE_SPANS_ENABLED) return;
783+
if (!this.tracing) return;
790784

791785
const ctx = this.runTraceContexts.get(runFriendlyId);
792786
if (!ctx) return;
@@ -809,7 +803,7 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
809803
spanAttributes["compute.snapshot_id"] = snapshotId;
810804
}
811805

812-
const payload = buildOtlpTracePayload({
806+
this.tracing?.emit({
813807
traceId: parsed.traceId,
814808
parentSpanId: parsed.spanId,
815809
spanName: "compute.snapshot",
@@ -823,8 +817,6 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
823817
},
824818
spanAttributes,
825819
});
826-
827-
sendOtlpTrace(payload);
828820
}
829821

830822
registerRunTraceContext(runFriendlyId: string, ctx: RunTraceContext) {

0 commit comments

Comments
 (0)