Skip to content

Commit 9a1fd64

Browse files
committed
WIP runs dashboard service
1 parent 5cd2d20 commit 9a1fd64

15 files changed

Lines changed: 565 additions & 211 deletions

File tree

apps/webapp/app/runEngine/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
TriggerTaskRequestBody,
88
} from "@trigger.dev/core/v3";
99
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
10-
import { z } from "zod";
10+
import type { TaskRunStatusUpdateEnvironment } from "~/services/runsDashboardInstance.server";
1111

1212
export type TriggerTaskServiceOptions = {
1313
idempotencyKey?: string;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { singleton } from "~/utils/singleton";
2+
import { ClickHouse } from "@internal/clickhouse";
3+
import {
4+
RunDashboardEventBus,
5+
RunDashboardEvents,
6+
RunsDashboardService,
7+
} from "./runsDashboardService.server";
8+
import { EventEmitter } from "node:events";
9+
import { RuntimeEnvironmentType, TaskRun } from "@trigger.dev/database";
10+
11+
const runDashboardEventBus: RunDashboardEventBus = new EventEmitter<RunDashboardEvents>();
12+
13+
export type TaskRunStatusUpdateEnvironment = {
14+
type: RuntimeEnvironmentType;
15+
organizationId: string;
16+
};
17+
18+
export function emitRunStatusUpdate(run: TaskRun, environment: TaskRunStatusUpdateEnvironment) {
19+
runDashboardEventBus.emit("runStatusUpdate", {
20+
run,
21+
environment,
22+
organization: { id: environment.organizationId },
23+
});
24+
}
25+
26+
export const runsDashboard = singleton("runsDashboard", () => {
27+
const clickhouse = ClickHouse.fromEnv();
28+
29+
const service = new RunsDashboardService(clickhouse);
30+
31+
runDashboardEventBus.on("runStatusUpdate", async (event) => {
32+
await service.upsertRun(event.run, event.environment.type, event.organization.id);
33+
});
34+
35+
return service;
36+
});
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import type { ClickHouse } from "@internal/clickhouse";
2+
import { TaskRunError } from "@trigger.dev/core/v3/schemas";
3+
import { RuntimeEnvironmentType, TaskRun } from "@trigger.dev/database";
4+
import { logger } from "./logger.server";
5+
import { EventEmitter } from "node:events";
6+
import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
7+
8+
export class RunsDashboardService {
9+
constructor(private readonly clickhouse: ClickHouse) {}
10+
11+
private readonly logger = logger.child({
12+
service: "RunsDashboardService",
13+
});
14+
15+
async upsertRun(
16+
taskRun: TaskRun,
17+
environmentType: RuntimeEnvironmentType,
18+
organizationId: string
19+
) {
20+
const [payload, output] = await Promise.all([
21+
this.#preparePayload(taskRun),
22+
this.#prepareOutput(taskRun),
23+
]);
24+
25+
const [insertError, insertResult] = await this.clickhouse.runEvents.insert({
26+
environment_id: taskRun.runtimeEnvironmentId,
27+
environment_type: environmentType,
28+
organization_id: organizationId,
29+
project_id: taskRun.projectId,
30+
run_id: taskRun.id,
31+
friendly_id: taskRun.friendlyId,
32+
attempt: taskRun.attemptNumber ?? 1,
33+
engine: taskRun.engine,
34+
status: taskRun.status,
35+
task_identifier: taskRun.taskIdentifier,
36+
queue: taskRun.queue,
37+
schedule_id: taskRun.scheduleId ?? undefined,
38+
batch_id: taskRun.batchId ?? undefined,
39+
event_time: Date.now(),
40+
created_at: taskRun.createdAt.getTime(),
41+
updated_at: taskRun.updatedAt.getTime(),
42+
completed_at: taskRun.completedAt ? taskRun.completedAt.getTime() : undefined,
43+
started_at: taskRun.startedAt ? taskRun.startedAt.getTime() : undefined,
44+
executed_at: taskRun.executedAt ? taskRun.executedAt.getTime() : undefined,
45+
delay_until: taskRun.delayUntil ? taskRun.delayUntil.getTime() : undefined,
46+
queued_at: taskRun.queuedAt ? taskRun.queuedAt.getTime() : undefined,
47+
expired_at: taskRun.expiredAt ? taskRun.expiredAt.getTime() : undefined,
48+
usage_duration_ms: taskRun.usageDurationMs,
49+
tags: taskRun.runTags,
50+
payload: payload,
51+
output: output,
52+
error: taskRun.error ? (taskRun.error as TaskRunError) : undefined,
53+
task_version: taskRun.taskVersion ?? undefined,
54+
sdk_version: taskRun.sdkVersion ?? undefined,
55+
cli_version: taskRun.cliVersion ?? undefined,
56+
machine_preset: taskRun.machinePreset ?? undefined,
57+
is_test: taskRun.isTest ?? false,
58+
root_run_id: taskRun.rootTaskRunId ?? undefined,
59+
parent_run_id: taskRun.parentTaskRunId ?? undefined,
60+
depth: taskRun.depth ?? 0,
61+
span_id: taskRun.spanId ?? undefined,
62+
trace_id: taskRun.traceId ?? undefined,
63+
idempotency_key: taskRun.idempotencyKey ?? undefined,
64+
expiration_ttl: taskRun.ttl ?? undefined,
65+
cost_in_cents: taskRun.costInCents ?? undefined,
66+
base_cost_in_cents: taskRun.baseCostInCents ?? undefined,
67+
});
68+
69+
if (insertError) {
70+
this.logger.error("RunsDashboardService: upsertRun", {
71+
error: insertError,
72+
taskRun,
73+
});
74+
} else {
75+
this.logger.info("RunsDashboardService: upsertRun", {
76+
id: taskRun.id,
77+
friendlyId: taskRun.friendlyId,
78+
status: taskRun.status,
79+
});
80+
}
81+
82+
return insertResult?.executed === true;
83+
}
84+
85+
async #preparePayload(run: TaskRun): Promise<unknown | undefined> {
86+
if (run.status !== "PENDING" && run.status !== "DELAYED") {
87+
return undefined;
88+
}
89+
90+
if (run.payloadType !== "application/json" && run.payloadType !== "application/super+json") {
91+
return undefined;
92+
}
93+
94+
const packet = {
95+
data: run.payload,
96+
dataType: run.payloadType,
97+
};
98+
99+
return await parsePacket(packet);
100+
}
101+
102+
async #prepareOutput(run: TaskRun): Promise<unknown | undefined> {
103+
if (!run.output) {
104+
return undefined;
105+
}
106+
107+
if (run.outputType !== "application/json" && run.outputType !== "application/super+json") {
108+
return undefined;
109+
}
110+
111+
const packet = {
112+
data: run.output,
113+
dataType: run.outputType,
114+
};
115+
116+
return await parsePacket(packet);
117+
}
118+
}
119+
120+
export type RunDashboardEvents = {
121+
runStatusUpdate: [
122+
{
123+
run: TaskRun;
124+
organization: {
125+
id: string;
126+
};
127+
environment: {
128+
type: RuntimeEnvironmentType;
129+
};
130+
}
131+
];
132+
};
133+
134+
export type RunDashboardEventArgs<T extends keyof RunDashboardEvents> = RunDashboardEvents[T];
135+
export type RunDashboardEventBus = EventEmitter<RunDashboardEvents>;

apps/webapp/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@
193193
},
194194
"devDependencies": {
195195
"@internal/testcontainers": "workspace:*",
196+
"@internal/clickhouse": "workspace:*",
196197
"@remix-run/dev": "2.1.0",
197198
"@remix-run/eslint-config": "2.1.0",
198199
"@remix-run/testing": "^2.1.0",
@@ -258,4 +259,4 @@
258259
"engines": {
259260
"node": ">=16.0.0"
260261
}
261-
}
262+
}

0 commit comments

Comments
 (0)