-
-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Expand file tree
/
Copy pathbatchSystem.ts
More file actions
120 lines (106 loc) · 3.52 KB
/
batchSystem.ts
File metadata and controls
120 lines (106 loc) · 3.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import { startSpan } from "@internal/tracing";
import { isFinalRunStatus } from "../statuses.js";
import { SystemResources } from "./systems.js";
import { WaitpointSystem } from "./waitpointSystem.js";
export type BatchSystemOptions = {
resources: SystemResources;
waitpointSystem: WaitpointSystem;
};
export class BatchSystem {
private readonly $: SystemResources;
private readonly waitpointSystem: WaitpointSystem;
constructor(private readonly options: BatchSystemOptions) {
this.$ = options.resources;
this.waitpointSystem = options.waitpointSystem;
}
public async scheduleCompleteBatch({ batchId }: { batchId: string }): Promise<void> {
await this.$.worker.enqueue({
//this will debounce the call
id: `tryCompleteBatch:${batchId}`,
job: "tryCompleteBatch",
payload: { batchId: batchId },
//200ms in the future
availableAt: new Date(Date.now() + 200),
});
}
public async performCompleteBatch({ batchId }: { batchId: string }): Promise<void> {
await this.#tryCompleteBatch({ batchId });
}
/**
* Checks to see if all runs for a BatchTaskRun are completed, if they are then update the status.
* This isn't used operationally, but it's used for the Batches dashboard page.
*/
async #tryCompleteBatch({ batchId }: { batchId: string }) {
return startSpan(this.$.tracer, "#tryCompleteBatch", async (span) => {
const batch = await this.$.prisma.batchTaskRun.findFirst({
select: {
status: true,
runtimeEnvironmentId: true,
processingJobsCount: true,
runCount: true,
},
where: {
id: batchId,
},
});
if (!batch) {
this.$.logger.error("#tryCompleteBatch batch doesn't exist", { batchId });
return;
}
if (batch.status === "COMPLETED") {
this.$.logger.debug("#tryCompleteBatch: Batch already completed", { batchId });
return;
}
if (batch.processingJobsCount < batch.runCount) {
this.$.logger.debug("#tryCompleteBatch: Not all runs are created yet", {
batchId,
processingJobsCount: batch.processingJobsCount,
runCount: batch.runCount,
});
return;
}
const runs = await this.$.prisma.taskRun.findMany({
select: {
id: true,
status: true,
},
where: {
batchId,
runtimeEnvironmentId: batch.runtimeEnvironmentId,
},
});
if (runs.every((r) => isFinalRunStatus(r.status))) {
this.$.logger.debug("#tryCompleteBatch: All runs are completed", { batchId });
await this.$.prisma.batchTaskRun.update({
where: {
id: batchId,
},
data: {
status: "COMPLETED",
},
});
//get waitpoint (if there is one)
const waitpoint = await this.$.prisma.waitpoint.findFirst({
where: {
completedByBatchId: batchId,
},
});
if (!waitpoint) {
this.$.logger.debug(
"RunEngine.unblockRunForBatch(): Waitpoint not found. This is ok, because only batchTriggerAndWait has waitpoints",
{
batchId,
}
);
return;
}
await this.waitpointSystem.completeWaitpoint({
id: waitpoint.id,
output: { value: "Batch waitpoint completed", isError: false },
});
} else {
this.$.logger.debug("#tryCompleteBatch: Not all runs are completed", { batchId });
}
});
}
}