Skip to content

Commit 1ef59a7

Browse files
perf(run-engine): merge dequeue snapshot creation into taskRun.update transaction
Nest the TaskRunExecutionSnapshot create inside the preceding taskRun.update() call in the dequeue flow, reducing 2 explicit BEGIN/COMMIT transactions to 1 per dequeue operation. This follows the same pattern already used in the completion path (runAttemptSystem.ts:735) and trigger path (engine/index.ts:674-686). Side effects (heartbeat enqueue, executionSnapshotCreated event) are kept outside the transaction and fed the result of the merged write. Also adds a public enqueueHeartbeatIfNeeded() method to ExecutionSnapshotSystem for reuse by other flows that will adopt the same merged pattern. Refs: TRI-8450 Co-Authored-By: Eric Allam <eallam@icloud.com>
1 parent 7c95207 commit 1ef59a7

File tree

3 files changed

+69
-24
lines changed

3 files changed

+69
-24
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: improvement
4+
---
5+
6+
Merge execution snapshot creation into the dequeue taskRun.update transaction, reducing 2 DB commits to 1 per dequeue operation

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 42 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -435,6 +435,32 @@ export class DequeueSystem {
435435
cliVersion: result.worker.cliVersion,
436436
maxDurationInSeconds,
437437
maxAttempts: maxAttempts ?? undefined,
438+
executionSnapshots: {
439+
create: {
440+
engine: "V2",
441+
executionStatus: "PENDING_EXECUTING",
442+
description: "Run was dequeued for execution",
443+
// Map DEQUEUED -> PENDING for backwards compatibility with older runners
444+
runStatus: "PENDING",
445+
attemptNumber: result.run.attemptNumber ?? undefined,
446+
previousSnapshotId: snapshot.id,
447+
environmentId: snapshot.environmentId,
448+
environmentType: snapshot.environmentType,
449+
projectId: snapshot.projectId,
450+
organizationId: snapshot.organizationId,
451+
checkpointId: snapshot.checkpointId ?? undefined,
452+
batchId: snapshot.batchId ?? undefined,
453+
completedWaitpoints: {
454+
connect: snapshot.completedWaitpoints.map((w) => ({ id: w.id })),
455+
},
456+
completedWaitpointOrder: snapshot.completedWaitpoints
457+
.filter((c) => c.index !== undefined)
458+
.sort((a, b) => a.index! - b.index!)
459+
.map((w) => w.id),
460+
workerId,
461+
runnerId,
462+
},
463+
},
438464
},
439465
include: {
440466
runtimeEnvironment: true,
@@ -516,30 +542,22 @@ export class DequeueSystem {
516542
hasPrivateLink = billingResult.val.hasPrivateLink;
517543
}
518544

519-
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(
520-
prisma,
521-
{
522-
run: {
523-
id: runId,
524-
status: lockedTaskRun.status,
525-
attemptNumber: lockedTaskRun.attemptNumber,
526-
},
527-
snapshot: {
528-
executionStatus: "PENDING_EXECUTING",
529-
description: "Run was dequeued for execution",
530-
},
531-
previousSnapshotId: snapshot.id,
532-
environmentId: snapshot.environmentId,
533-
environmentType: snapshot.environmentType,
534-
projectId: snapshot.projectId,
535-
organizationId: snapshot.organizationId,
536-
checkpointId: snapshot.checkpointId ?? undefined,
537-
batchId: snapshot.batchId ?? undefined,
538-
completedWaitpoints: snapshot.completedWaitpoints,
539-
workerId,
540-
runnerId,
541-
}
542-
);
545+
// Snapshot was created as part of the taskRun.update above (single transaction).
546+
// Fetch the enhanced snapshot and handle side effects (heartbeat + event) manually.
547+
const newSnapshot = await getLatestExecutionSnapshot(prisma, runId);
548+
549+
this.$.eventBus.emit("executionSnapshotCreated", {
550+
time: newSnapshot.createdAt,
551+
run: {
552+
id: newSnapshot.runId,
553+
},
554+
snapshot: {
555+
...newSnapshot,
556+
completedWaitpointIds: newSnapshot.completedWaitpoints.map((wp) => wp.id),
557+
},
558+
});
559+
560+
await this.executionSnapshotSystem.enqueueHeartbeatIfNeeded(newSnapshot);
543561

544562
return {
545563
version: "1" as const,

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,6 +518,27 @@ export class ExecutionSnapshotSystem {
518518
return executionResultFromSnapshot(latestSnapshot);
519519
}
520520

521+
/**
522+
* Enqueues a heartbeat job for a snapshot if the execution status requires one.
523+
* Use this after nesting a snapshot create inside a taskRun.update() to replicate
524+
* the heartbeat side effect that createExecutionSnapshot normally handles.
525+
*/
526+
public async enqueueHeartbeatIfNeeded(snapshot: {
527+
id: string;
528+
runId: string;
529+
executionStatus: TaskRunExecutionStatus;
530+
}) {
531+
const intervalMs = this.#getHeartbeatIntervalMs(snapshot.executionStatus);
532+
if (intervalMs !== null) {
533+
await this.$.worker.enqueue({
534+
id: `heartbeatSnapshot.${snapshot.runId}`,
535+
job: "heartbeatSnapshot",
536+
payload: { snapshotId: snapshot.id, runId: snapshot.runId },
537+
availableAt: new Date(Date.now() + intervalMs),
538+
});
539+
}
540+
}
541+
521542
#getHeartbeatIntervalMs(status: TaskRunExecutionStatus): number | null {
522543
switch (status) {
523544
case "PENDING_EXECUTING": {

0 commit comments

Comments
 (0)