Skip to content

Commit 04b2039

Browse files
committed
Restructure the run events to be more sparse
1 parent cc6695c commit 04b2039

31 files changed

Lines changed: 1907 additions & 183 deletions

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.next.runs._index/route.tsx

Lines changed: 506 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { Outlet } from "@remix-run/react";
2+
import { PageContainer } from "~/components/layout/AppLayout";
3+
4+
export default function Page() {
5+
return (
6+
<PageContainer>
7+
<Outlet />
8+
</PageContainer>
9+
);
10+
}

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ import { z } from "zod";
44
import { prisma } from "~/db.server";
55
import { createTag, getTagsForRunId, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
66
import { authenticateApiRequest } from "~/services/apiAuth.server";
7-
import { emitRunStatusUpdate } from "~/services/runsDashboardInstance.server";
8-
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
7+
import { emitRunTagsUpdated } from "~/services/runsDashboardInstance.server";
98

109
const ParamsSchema = z.object({
1110
runId: z.string(),
@@ -96,7 +95,24 @@ export async function action({ request, params }: ActionFunctionArgs) {
9695
},
9796
});
9897

99-
emitRunStatusUpdate(taskRun.id);
98+
emitRunTagsUpdated({
99+
time: new Date(),
100+
run: {
101+
id: taskRun.id,
102+
tags: taskRun.runTags,
103+
status: taskRun.status,
104+
updatedAt: taskRun.updatedAt,
105+
},
106+
organization: {
107+
id: authenticationResult.environment.organizationId,
108+
},
109+
project: {
110+
id: authenticationResult.environment.projectId,
111+
},
112+
environment: {
113+
id: authenticationResult.environment.id,
114+
},
115+
});
100116

101117
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
102118
} catch (error) {

apps/webapp/app/runEngine/types.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import {
77
TriggerTaskRequestBody,
88
} from "@trigger.dev/core/v3";
99
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
10-
import type { TaskRunStatusUpdateEnvironment } from "~/services/runsDashboardInstance.server";
1110

1211
export type TriggerTaskServiceOptions = {
1312
idempotencyKey?: string;

apps/webapp/app/services/runsDashboardInstance.server.ts

Lines changed: 202 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,80 +6,263 @@ import { engine } from "~/v3/runEngine.server";
66
import { logger } from "./logger.server";
77
import {
88
RunDashboardEventBus,
9+
RunDashboardEventRunAttemptStarted,
10+
RunDashboardEventRunCancelled,
11+
RunDashboardEventRunDelayRescheduled,
12+
RunDashboardEventRunEnqueuedAfterDelay,
13+
RunDashboardEventRunExpired,
14+
RunDashboardEventRunFailed,
15+
RunDashboardEventRunLocked,
16+
RunDashboardEventRunRetryScheduled,
17+
RunDashboardEventRunStatusChanged,
18+
RunDashboardEventRunSucceeded,
19+
RunDashboardEventRunTagsUpdated,
920
RunDashboardEvents,
1021
RunsDashboardService,
1122
} from "./runsDashboardService.server";
1223
import { tryCatch } from "@trigger.dev/core/utils";
1324

1425
const runDashboardEventBus: RunDashboardEventBus = new EventEmitter<RunDashboardEvents>();
1526

16-
export function emitRunStatusUpdate(runId: string) {
17-
runDashboardEventBus.emit("runStatusUpdate", {
18-
time: new Date(),
27+
export function emitRunStatusChanged(event: RunDashboardEventRunStatusChanged) {
28+
runDashboardEventBus.emit("runStatusChanged", event);
29+
}
30+
31+
export function emitRunCreated(time: Date, runId: string) {
32+
runDashboardEventBus.emit("runCreated", {
33+
time,
1934
runId,
2035
});
2136
}
2237

38+
export function emitRunAttemptStarted(event: RunDashboardEventRunAttemptStarted) {
39+
runDashboardEventBus.emit("runAttemptStarted", event);
40+
}
41+
42+
export function emitRunFailed(event: RunDashboardEventRunFailed) {
43+
runDashboardEventBus.emit("runFailed", event);
44+
}
45+
46+
export function emitRunSucceeded(event: RunDashboardEventRunSucceeded) {
47+
runDashboardEventBus.emit("runSucceeded", event);
48+
}
49+
50+
export function emitRunCancelled(event: RunDashboardEventRunCancelled) {
51+
runDashboardEventBus.emit("runCancelled", event);
52+
}
53+
54+
export function emitRunRetryScheduled(event: RunDashboardEventRunRetryScheduled) {
55+
runDashboardEventBus.emit("runRetryScheduled", event);
56+
}
57+
58+
export function emitRunDelayRescheduled(event: RunDashboardEventRunDelayRescheduled) {
59+
runDashboardEventBus.emit("runDelayRescheduled", event);
60+
}
61+
62+
export function emitRunLocked(event: RunDashboardEventRunLocked) {
63+
runDashboardEventBus.emit("runLocked", event);
64+
}
65+
66+
export function emitRunExpired(event: RunDashboardEventRunExpired) {
67+
runDashboardEventBus.emit("runExpired", event);
68+
}
69+
70+
export function emitRunTagsUpdated(event: RunDashboardEventRunTagsUpdated) {
71+
runDashboardEventBus.emit("runTagsUpdated", event);
72+
}
73+
74+
export function emitRunEnqueuedAfterDelay(event: RunDashboardEventRunEnqueuedAfterDelay) {
75+
runDashboardEventBus.emit("runEnqueuedAfterDelay", event);
76+
}
77+
2378
export const runsDashboard = singleton("runsDashboard", () => {
2479
const clickhouse = ClickHouse.fromEnv();
2580

2681
const service = new RunsDashboardService(clickhouse);
2782

28-
runDashboardEventBus.on("runStatusUpdate", async (event) => {
29-
const [upsertError] = await tryCatch(upsertRun(event.time, event.runId, service));
83+
runDashboardEventBus.on("runCreated", async (event) => {
84+
const [runCreatedError] = await tryCatch(runCreated(event.time, event.runId, service));
3085

31-
if (upsertError) {
32-
logger.error("RunDashboard: runStatusUpdate: upsertRun error", {
86+
if (runCreatedError) {
87+
logger.error("RunDashboard: runCreated: runCreated error", {
3388
runId: event.runId,
34-
error: upsertError,
89+
error: runCreatedError,
3590
});
3691
}
3792
});
3893

39-
engine.eventBus.on("runStatusChanged", async (event) => {
40-
const [upsertError] = await tryCatch(upsertRun(event.time, event.runId, service));
94+
runDashboardEventBus.on("runAttemptStarted", async (event) => {
95+
const [runAttemptStartedError] = await tryCatch(service.runAttemptStarted(event));
4196

42-
if (upsertError) {
43-
logger.error("RunDashboard: runStatusChanged: upsertRun error", {
44-
runId: event.runId,
45-
error: upsertError,
97+
if (runAttemptStartedError) {
98+
logger.error("RunDashboard: runAttemptStarted: runAttemptStarted error", {
99+
runId: event.run.id,
100+
error: runAttemptStartedError,
46101
});
47102
}
48103
});
49104

105+
runDashboardEventBus.on("runStatusChanged", async (event) => {
106+
const [runStatusChangedError] = await tryCatch(service.runStatusChanged(event));
107+
108+
if (runStatusChangedError) {
109+
logger.error("RunDashboard: runStatusChanged: runStatusChanged error", {
110+
runId: event.run.id,
111+
error: runStatusChangedError,
112+
});
113+
}
114+
});
115+
116+
runDashboardEventBus.on("runFailed", async (event) => {
117+
const [runFailedError] = await tryCatch(service.runFailed(event));
118+
119+
if (runFailedError) {
120+
logger.error("RunDashboard: runFailed: runFailed error", {
121+
runId: event.run.id,
122+
error: runFailedError,
123+
});
124+
}
125+
});
126+
127+
runDashboardEventBus.on("runSucceeded", async (event) => {
128+
const [runSucceededError] = await tryCatch(service.runSucceeded(event));
129+
130+
if (runSucceededError) {
131+
logger.error("RunDashboard: runSucceeded: runSucceeded error", {
132+
runId: event.run.id,
133+
error: runSucceededError,
134+
});
135+
}
136+
});
137+
138+
runDashboardEventBus.on("runCancelled", async (event) => {
139+
const [runCancelledError] = await tryCatch(service.runCancelled(event));
140+
141+
if (runCancelledError) {
142+
logger.error("RunDashboard: runCancelled: runCancelled error", {
143+
runId: event.run.id,
144+
error: runCancelledError,
145+
});
146+
}
147+
});
148+
149+
runDashboardEventBus.on("runRetryScheduled", async (event) => {
150+
const [runRetryScheduledError] = await tryCatch(service.runRetryScheduled(event));
151+
152+
if (runRetryScheduledError) {
153+
logger.error("RunDashboard: runRetryScheduled: runRetryScheduled error", {
154+
runId: event.run.id,
155+
error: runRetryScheduledError,
156+
});
157+
}
158+
});
159+
160+
runDashboardEventBus.on("runDelayRescheduled", async (event) => {
161+
const [runDelayRescheduledError] = await tryCatch(service.runDelayRescheduled(event));
162+
163+
if (runDelayRescheduledError) {
164+
logger.error("RunDashboard: runDelayRescheduled: runDelayRescheduled error", {
165+
runId: event.run.id,
166+
error: runDelayRescheduledError,
167+
});
168+
}
169+
});
170+
171+
runDashboardEventBus.on("runLocked", async (event) => {
172+
const [runLockedError] = await tryCatch(service.runLocked(event));
173+
174+
if (runLockedError) {
175+
logger.error("RunDashboard: runLocked: runLocked error", {
176+
runId: event.run.id,
177+
error: runLockedError,
178+
});
179+
}
180+
});
181+
182+
runDashboardEventBus.on("runExpired", async (event) => {
183+
const [runExpiredError] = await tryCatch(service.runExpired(event));
184+
185+
if (runExpiredError) {
186+
logger.error("RunDashboard: runExpired: runExpired error", {
187+
runId: event.run.id,
188+
error: runExpiredError,
189+
});
190+
}
191+
});
192+
193+
engine.eventBus.on("runCreated", async (event) => {
194+
runDashboardEventBus.emit("runCreated", event);
195+
});
196+
197+
engine.eventBus.on("runAttemptStarted", async (event) => {
198+
runDashboardEventBus.emit("runAttemptStarted", event);
199+
});
200+
201+
engine.eventBus.on("runStatusChanged", async (event) => {
202+
runDashboardEventBus.emit("runStatusChanged", event);
203+
});
204+
205+
engine.eventBus.on("runFailed", async (event) => {
206+
runDashboardEventBus.emit("runFailed", event);
207+
});
208+
209+
engine.eventBus.on("runSucceeded", async (event) => {
210+
runDashboardEventBus.emit("runSucceeded", event);
211+
});
212+
213+
engine.eventBus.on("runCancelled", async (event) => {
214+
runDashboardEventBus.emit("runCancelled", event);
215+
});
216+
217+
engine.eventBus.on("runRetryScheduled", async (event) => {
218+
runDashboardEventBus.emit("runRetryScheduled", event);
219+
});
220+
221+
engine.eventBus.on("runDelayRescheduled", async (event) => {
222+
runDashboardEventBus.emit("runDelayRescheduled", event);
223+
});
224+
225+
engine.eventBus.on("runLocked", async (event) => {
226+
runDashboardEventBus.emit("runLocked", event);
227+
});
228+
229+
engine.eventBus.on("runExpired", async (event) => {
230+
runDashboardEventBus.emit("runExpired", event);
231+
});
232+
50233
return service;
51234
});
52235

53-
async function upsertRun(time: Date, runId: string, service: RunsDashboardService) {
236+
async function runCreated(time: Date, runId: string, service: RunsDashboardService) {
54237
const run = await prisma.taskRun.findFirst({
55238
where: {
56239
id: runId,
57240
},
58241
});
59242

60243
if (!run) {
61-
logger.error("RunDashboard: upsertRun: run not found", {
244+
logger.error("RunDashboard: runCreated: run not found", {
62245
runId,
63246
});
64247

65248
return;
66249
}
67250

68251
if (!run.environmentType) {
69-
logger.error("RunDashboard: upsertRun: run environment type not found", {
252+
logger.error("RunDashboard: runCreated: run environment type not found", {
70253
runId,
71254
});
72255

73256
return;
74257
}
75258

76259
if (!run.organizationId) {
77-
logger.error("RunDashboard: upsertRun: run organization id not found", {
260+
logger.error("RunDashboard: runCreated: run organization id not found", {
78261
runId,
79262
});
80263

81264
return;
82265
}
83266

84-
await service.upsertRun(time, run, run.environmentType, run.organizationId);
267+
await service.runCreated(time, run, run.environmentType, run.organizationId);
85268
}

0 commit comments

Comments
 (0)