@@ -29,6 +29,7 @@ import EventEmitter from "node:events";
2929import pLimit from "p-limit" ;
3030import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings" ;
3131import { calculateErrorFingerprint } from "~/utils/errorFingerprinting" ;
32+ import { getClickhouseForOrganization } from "~/services/clickhouse/clickhouseFactory.server" ;
3233
3334interface TransactionEvent < T = any > {
3435 tag : "insert" | "update" | "delete" ;
@@ -617,51 +618,59 @@ export class RunsReplicationService {
617618 payloadInserts : payloadInserts . length ,
618619 } ) ;
619620
620- // Group task runs by organization for routing to correct ClickHouse instance
621- const taskRunsByOrg = new Map < string , TaskRunInsertArray [ ] > ( ) ;
622- for ( const taskRun of taskRunInserts ) {
623- const orgId = getTaskRunField ( taskRun , "organization_id" ) ;
624- const orgRuns = taskRunsByOrg . get ( orgId ) || [ ] ;
625- orgRuns . push ( taskRun ) ;
626- taskRunsByOrg . set ( orgId , orgRuns ) ;
627- }
621+ // Task runs are already sorted by org (lines 571-576), so we can stream through
622+ // and flush when org changes - no grouping overhead, no O(n²) lookups
628623
629- // Group payloads by organization (extract from run_id -> task runs mapping)
624+ // Build run_id -> org_id index for O(1) payload->org lookups
625+ const runIdToOrgId = new Map (
626+ taskRunInserts . map ( tr => [ getTaskRunField ( tr , "run_id" ) , getTaskRunField ( tr , "organization_id" ) ] )
627+ ) ;
628+
629+ // Group payloads by org using the index (O(n) instead of O(n²))
630630 const payloadsByOrg = new Map < string , PayloadInsertArray [ ] > ( ) ;
631631 for ( const payload of payloadInserts ) {
632632 const runId = getPayloadField ( payload , "run_id" ) ;
633- // Find the corresponding task run to get its organization
634- const taskRun = taskRunInserts . find ( ( tr ) => getTaskRunField ( tr , "run_id" ) === runId ) ;
635- if ( taskRun ) {
636- const orgId = getTaskRunField ( taskRun , "organization_id" ) ;
637- const orgPayloads = payloadsByOrg . get ( orgId ) || [ ] ;
638- orgPayloads . push ( payload ) ;
639- payloadsByOrg . set ( orgId , orgPayloads ) ;
633+ const orgId = runIdToOrgId . get ( runId ) ;
634+ if ( orgId ) {
635+ const orgPayloads = payloadsByOrg . get ( orgId ) ;
636+ if ( orgPayloads ) {
637+ orgPayloads . push ( payload ) ;
638+ } else {
639+ payloadsByOrg . set ( orgId , [ payload ] ) ;
640+ }
640641 }
641642 }
642643
643- // Insert task runs and payloads with retry logic for connection errors
644- // Process each organization's data in parallel
645- const insertPromises = Array . from ( taskRunsByOrg . entries ( ) ) . map (
646- async ( [ orgId , orgTaskRuns ] ) => {
647- const orgPayloads = payloadsByOrg . get ( orgId ) || [ ] ;
644+ // Stream through task runs, flushing when org changes
645+ const insertPromises : Promise < { taskRunError : Error | null ; payloadError : Error | null ; orgId : string } > [ ] = [ ] ;
646+ let currentOrgId : string | null = null ;
647+ let currentOrgTaskRuns : TaskRunInsertArray [ ] = [ ] ;
648648
649- const [ taskRunError , taskRunResult ] = await this . #insertWithRetry(
650- ( attempt ) => this . #insertTaskRunInserts( orgId , orgTaskRuns , attempt ) ,
651- "task run inserts" ,
652- flushId
653- ) ;
649+ for ( const taskRun of taskRunInserts ) {
650+ const orgId = getTaskRunField ( taskRun , "organization_id" ) ;
654651
655- const [ payloadError , payloadResult ] = await this . #insertWithRetry(
656- ( attempt ) => this . #insertPayloadInserts( orgId , orgPayloads , attempt ) ,
657- "payload inserts" ,
658- flushId
652+ // Org changed? Flush previous org's batch
653+ if ( currentOrgId !== null && currentOrgId !== orgId ) {
654+ const orgPayloads = payloadsByOrg . get ( currentOrgId ) || [ ] ;
655+ insertPromises . push (
656+ this . #insertOrgBatch( currentOrgId , currentOrgTaskRuns , orgPayloads , flushId )
659657 ) ;
660-
661- return { taskRunError, payloadError, orgId } ;
658+ currentOrgTaskRuns = [ ] ;
662659 }
663- ) ;
664660
661+ currentOrgId = orgId ;
662+ currentOrgTaskRuns . push ( taskRun ) ;
663+ }
664+
665+ // Flush final org's batch
666+ if ( currentOrgId !== null && currentOrgTaskRuns . length > 0 ) {
667+ const orgPayloads = payloadsByOrg . get ( currentOrgId ) || [ ] ;
668+ insertPromises . push (
669+ this . #insertOrgBatch( currentOrgId , currentOrgTaskRuns , orgPayloads , flushId )
670+ ) ;
671+ }
672+
673+ // Wait for all org batches to complete (parallel execution)
665674 const results = await Promise . all ( insertPromises ) ;
666675
667676 // Aggregate errors from all organizations
@@ -817,16 +826,34 @@ export class RunsReplicationService {
817826 } ;
818827 }
819828
/**
 * Inserts a single organization's portion of a flush: its task runs and
 * then its payloads, each routed through `#insertWithRetry`.
 *
 * The two inserts run one after the other; the payload insert is only
 * started once the task run insert call has settled.
 *
 * @param organizationId - Organization the rows belong to; forwarded to both insert helpers.
 * @param taskRunInserts - Task run rows to insert for this organization.
 * @param payloadInserts - Payload rows to insert for this organization (may be empty).
 * @param flushId - Identifier of the enclosing flush, passed through to `#insertWithRetry`.
 * @returns The first tuple element of each `#insertWithRetry` result (the
 *   error slot, `null` when absent) together with the organization id.
 */
