Skip to content

Commit 4a6e9c2

Browse files
committed
make sure engine v1 runs get synced to CH
1 parent 3349897 commit 4a6e9c2

16 files changed

Lines changed: 70 additions & 15 deletions

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { z } from "zod";
44
import { prisma } from "~/db.server";
55
import { createTag, getTagsForRunId, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
66
import { authenticateApiRequest } from "~/services/apiAuth.server";
7+
import { emitRunStatusUpdate } from "~/services/runsDashboardInstance.server";
78
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
89

910
const ParamsSchema = z.object({
@@ -95,6 +96,8 @@ export async function action({ request, params }: ActionFunctionArgs) {
9596
},
9697
});
9798

99+
emitRunStatusUpdate(taskRun.id);
100+
98101
return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
99102
} catch (error) {
100103
return json(

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,8 @@ export class RunEngineTriggerTaskService {
305305
? parentRun.queueTimestamp ?? undefined
306306
: undefined,
307307
runChainState,
308+
scheduleId: options.scheduleId,
309+
scheduleInstanceId: options.scheduleInstanceId,
308310
},
309311
this.prisma
310312
);

apps/webapp/app/services/runsDashboardInstance.server.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
RunDashboardEvents,
1010
RunsDashboardService,
1111
} from "./runsDashboardService.server";
12+
import { tryCatch } from "@trigger.dev/core/utils";
1213

1314
const runDashboardEventBus: RunDashboardEventBus = new EventEmitter<RunDashboardEvents>();
1415

