Skip to content

Commit add30c5

Browse files
committed
otlpExporter.server reverted to main
1 parent 98f9aa4 commit add30c5

1 file changed

Lines changed: 10 additions & 30 deletions

File tree

apps/webapp/app/v3/otlpExporter.server.ts

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
} from "@trigger.dev/otlp-importer";
2121
import type { MetricsV1Input } from "@internal/clickhouse";
2222
import { logger } from "~/services/logger.server";
23+
import { clickhouseClient } from "~/services/clickhouseInstance.server";
2324
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
2425
import { ClickhouseEventRepository } from "./eventRepository/clickhouseEventRepository.server";
2526
import {
@@ -40,7 +41,6 @@ import { waitForLlmPricingReady } from "./llmPricingRegistry.server";
4041
import { env } from "~/env.server";
4142
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
4243
import { singleton } from "~/utils/singleton";
43-
import { getClickhouseForOrganization, getEventRepositoryForOrganization } from "~/services/clickhouse/clickhouseFactory.server";
4444

4545
class OTLPExporter {
4646
private _tracer: Tracer;
@@ -118,26 +118,21 @@ class OTLPExporter {
118118
async #exportEvents(
119119
eventsWithStores: { events: Array<CreateEventInput>; taskEventStore: string }[]
120120
) {
121-
// Group events by both store and organization for proper routing
122-
const eventsGroupedByStoreAndOrg = eventsWithStores.reduce((acc, { events, taskEventStore }) => {
123-
for (const event of events) {
124-
const orgId = event.organizationId || "default";
125-
const key = `${taskEventStore}:${orgId}`;
126-
acc[key] = acc[key] || { store: taskEventStore, orgId, events: [] };
127-
acc[key].events.push(event);
128-
}
121+
const eventsGroupedByStore = eventsWithStores.reduce((acc, { events, taskEventStore }) => {
122+
acc[taskEventStore] = acc[taskEventStore] || [];
123+
acc[taskEventStore].push(...events);
129124
return acc;
130-
}, {} as Record<string, { store: string; orgId: string; events: Array<CreateEventInput> }>);
125+
}, {} as Record<string, Array<CreateEventInput>>);
131126

132127
let eventCount = 0;
133128

134-
for (const { store, orgId, events } of Object.values(eventsGroupedByStoreAndOrg)) {
135-
const eventRepository = await this.#getEventRepositoryForStoreAndOrg(store, orgId);
129+
for (const [store, events] of Object.entries(eventsGroupedByStore)) {
130+
const eventRepository = this.#getEventRepositoryForStore(store);
136131

137132
await waitForLlmPricingReady();
138133
const enrichedEvents = enrichCreatableEvents(events);
139134

140-
this.#logEventsVerbose(enrichedEvents, `exportEvents ${store}:${orgId}`);
135+
this.#logEventsVerbose(enrichedEvents, `exportEvents ${store}`);
141136

142137
eventCount += enrichedEvents.length;
143138

@@ -147,16 +142,6 @@ class OTLPExporter {
147142
return eventCount;
148143
}
149144

150-
async #getEventRepositoryForStoreAndOrg(store: string, orgId: string): Promise<IEventRepository> {
151-
// For ClickHouse stores with a specific org (not "default"), use org-specific repository
152-
if ((store === "clickhouse" || store === "clickhouse_v2") && orgId !== "default") {
153-
return await getEventRepositoryForOrganization(orgId);
154-
}
155-
156-
// Fall back to default repositories for non-ClickHouse stores or default org
157-
return this.#getEventRepositoryForStore(store);
158-
}
159-
160145
#getEventRepositoryForStore(store: string): IEventRepository {
161146
if (store === "clickhouse") {
162147
return this._clickhouseEventRepository;
@@ -1187,17 +1172,12 @@ function hasUnpairedSurrogateAtEnd(str: string): boolean {
11871172

11881173
export const otlpExporter = singleton("otlpExporter", initializeOTLPExporter);
11891174

1190-
async function initializeOTLPExporter() {
1191-
// Metrics are written globally (not per-org), use standard clickhouse
1192-
// Use a sentinel org ID for global metrics writes
1193-
// In practice, all orgs currently share the same metrics table/instance
1194-
const metricsClickhouse = await getClickhouseForOrganization("METRICS_GLOBAL", "standard");
1195-
1175+
function initializeOTLPExporter() {
11961176
const metricsFlushScheduler = new DynamicFlushScheduler<MetricsV1Input>({
11971177
batchSize: env.METRICS_CLICKHOUSE_BATCH_SIZE,
11981178
flushInterval: env.METRICS_CLICKHOUSE_FLUSH_INTERVAL_MS,
11991179
callback: async (_flushId, batch) => {
1200-
await metricsClickhouse.metrics.insert(batch);
1180+
await clickhouseClient.metrics.insert(batch);
12011181
},
12021182
minConcurrency: 1,
12031183
maxConcurrency: env.METRICS_CLICKHOUSE_MAX_CONCURRENCY,

0 commit comments

Comments
 (0)