async #insertOrgBatch(
  organizationId: string,
  taskRunInserts: TaskRunInsertArray[],
  payloadInserts: PayloadInsertArray[],
  flushId: string
): Promise<{ taskRunError: Error | null; payloadError: Error | null; orgId: string }> {
  // One closure per table; each is handed the attempt number by the retry helper.
  const writeTaskRuns = (attemptNumber: number) =>
    this.#insertTaskRunInserts(organizationId, taskRunInserts, attemptNumber);
  const writePayloads = (attemptNumber: number) =>
    this.#insertPayloadInserts(organizationId, payloadInserts, attemptNumber);

  // Task runs first, payloads second — same ordering as before.
  const [runsFailure] = await this.#insertWithRetry(writeTaskRuns, "task run inserts", flushId);
  const [payloadsFailure] = await this.#insertWithRetry(
    writePayloads,
    "payload inserts",
    flushId
  );

  return {
    taskRunError: runsFailure,
    payloadError: payloadsFailure,
    orgId: organizationId,
  };
}
849+
820850 async #insertTaskRunInserts(
821851 organizationId : string ,
822852 taskRunInserts : TaskRunInsertArray [ ] ,
823853 attempt : number
824854 ) {
825855 return await startSpan ( this . _tracer , "insertTaskRunsInserts" , async ( span ) => {
826856 // Get the appropriate ClickHouse client for this organization
827- const { getClickhouseForOrganization } = await import (
828- "~/services/clickhouse/clickhouseFactory.server"
829- ) ;
830857 const clickhouse = await getClickhouseForOrganization ( organizationId , "replication" ) ;
831858
832859 const [ insertError , insertResult ] = await clickhouse . taskRuns . insertCompactArrays (
@@ -860,9 +887,6 @@ export class RunsReplicationService {
860887 ) {
861888 return await startSpan ( this . _tracer , "insertPayloadInserts" , async ( span ) => {
862889 // Get the appropriate ClickHouse client for this organization
863- const { getClickhouseForOrganization } = await import (
864- "~/services/clickhouse/clickhouseFactory.server"
865- ) ;
866890 const clickhouse = await getClickhouseForOrganization ( organizationId , "replication" ) ;
867891
868892 const [ insertError , insertResult ] = await clickhouse . taskRuns . insertPayloadsCompactArrays (
0 commit comments