@@ -25,11 +26,25 @@ export const runsDashboard = singleton("runsDashboard", () => {
2526
const service = new RunsDashboardService(clickhouse);
2627

2728
runDashboardEventBus.on("runStatusUpdate", async (event) => {
28-
await upsertRun(event.time, event.runId, service);
29+
const [upsertError] = await tryCatch(upsertRun(event.time, event.runId, service));
30+
31+
if (upsertError) {
32+
logger.error("RunDashboard: runStatusUpdate: upsertRun error", {
33+
runId: event.runId,
34+
error: upsertError,
35+
});
36+
}
2937
});
3038

3139
engine.eventBus.on("runStatusChanged", async (event) => {
32-
await upsertRun(event.time, event.runId, service);
40+
const [upsertError] = await tryCatch(upsertRun(event.time, event.runId, service));
41+
42+
if (upsertError) {
43+
logger.error("RunDashboard: runStatusChanged: upsertRun error", {
44+
runId: event.runId,
45+
error: upsertError,
46+
});
47+
}
3348
});
3449

3550
return service;

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
2323
import { DevSubscriber, devPubSub } from "./devPubSub.server";
2424
import { findQueueInEnvironment, sanitizeQueueName } from "~/models/taskQueue.server";
2525
import { createRedisClient, RedisClient } from "~/redis.server";
26+
import { emitRunStatusUpdate } from "~/services/runsDashboardInstance.server";
2627

2728
const MessageBody = z.discriminatedUnion("type", [
2829
z.object({
@@ -539,6 +540,8 @@ export class DevQueueConsumer {
539540
messageId: message.messageId,
540541
});
541542

543+
emitRunStatusUpdate(lockedTaskRun.id);
544+
542545
this._inProgressRuns.set(lockedTaskRun.friendlyId, message.messageId);
543546
} catch (e) {
544547
if (e instanceof Error) {

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ import {
6666
import { tracer } from "../tracer.server";
6767
import { getMaxDuration } from "../utils/maxDuration";
6868
import { MessagePayload } from "./types";
69+
import { emitRunStatusUpdate } from "~/services/runsDashboardInstance.server";
6970

7071
const WithTraceContext = z.object({
7172
traceparent: z.string().optional(),
@@ -841,6 +842,8 @@ export class SharedQueueConsumer {
841842
};
842843
}
843844

845+
emitRunStatusUpdate(lockedTaskRun.id);
846+
844847
return {
845848
action: "noop",
846849
reason: "restored_checkpoint",
@@ -922,6 +925,8 @@ export class SharedQueueConsumer {
922925
},
923926
});
924927

928+
emitRunStatusUpdate(lockedTaskRun.id);
929+
925930
return {
926931
action: "noop",
927932
reason: "scheduled_attempt",
@@ -1430,14 +1435,16 @@ export class SharedQueueConsumer {
14301435
async #markRunAsWaitingForDeploy(runId: string) {
14311436
logger.debug("Marking run as waiting for deploy", { runId });
14321437

1433-
return await prisma.taskRun.update({
1438+
await prisma.taskRun.update({
14341439
where: {
14351440
id: runId,
14361441
},
14371442
data: {
14381443
status: "WAITING_FOR_DEPLOY",
14391444
},
14401445
});
1446+
1447+
emitRunStatusUpdate(runId);
14411448
}
14421449

14431450
async #resolveCompletedAttemptsForResumeMessage(

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
3434
import { RetryAttemptService } from "./retryAttempt.server";
3535
import { getTaskEventStoreTableForRun } from "../taskEventStore.server";
3636
import { socketIo } from "../handleSocketIo.server";
37+
import { emitRunStatusUpdate } from "~/services/runsDashboardInstance.server";
3738

3839
type FoundAttempt = Awaited<ReturnType<typeof findAttempt>>;
3940

@@ -614,6 +615,8 @@ export class CompleteAttemptService extends BaseService {
614615
},
615616
});
616617

618+
emitRunStatusUpdate(taskRunAttempt.taskRunId);
619+
617620
if (environment.type === "DEVELOPMENT") {
618621
await marqs.requeueMessage(taskRunAttempt.taskRunId, {}, executionRetry.timestamp, "retry");
619622

apps/webapp/app/v3/services/createTaskRunAttempt.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { CrashTaskRunService } from "./crashTaskRun.server";
1212
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
1313
import { findQueueInEnvironment } from "~/models/taskQueue.server";
1414
import { FINAL_RUN_STATUSES } from "../taskStatus";
15+
import { emitRunStatusUpdate } from "~/services/runsDashboardInstance.server";
1516

1617
export class CreateTaskRunAttemptService extends BaseService {
1718
public async call({
@@ -180,6 +181,8 @@ export class CreateTaskRunAttemptService extends BaseService {
180181
});
181182
}
182183

184+
emitRunStatusUpdate(taskRun.id);
185+
183186
const machinePreset =
184187
machinePresetFromRun(taskRun) ?? machinePresetFromConfig(lockedBy.machineConfig ?? {});
185188

apps/webapp/app/v3/services/enqueueDelayedRun.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
77
import { commonWorker } from "../commonWorker.server";
88
import { workerQueue } from "~/services/worker.server";
99
import { enqueueRun } from "./enqueueRun.server";
10+
import { emitRunStatusUpdate } from "~/services/runsDashboardInstance.server";
1011

1112
export class EnqueueDelayedRunService extends BaseService {
1213
public static async enqueue(runId: string, runAt?: Date) {
@@ -102,6 +103,8 @@ export class EnqueueDelayedRunService extends BaseService {
102103
}
103104
});
104105

106+
emitRunStatusUpdate(run.id);
107+
105108
await enqueueRun({
106109
env: run.runtimeEnvironment,
107110
run: run,

apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { marqs } from "~/v3/marqs/index.server";
44
import { BaseService } from "./baseService.server";
55
import { logger } from "~/services/logger.server";
66
import { env } from "~/env.server";
7+
import { emitRunStatusUpdate } from "~/services/runsDashboardInstance.server";
78

89
export class ExecuteTasksWaitingForDeployService extends BaseService {
910
public async call(backgroundWorkerId: string) {
@@ -78,6 +79,10 @@ export class ExecuteTasksWaitingForDeployService extends BaseService {
7879
});
7980
}
8081

82+
for (const run of runsWaitingForDeploy) {
83+
emitRunStatusUpdate(run.id);
84+
}
85+
8186
for (const run of runsWaitingForDeploy) {
8287
await marqs?.enqueueMessage(
8388
backgroundWorker.runtimeEnvironment,

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { completeBatchTaskRunItemV3 } from "./batchTriggerV3.server";
1919
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
2020
import { ResumeBatchRunService } from "./resumeBatchRun.server";
2121
import { ResumeDependentParentsService } from "./resumeDependentParents.server";
22+
import { emitRunStatusUpdate } from "~/services/runsDashboardInstance.server";
2223

2324
type BaseInput = {
2425
id: string;
@@ -100,6 +101,8 @@ export class FinalizeTaskRunService extends BaseService {
100101
...(include ? { include } : {}),
101102
});
102103

104+
emitRunStatusUpdate(run.id);
105+
103106
if (run.ttl) {
104107
await ExpireEnqueuedRunService.ack(run.id);
105108
}

0 commit comments

Comments
 (0)