Skip to content

Commit 8b77b4c

Browse files
committed
Implement organization-scoped ClickHouse instances
The only way to get a ClickHouse client now is through the factory. Refactored all existing code to use that and pass in an org. The runReplication and otlpExporter are the hot paths here which need special attention in reviews.
1 parent 7d82041 commit 8b77b4c

39 files changed

Lines changed: 959 additions & 234 deletions

File tree

.cursor/mcp.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
11
{
2-
"mcpServers": {}
2+
"mcpServers": {
3+
"linear": {
4+
"url": "https://mcp.linear.app/mcp"
5+
}
6+
}
37
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
area: webapp
3+
type: feature
4+
---
5+
6+
Organization-scoped ClickHouse routing enables customers with HIPAA and other data security requirements to use dedicated database instances

apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { type Project, type RuntimeEnvironment, type TaskRunStatus } from "@trig
99
import assertNever from "assert-never";
1010
import { z } from "zod";
1111
import { API_VERSIONS, RunStatusUnspecifiedApiVersion } from "~/api/versions";
12-
import { clickhouseClient } from "~/services/clickhouseInstance.server";
12+
import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server";
1313
import { logger } from "~/services/logger.server";
1414
import { CoercedDate } from "~/utils/zod";
1515
import { ServiceValidationError } from "~/v3/services/baseService.server";
@@ -259,7 +259,8 @@ export class ApiRunListPresenter extends BasePresenter {
259259
options.machines = searchParams["filter[machine]"];
260260
}
261261

262-
const presenter = new NextRunListPresenter(this._replica, clickhouseClient);
262+
const clickhouse = await getClickhouseForOrganization(organizationId, "standard");
263+
const presenter = new NextRunListPresenter(this._replica, clickhouse);
263264

264265
logger.debug("Calling RunListPresenter", { options });
265266

apps/webapp/app/presenters/v3/CreateBulkActionPresenter.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { type PrismaClient } from "@trigger.dev/database";
22
import { CreateBulkActionSearchParams } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction";
3-
import { clickhouseClient } from "~/services/clickhouseInstance.server";
3+
import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server";
44
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
55
import { getRunFiltersFromRequest } from "../RunFilters.server";
66
import { BasePresenter } from "./basePresenter.server";
@@ -24,8 +24,9 @@ export class CreateBulkActionPresenter extends BasePresenter {
2424
Object.fromEntries(new URL(request.url).searchParams)
2525
);
2626

27+
const clickhouse = await getClickhouseForOrganization(organizationId, "standard");
2728
const runsRepository = new RunsRepository({
28-
clickhouse: clickhouseClient,
29+
clickhouse,
2930
prisma: this._replica as PrismaClient,
3031
});
3132

apps/webapp/app/presenters/v3/RunTagListPresenter.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
22
import { BasePresenter } from "./basePresenter.server";
3-
import { clickhouseClient } from "~/services/clickhouseInstance.server";
3+
import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server";
44
import { type PrismaClient } from "@trigger.dev/database";
55
import { timeFilters } from "~/components/runs/v3/SharedFilters";
66

@@ -37,8 +37,9 @@ export class RunTagListPresenter extends BasePresenter {
3737
}: TagListOptions) {
3838
const hasFilters = Boolean(name?.trim());
3939

40+
const clickhouse = await getClickhouseForOrganization(organizationId, "standard");
4041
const runsRepository = new RunsRepository({
41-
clickhouse: clickhouseClient,
42+
clickhouse,
4243
prisma: this._replica as PrismaClient,
4344
});
4445

apps/webapp/app/presenters/v3/TaskListPresenter.server.ts

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
type TaskTriggerSource,
55
} from "@trigger.dev/database";
66
import { $replica } from "~/db.server";
7-
import { clickhouseClient } from "~/services/clickhouseInstance.server";
7+
import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server";
88
import {
99
type AverageDurations,
1010
ClickHouseEnvironmentMetricsRepository,
@@ -25,10 +25,7 @@ export type TaskListItem = {
2525
export type TaskActivity = DailyTaskActivity[string];
2626

2727
export class TaskListPresenter {
28-
constructor(
29-
private readonly environmentMetricsRepository: EnvironmentMetricsRepository,
30-
private readonly _replica: PrismaClientOrTransaction
31-
) {}
28+
constructor(private readonly _replica: PrismaClientOrTransaction) {}
3229

3330
public async call({
3431
organizationId,
@@ -76,25 +73,31 @@ export class TaskListPresenter {
7673

7774
const slugs = tasks.map((t) => t.slug);
7875

76+
// Create org-specific environment metrics repository
77+
const clickhouse = await getClickhouseForOrganization(organizationId, "standard");
78+
const environmentMetricsRepository = new ClickHouseEnvironmentMetricsRepository({
79+
clickhouse,
80+
});
81+
7982
// IMPORTANT: Don't await these, we want to return the promises
8083
// so we can defer the loading of the data
81-
const activity = this.environmentMetricsRepository.getDailyTaskActivity({
84+
const activity = environmentMetricsRepository.getDailyTaskActivity({
8285
organizationId,
8386
projectId,
8487
environmentId,
8588
days: 6, // This actually means 7 days, because we want to show the current day too
8689
tasks: slugs,
8790
});
8891

89-
const runningStats = this.environmentMetricsRepository.getCurrentRunningStats({
92+
const runningStats = environmentMetricsRepository.getCurrentRunningStats({
9093
organizationId,
9194
projectId,
9295
environmentId,
9396
days: 6,
9497
tasks: slugs,
9598
});
9699

97-
const durations = this.environmentMetricsRepository.getAverageDurations({
100+
const durations = environmentMetricsRepository.getAverageDurations({
98101
organizationId,
99102
projectId,
100103
environmentId,
@@ -109,9 +112,5 @@ export class TaskListPresenter {
109112
export const taskListPresenter = singleton("taskListPresenter", setupTaskListPresenter);
110113

111114
function setupTaskListPresenter() {
112-
const environmentMetricsRepository = new ClickHouseEnvironmentMetricsRepository({
113-
clickhouse: clickhouseClient,
114-
});
115-
116-
return new TaskListPresenter(environmentMetricsRepository, $replica);
115+
return new TaskListPresenter($replica);
117116
}

apps/webapp/app/presenters/v3/UsagePresenter.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { getUsage, getUsageSeries } from "~/services/platform.v3.server";
44
import { createTimeSeriesData } from "~/utils/graphs";
55
import { BasePresenter } from "./basePresenter.server";
66
import { DataPoint, linear } from "regression";
7-
import { clickhouseClient } from "~/services/clickhouseInstance.server";
7+
import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server";
88

99
type Options = {
1010
organizationId: string;
@@ -124,7 +124,8 @@ async function getTaskUsageByOrganization(
124124
endOfMonth: Date,
125125
replica: PrismaClientOrTransaction
126126
) {
127-
const [queryError, tasks] = await clickhouseClient.taskRuns.getTaskUsageByOrganization({
127+
const clickhouse = await getClickhouseForOrganization(organizationId, "standard");
128+
const [queryError, tasks] = await clickhouse.taskRuns.getTaskUsageByOrganization({
128129
startTime: startOfMonth.getTime(),
129130
endTime: endOfMonth.getTime(),
130131
organizationId,

apps/webapp/app/presenters/v3/ViewSchedulePresenter.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { ScheduleObject } from "@trigger.dev/core/v3";
22
import { PrismaClient, prisma } from "~/db.server";
33
import { displayableEnvironment } from "~/models/runtimeEnvironment.server";
4-
import { clickhouseClient } from "~/services/clickhouseInstance.server";
4+
import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server";
55
import { nextScheduledTimestamps } from "~/v3/utils/calculateNextSchedule.server";
66
import { NextRunListPresenter } from "./NextRunListPresenter.server";
77
import { scheduleWhereClause } from "~/models/schedules.server";
@@ -75,7 +75,8 @@ export class ViewSchedulePresenter {
7575
? nextScheduledTimestamps(schedule.generatorExpression, schedule.timezone, new Date(), 5)
7676
: [];
7777

78-
const runPresenter = new NextRunListPresenter(this.#prismaClient, clickhouseClient);
78+
const clickhouse = await getClickhouseForOrganization(schedule.project.organizationId, "standard");
79+
const runPresenter = new NextRunListPresenter(this.#prismaClient, clickhouse);
7980
const { runs } = await runPresenter.call(schedule.project.organizationId, environmentId, {
8081
projectId: schedule.project.id,
8182
scheduleId: schedule.id,

apps/webapp/app/presenters/v3/WaitpointPresenter.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { isWaitpointOutputTimeout, prettyPrintPacket } from "@trigger.dev/core/v3";
2-
import { clickhouseClient } from "~/services/clickhouseInstance.server";
2+
import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server";
33
import { generateHttpCallbackUrl } from "~/services/httpCallback.server";
44
import { logger } from "~/services/logger.server";
55
import { BasePresenter } from "./basePresenter.server";
@@ -79,7 +79,8 @@ export class WaitpointPresenter extends BasePresenter {
7979
const connectedRuns: NextRunListItem[] = [];
8080

8181
if (connectedRunIds.length > 0) {
82-
const runPresenter = new NextRunListPresenter(this._prisma, clickhouseClient);
82+
const clickhouse = await getClickhouseForOrganization(waitpoint.environment.organizationId, "standard");
83+
const runPresenter = new NextRunListPresenter(this._prisma, clickhouse);
8384
const { runs } = await runPresenter.call(
8485
waitpoint.environment.organizationId,
8586
environmentId,

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.dashboards.$dashboardKey/route.tsx

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import {
3232
MetricDashboardPresenter,
3333
} from "~/presenters/v3/MetricDashboardPresenter.server";
3434
import { PromptPresenter } from "~/presenters/v3/PromptPresenter.server";
35-
import { clickhouseClient } from "~/services/clickhouseInstance.server";
35+
import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server";
3636
import { requireUser } from "~/services/session.server";
3737
import { cn } from "~/utils/cn";
3838
import { EnvironmentParamSchema } from "~/utils/pathBuilder";
@@ -75,10 +75,12 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
7575

7676
const filters = dashboard.filters ?? ["tasks", "queues"];
7777

78+
const clickhouse = await getClickhouseForOrganization(project.organizationId, "standard");
79+
7880
// Load distinct models from ClickHouse if the dashboard has a models filter
7981
let possibleModels: { model: string; system: string }[] = [];
8082
if (filters.includes("models")) {
81-
const queryFn = clickhouseClient.reader.query({
83+
const queryFn = clickhouse.reader.query({
8284
name: "getDistinctModels",
8385
query: `SELECT response_model, any(gen_ai_system) AS gen_ai_system FROM trigger_dev.llm_metrics_v1 WHERE organization_id = {organizationId: String} AND project_id = {projectId: String} AND environment_id = {environmentId: String} AND response_model != '' GROUP BY response_model ORDER BY response_model`,
8486
params: z.object({
@@ -98,7 +100,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
98100
}
99101
}
100102

101-
const promptPresenter = new PromptPresenter(clickhouseClient);
103+
const promptPresenter = new PromptPresenter(clickhouse);
102104
const [possiblePrompts, possibleOperations, possibleProviders] = await Promise.all([
103105
filters.includes("prompts")
104106
? promptPresenter.getDistinctPromptSlugs(project.organizationId, project.id, environment.id)

0 commit comments

Comments
 (0)