diff --git a/db/queries/queries.go b/db/queries/queries.go index 8dfe37c..a722c55 100644 --- a/db/queries/queries.go +++ b/db/queries/queries.go @@ -2913,18 +2913,53 @@ func DropCDCMetadataTable(ctx context.Context, db DBQuerier) error { return nil } -func GetCDCMetadata(ctx context.Context, db DBQuerier, publicationName string) (string, string, []string, error) { +// pubCommitLSN is empty for legacy metadata rows that pre-date the +// pub_commit_lsn column; callers must treat empty as "invariant +// uncheckable" and skip the publication-commit guard with a warning. +func GetCDCMetadata(ctx context.Context, db DBQuerier, publicationName string) (slotName, startLSN string, tables []string, pubCommitLSN string, err error) { sql, err := RenderSQL(SQLTemplates.GetCDCMetadata, nil) if err != nil { - return "", "", nil, err + return "", "", nil, "", err } - var slotName, startLSN string - var tables []string - err = db.QueryRow(ctx, sql, publicationName).Scan(&slotName, &startLSN, &tables) + err = db.QueryRow(ctx, sql, publicationName).Scan(&slotName, &startLSN, &tables, &pubCommitLSN) + if err != nil { + return "", "", nil, "", err + } + return slotName, startLSN, tables, pubCommitLSN, nil +} + +// Init path only. Ongoing flushes use UpdateCDCMetadata, which deliberately +// leaves pub_commit_lsn untouched so the listen.go guard always compares +// against the LSN captured at the matching init. +func InitCDCMetadata(ctx context.Context, db DBQuerier, publicationName, slotName, startLSN, pubCommitLSN string, tables []string) error { + sql, err := RenderSQL(SQLTemplates.InitCDCMetadata, nil) + if err != nil { + return fmt.Errorf("failed to render InitCDCMetadata SQL: %w", err) + } + if tables == nil { + tables = []string{} + } + _, err = db.Exec(ctx, sql, publicationName, slotName, startLSN, pubCommitLSN, tables) if err != nil { - return "", "", nil, err + return fmt.Errorf("query to init cdc metadata failed: %w", err) } - return slotName, startLSN, tables, nil + return nil +} + +// Called mid-Phase-A, after CREATE PUBLICATION, so the captured value is +// strictly less than Phase A's commit LSN. The slot created in Phase B +// has consistent_point >= that commit LSN, hence consistent_point > +// captured value: a safe lower bound for any valid replication start LSN. +func CurrentWalInsertLSN(ctx context.Context, db DBQuerier) (string, error) { + sql, err := RenderSQL(SQLTemplates.CurrentWalInsertLSN, nil) + if err != nil { + return "", fmt.Errorf("failed to render CurrentWalInsertLSN SQL: %w", err) + } + var lsn string + if err := db.QueryRow(ctx, sql).Scan(&lsn); err != nil { + return "", fmt.Errorf("failed to fetch current WAL insert LSN: %w", err) + } + return lsn, nil } func UpdateMtreeCounters(ctx context.Context, db DBQuerier, mtreeTable string, isComposite bool, compositeTypeName string, pkeyType string, inserts, deletes, updates []string) error { diff --git a/db/queries/templates.go b/db/queries/templates.go index 6a0ffb8..88a2a9f 100644 --- a/db/queries/templates.go +++ b/db/queries/templates.go @@ -145,6 +145,9 @@ type Templates struct { ResetReplicationOriginSession *template.Template SetupReplicationOriginXact *template.Template ResetReplicationOriginXact *template.Template + + InitCDCMetadata *template.Template + CurrentWalInsertLSN *template.Template } var SQLTemplates = Templates{ @@ -221,6 +224,39 @@ var SQLTemplates = Templates{ last_updated = EXCLUDED.last_updated `)), + // On conflict every column including pub_commit_lsn is refreshed: + // reaching this query path means a re-init, which created a fresh + // publication with a new commit LSN, and listen.go's guard must + // compare against that current LSN — not a stale prior one. + InitCDCMetadata: template.Must(template.New("initCdcMetadata").Funcs(aceTemplateFuncs).Parse(` + INSERT INTO + {{aceSchema}}.ace_cdc_metadata ( + publication_name, + slot_name, + start_lsn, + pub_commit_lsn, + tables, + last_updated + ) + VALUES + ( + $1, + $2, + $3, + $4, + $5, + current_timestamp + ) + ON CONFLICT (publication_name) DO + UPDATE + SET + slot_name = EXCLUDED.slot_name, + start_lsn = EXCLUDED.start_lsn, + pub_commit_lsn = EXCLUDED.pub_commit_lsn, + tables = EXCLUDED.tables, + last_updated = EXCLUDED.last_updated + `)), + DropPublication: template.Must(template.New("dropPublication").Parse(` DROP PUBLICATION IF EXISTS {{.PublicationName}} `)), @@ -240,15 +276,25 @@ var SQLTemplates = Templates{ DROP TABLE IF EXISTS {{aceSchema}}.ace_cdc_metadata `)), + // pub_commit_lsn is extracted via to_jsonb(row) ->> 'pub_commit_lsn' + // instead of a direct column reference so the query parses and runs + // against pre-migration ace_cdc_metadata tables that lack the column. + // On legacy 3-column rows the JSON object has no pub_commit_lsn key, + // ->> returns NULL, and COALESCE produces the empty string the + // listen.go guard treats as "invariant uncheckable, warn and skip". + // The additive ALTER TABLE in CreateCDCMetadataTable still backfills + // the column on the next MtreeInit so post-init reads use the real + // column path. GetCDCMetadata: template.Must(template.New("getCDCMetadata").Funcs(aceTemplateFuncs).Parse(` SELECT - slot_name, - start_lsn, - tables + m.slot_name, + m.start_lsn, + m.tables, + COALESCE(to_jsonb(m) ->> 'pub_commit_lsn', '') AS pub_commit_lsn FROM - {{aceSchema}}.ace_cdc_metadata + {{aceSchema}}.ace_cdc_metadata AS m WHERE - publication_name = $1 + m.publication_name = $1 `)), UpdateMtreeCounters: template.Must(template.New("updateMtreeCounters").Parse(` @@ -338,9 +384,13 @@ var SQLTemplates = Templates{ publication_name text PRIMARY KEY, slot_name text, start_lsn text, + pub_commit_lsn text, tables text[], last_updated timestamptz - )`), + ); + -- Forward-compatible addition for clusters that ran older versions. + ALTER TABLE {{aceSchema}}.ace_cdc_metadata + ADD COLUMN IF NOT EXISTS pub_commit_lsn text;`), ), GetPrimaryKey: template.Must(template.New("getPrimaryKey").Parse(` SELECT @@ -1599,4 +1649,13 @@ var SQLTemplates = Templates{ ResetReplicationOriginXact: template.Must(template.New("resetReplicationOriginXact").Parse(` SELECT pg_replication_origin_xact_reset() `)), + + // CurrentWalInsertLSN returns pg_current_wal_insert_lsn() as text. Used + // at MtreeInit time, just after CREATE PUBLICATION, as the lower bound + // for any future replication start LSN. The slot's consistent point is + // guaranteed to be >= this value because the slot is created after this + // transaction commits. + CurrentWalInsertLSN: template.Must(template.New("currentWalInsertLSN").Parse(` + SELECT pg_current_wal_insert_lsn()::text + `)), } diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index 4b6bd03..4ef6300 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -807,51 +807,100 @@ func (m *MerkleTreeTask) MtreeInit() (err error) { cfg := config.Get().MTree.CDC for _, nodeInfo := range m.ClusterNodes { - logger.Info("Initialising Merkle tree objects on node: %s", nodeInfo["Name"]) - - lsn, err := cdc.SetupReplicationSlot(m.Ctx, nodeInfo) - if err != nil { - return fmt.Errorf("failed to set up replication slot on node %s: %w", nodeInfo["Name"], err) + if err := m.initOneNode(nodeInfo, cfg.PublicationName, cfg.SlotName); err != nil { + return err } + } + return nil +} - pool, err := auth.GetClusterNodeConnection(m.Ctx, nodeInfo, m.connOpts()) - if err != nil { - return fmt.Errorf("failed to get connection pool for node %s: %w", nodeInfo["Name"], err) - } - defer pool.Close() +// initOneNode runs MtreeInit on a single node in three phases: +// +// 1. Phase A (tx 1): create schema, helpers, the CDC metadata table, +// create the publication, and capture the publication's commit LSN. +// Commit. +// 2. Phase B (no tx): create the logical replication slot via a fresh +// replication-mode connection. Its consistent point is now +// guaranteed to be > the publication's commit LSN, fixing the +// "publication does not exist" replay error. +// 3. Phase C (tx 2): persist slot/start_lsn/pub_commit_lsn into +// ace_cdc_metadata. +// +// Each iteration runs inside this function so deferred Close/Rollback +// calls fire per-node rather than at MtreeInit return. +func (m *MerkleTreeTask) initOneNode(nodeInfo map[string]any, publicationName, slotName string) error { + logger.Info("Initialising Merkle tree objects on node: %s", nodeInfo["Name"]) - if err = queries.CreateSchema(m.Ctx, pool, m.aceSchema()); err != nil { - return fmt.Errorf("failed to create schema '%s': %w", m.aceSchema(), err) - } + pool, err := auth.GetClusterNodeConnection(m.Ctx, nodeInfo, m.connOpts()) + if err != nil { + return fmt.Errorf("failed to get connection pool for node %s: %w", nodeInfo["Name"], err) + } + defer pool.Close() + if err := queries.CreateSchema(m.Ctx, pool, m.aceSchema()); err != nil { + return fmt.Errorf("failed to create schema '%s' on node %s: %w", m.aceSchema(), nodeInfo["Name"], err) + } + + var pubCommitLSN string + if err := func() (err error) { tx, err := pool.Begin(m.Ctx) if err != nil { - return fmt.Errorf("failed to begin transaction on node %s: %w", nodeInfo["Name"], err) + return err } defer tx.Rollback(m.Ctx) - if err = queries.CreateXORFunction(m.Ctx, tx); err != nil { - return fmt.Errorf("failed to create xor function: %w", err) + if err := queries.CreateXORFunction(m.Ctx, tx); err != nil { + return fmt.Errorf("create xor function: %w", err) } - - if err = queries.CreateCDCMetadataTable(m.Ctx, tx); err != nil { - return fmt.Errorf("failed to create cdc metadata table: %w", err) + if err := queries.CreateCDCMetadataTable(m.Ctx, tx); err != nil { + return fmt.Errorf("create cdc metadata table: %w", err) } - - if err := cdc.SetupPublication(m.Ctx, tx, cfg.PublicationName); err != nil { - return fmt.Errorf("failed to setup publication on node %s: %w", nodeInfo["Name"], err) + if err := cdc.SetupPublication(m.Ctx, tx, publicationName); err != nil { + return fmt.Errorf("setup publication: %w", err) } - - if err = queries.UpdateCDCMetadata(m.Ctx, tx, cfg.PublicationName, cfg.SlotName, lsn.String(), []string{}); err != nil { - return fmt.Errorf("failed to update cdc metadata: %w", err) + // Captured mid-tx after CREATE PUBLICATION, so the value is + // strictly less than this tx's commit LSN; Phase B's slot + // consistent_point is >= that commit LSN. listen.go's guard + // therefore catches any start LSN that predates the publication. + pubCommitLSN, err = queries.CurrentWalInsertLSN(m.Ctx, tx) + if err != nil { + return fmt.Errorf("capture publication commit LSN: %w", err) } + return tx.Commit(m.Ctx) + }(); err != nil { + return fmt.Errorf("phase A failed on node %s: %w", nodeInfo["Name"], err) + } - if err := tx.Commit(m.Ctx); err != nil { - return fmt.Errorf("failed to commit transaction on node %s: %w", nodeInfo["Name"], err) + slotLSN, err := cdc.SetupReplicationSlot(m.Ctx, nodeInfo) + if err != nil { + return fmt.Errorf("phase B (replication slot) failed on node %s: %w", nodeInfo["Name"], err) + } + + if err := func() (err error) { + tx, err := pool.Begin(m.Ctx) + if err != nil { + return err } + defer tx.Rollback(m.Ctx) - logger.Info("Merkle tree objects initialised on node: %s", nodeInfo["Name"]) + if err := queries.InitCDCMetadata(m.Ctx, tx, + publicationName, slotName, + slotLSN.String(), pubCommitLSN, []string{}); err != nil { + return fmt.Errorf("write cdc metadata: %w", err) + } + return tx.Commit(m.Ctx) + }(); err != nil { + // Slot leak mitigation: Phase B created a slot that we failed to + // record. A subsequent SetupReplicationSlot will drop any prior + // slot of the same name, so a re-run reaps it — but try a best + // effort drop here too to keep the cluster tidy. + if dropErr := queries.DropReplicationSlot(m.Ctx, pool, slotName); dropErr != nil { + logger.Warn("failed to drop orphaned slot %s on node %s: %v", slotName, nodeInfo["Name"], dropErr) + } + return fmt.Errorf("phase C failed on node %s: %w", nodeInfo["Name"], err) } + + logger.Info("Merkle tree objects initialised on node: %s", nodeInfo["Name"]) return nil } @@ -1281,6 +1330,21 @@ func (m *MerkleTreeTask) BuildMtree() (err error) { cfg := config.Get().MTree.CDC pools := make(map[string]*pgxpool.Pool, len(m.ClusterNodes)) + // Safety net for early returns. The row-estimate loop closes pools + // itself on its error path, and the per-node build loop closes each + // node's pool via its inner deferred close, but neither covers + // pools belonging to nodes the build loop never reaches when an + // earlier iteration errors out. pgxpool.Pool.Close is idempotent + // (sync.Once internally), so the per-iteration closes still take + // effect promptly and this defer is a no-op for pools already closed. + defer func() { + for _, p := range pools { + if p != nil { + p.Close() + } + } + }() + numWorkers := int(math.Ceil(float64(runtime.NumCPU()) * m.MaxCpuRatio * 2)) if numWorkers < 1 { numWorkers = 1 @@ -1402,83 +1466,88 @@ func (m *MerkleTreeTask) BuildMtree() (err error) { } for _, nodeInfo := range m.ClusterNodes { - logger.Info("Processing node: %s", nodeInfo["Name"]) - pool, ok := pools[nodeInfo["Name"].(string)] - if !ok { - return fmt.Errorf("could not find node %s in pools", nodeInfo["Name"]) - } - publicationName := cfg.PublicationName - err := queries.AlterPublicationAddTable(m.Ctx, pool, publicationName, m.QualifiedTableName) - if err != nil { - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) && pgErr.Code == tableAlreadyInPublicationError { - logger.Info("Table %s is already in publication %s on node %s", m.QualifiedTableName, publicationName, nodeInfo["Name"]) + // Per-iteration body in a closure so defer pool.Close and defer + // tx.Rollback fire at end-of-iteration in the correct LIFO order. + // Without this scoping, hitting any error inside the body called + // pool.Close while the tx still held a pooled conn — pool.Close + // blocks waiting for that conn to be released, but the deferred + // tx.Rollback that would release it cannot run until the function + // returns, which cannot happen while pool.Close is blocking. The + // process then hangs and the underlying error never surfaces. + if err := func() error { + logger.Info("Processing node: %s", nodeInfo["Name"]) + pool, ok := pools[nodeInfo["Name"].(string)] + if !ok { + return fmt.Errorf("could not find node %s in pools", nodeInfo["Name"]) + } + defer pool.Close() + + publicationName := cfg.PublicationName + err := queries.AlterPublicationAddTable(m.Ctx, pool, publicationName, m.QualifiedTableName) + if err != nil { + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) && pgErr.Code == tableAlreadyInPublicationError { + logger.Info("Table %s is already in publication %s on node %s", m.QualifiedTableName, publicationName, nodeInfo["Name"]) + } else { + return fmt.Errorf("failed to add table to publication on node %s: %w", nodeInfo["Name"], err) + } } else { - pool.Close() - return fmt.Errorf("failed to add table to publication on node %s: %w", nodeInfo["Name"], err) + logger.Info("Added table %s to publication %s on node %s", m.QualifiedTableName, publicationName, nodeInfo["Name"]) } - } else { - logger.Info("Added table %s to publication %s on node %s", m.QualifiedTableName, publicationName, nodeInfo["Name"]) - } - tx, err := pool.Begin(m.Ctx) - if err != nil { - return fmt.Errorf("failed to begin transaction on node %s: %w", nodeInfo["Name"], err) - } - defer tx.Rollback(m.Ctx) + tx, err := pool.Begin(m.Ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction on node %s: %w", nodeInfo["Name"], err) + } + defer tx.Rollback(m.Ctx) - slotName, startLSN, tables, err := queries.GetCDCMetadata(m.Ctx, tx, publicationName) - if err != nil { - pool.Close() - return fmt.Errorf("failed to get cdc metadata on node %s: %w", nodeInfo["Name"], err) - } + slotName, startLSN, tables, _, err := queries.GetCDCMetadata(m.Ctx, tx, publicationName) + if err != nil { + return fmt.Errorf("failed to get cdc metadata on node %s: %w", nodeInfo["Name"], err) + } - if !slices.Contains(tables, m.QualifiedTableName) { - tables = append(tables, m.QualifiedTableName) - } + if !slices.Contains(tables, m.QualifiedTableName) { + tables = append(tables, m.QualifiedTableName) + } - err = queries.UpdateCDCMetadata(m.Ctx, tx, publicationName, slotName, startLSN, tables) - if err != nil { - pool.Close() - return fmt.Errorf("failed to update cdc metadata on node %s: %w", nodeInfo["Name"], err) - } - logger.Info("Updated CDC metadata for table %s on node %s", m.QualifiedTableName, nodeInfo["Name"]) + err = queries.UpdateCDCMetadata(m.Ctx, tx, publicationName, slotName, startLSN, tables) + if err != nil { + return fmt.Errorf("failed to update cdc metadata on node %s: %w", nodeInfo["Name"], err) + } + logger.Info("Updated CDC metadata for table %s on node %s", m.QualifiedTableName, nodeInfo["Name"]) - logger.Info("Creating Merkle Tree objects on %s...", nodeInfo["Name"]) - err = m.createMtreeObjects(tx, maxRows, numBlocks) - if err != nil { - pool.Close() - return fmt.Errorf("failed to create mtree objects on node %s: %w", nodeInfo["Name"], err) - } + logger.Info("Creating Merkle Tree objects on %s...", nodeInfo["Name"]) + err = m.createMtreeObjects(tx, maxRows, numBlocks) + if err != nil { + return fmt.Errorf("failed to create mtree objects on node %s: %w", nodeInfo["Name"], err) + } - logger.Info("Inserting block ranges on %s...", nodeInfo["Name"]) - err = m.insertBlockRanges(tx, blockRanges) - if err != nil { - pool.Close() - return fmt.Errorf("failed to insert block ranges on node %s: %w", nodeInfo["Name"], err) - } + logger.Info("Inserting block ranges on %s...", nodeInfo["Name"]) + err = m.insertBlockRanges(tx, blockRanges) + if err != nil { + return fmt.Errorf("failed to insert block ranges on node %s: %w", nodeInfo["Name"], err) + } - logger.Info("Computing leaf hashes on %s...", nodeInfo["Name"]) - err = m.computeLeafHashes(pool, tx, blockRanges, numWorkers, "Computing leaf hashes:") - if err != nil { - pool.Close() - return fmt.Errorf("failed to compute leaf hashes on node %s: %w", nodeInfo["Name"], err) - } + logger.Info("Computing leaf hashes on %s...", nodeInfo["Name"]) + err = m.computeLeafHashes(pool, tx, blockRanges, numWorkers, "Computing leaf hashes:") + if err != nil { + return fmt.Errorf("failed to compute leaf hashes on node %s: %w", nodeInfo["Name"], err) + } - logger.Info("Building parent nodes on %s...", nodeInfo["Name"]) - err = m.buildParentNodes(tx) - if err != nil { - pool.Close() - return fmt.Errorf("failed to build parent nodes on node %s: %w", nodeInfo["Name"], err) - } + logger.Info("Building parent nodes on %s...", nodeInfo["Name"]) + err = m.buildParentNodes(tx) + if err != nil { + return fmt.Errorf("failed to build parent nodes on node %s: %w", nodeInfo["Name"], err) + } - logger.Info("Merkle tree built successfully on %s", nodeInfo["Name"]) - err = tx.Commit(m.Ctx) - if err != nil { - pool.Close() - return fmt.Errorf("failed to commit transaction on node %s: %w", nodeInfo["Name"], err) + logger.Info("Merkle tree built successfully on %s", nodeInfo["Name"]) + if err := tx.Commit(m.Ctx); err != nil { + return fmt.Errorf("failed to commit transaction on node %s: %w", nodeInfo["Name"], err) + } + return nil + }(); err != nil { + return err } - pool.Close() } resultCtx["max_rows"] = maxRows @@ -2496,17 +2565,17 @@ func (m *MerkleTreeTask) insertBlockRanges(conn queries.DBQuerier, ranges []type func (m *MerkleTreeTask) createMtreeObjects(tx pgx.Tx, totalRows int64, numBlocks int) error { - err := queries.CreateSchema(m.Ctx, tx, m.aceSchema()) - if err != nil { - return fmt.Errorf("failed to create schema '%s': %w", m.aceSchema(), err) - } - - err = queries.CreateXORFunction(m.Ctx, tx) - if err != nil { - return fmt.Errorf("failed to create xor function: %w", err) - } - - err = queries.CreateMetadataTable(m.Ctx, tx) + // Schema and the bytea_xor function are created by MtreeInit's Phase A + // and are required for BuildMtree to make sense (build needs the CDC + // publication and metadata that init sets up — if init ran, schema and + // XOR function exist). Recreating them here is redundant, and on Spock + // clusters CREATE OR REPLACE FUNCTION unconditionally writes to + // pg_proc, so it races with Spock's DDL apply of the matching + // replicated DDL from n1's own build — producing SQLSTATE XX000 + // "tuple concurrently updated". The CREATE TABLE IF NOT EXISTS below + // is race-safe because IF NOT EXISTS short-circuits when the relation + // already exists. + err := queries.CreateMetadataTable(m.Ctx, tx) if err != nil { return fmt.Errorf("failed to create metadata table: %w", err) } diff --git a/internal/infra/cdc/listen.go b/internal/infra/cdc/listen.go index 0038dae..f1fd2d4 100644 --- a/internal/infra/cdc/listen.go +++ b/internal/infra/cdc/listen.go @@ -65,6 +65,7 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont publication := cfg.PublicationName slotName := cfg.SlotName var startLSNStr string + var pubCommitLSNStr string var tables []string func() { tx, err := pool.Begin(ctx) @@ -74,13 +75,14 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont } defer tx.Rollback(ctx) - metaSlot, metaLSN, metaTables, err := queries.GetCDCMetadata(ctx, tx, publication) + metaSlot, metaLSN, metaTables, metaPubCommit, err := queries.GetCDCMetadata(ctx, tx, publication) if err != nil { logger.Error("failed to get cdc metadata: %v", err) startLSNStr = "" return } startLSNStr = metaLSN + pubCommitLSNStr = metaPubCommit tables = metaTables if metaSlot != "" { slotName = metaSlot @@ -98,10 +100,33 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont return fmt.Errorf("failed to parse lsn: %v", err) } - slotConfirmedLSN, err := getSlotConfirmedFlushLSN(ctx, pool, slotName) - if err != nil { - logger.Error("failed to get confirmed_flush_lsn for slot %s: %v", slotName, err) - return fmt.Errorf("failed to get confirmed_flush_lsn for slot %s: %w", slotName, err) + // Publication-commit guard. If metadata records the LSN at which the + // publication was committed during MtreeInit, refuse to start a stream + // from any LSN strictly older than that — pgoutput's historical + // catalog snapshot would not yet contain the publication and the + // stream would abort with "publication does not exist" (SQLSTATE + // 42704). + // + // Empty pubCommitLSNStr means the metadata row was written by a + // version of ACE that pre-dates this fix; we cannot check the + // invariant and fall through with a warning. + if pubCommitLSNStr == "" { + logger.Warn("ace_cdc_metadata has no pub_commit_lsn for publication %s; cannot verify slot/publication ordering. If you see SQLSTATE 42704, run 'ace mtree teardown', 'ace mtree init', and 'ace mtree build' to recover", publication) + } else { + pubCommitLSN, err := pglogrepl.ParseLSN(pubCommitLSNStr) + if err != nil { + logger.Error("failed to parse pub_commit_lsn %s: %v", pubCommitLSNStr, err) + return fmt.Errorf("failed to parse pub_commit_lsn %s: %w", pubCommitLSNStr, err) + } + if startLSN < pubCommitLSN { + return fmt.Errorf( + "ace_cdc_metadata.start_lsn (%s) is older than publication commit LSN (%s) "+ + "for slot %s; this typically indicates ace_cdc_metadata was replicated "+ + "cross-node by Spock, or the slot/metadata is from a prior init. "+ + "Run 'ace mtree teardown', 'ace mtree init', and 'ace mtree build' to recover", + startLSN, pubCommitLSN, slotName, + ) + } } targetFlushLSN := pglogrepl.LSN(0) @@ -119,9 +144,6 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont } lastLSN := startLSN - if slotConfirmedLSN != 0 && slotConfirmedLSN < lastLSN { - lastLSN = slotConfirmedLSN - } var lastFlushedLSN pglogrepl.LSN = lastLSN var lastLSNVal atomic.Uint64 lastLSNVal.Store(uint64(lastLSN)) @@ -472,22 +494,6 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont return nil } -func getSlotConfirmedFlushLSN(ctx context.Context, pool *pgxpool.Pool, slotName string) (pglogrepl.LSN, error) { - var lsnStr string - err := pool.QueryRow(ctx, "SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1", slotName).Scan(&lsnStr) - if err != nil { - return 0, fmt.Errorf("failed to fetch confirmed_flush_lsn for slot %s: %w", slotName, err) - } - if lsnStr == "" { - return 0, fmt.Errorf("confirmed_flush_lsn empty for slot %s", slotName) - } - lsn, err := pglogrepl.ParseLSN(lsnStr) - if err != nil { - return 0, fmt.Errorf("failed to parse confirmed_flush_lsn %s for slot %s: %w", lsnStr, slotName, err) - } - return lsn, nil -} - func getCurrentWalFlushLSN(ctx context.Context, pool *pgxpool.Pool) (pglogrepl.LSN, error) { var lsnStr string err := pool.QueryRow(ctx, "SELECT pg_current_wal_flush_lsn()").Scan(&lsnStr) diff --git a/internal/infra/cdc/setup.go b/internal/infra/cdc/setup.go index a135cb8..52dcc09 100644 --- a/internal/infra/cdc/setup.go +++ b/internal/infra/cdc/setup.go @@ -38,6 +38,7 @@ func SetupPublication(ctx context.Context, db queries.DBQuerier, publicationName return nil } + func SetupReplicationSlot(ctx context.Context, nodeInfo map[string]any) (pglogrepl.LSN, error) { cfg := config.Get().MTree.CDC slot := cfg.SlotName diff --git a/internal/infra/db/auth.go b/internal/infra/db/auth.go index fe4ac8d..dbe407d 100644 --- a/internal/infra/db/auth.go +++ b/internal/infra/db/auth.go @@ -252,6 +252,22 @@ func applyRuntimeParams(params map[string]string, pgCfg config.PostgresConfig) { if pgCfg.TCPKeepalivesCount != nil { params["tcp_keepalives_count"] = strconv.Itoa(*pgCfg.TCPKeepalivesCount) } + + // Suppress Spock's DDL replication and auto-repset-add behaviour on + // every ACE connection. ACE only issues DDL against its own pgedge_ace + // schema and the ace_mtree_pub publication — never against user + // objects — and all of that DDL is intentionally per-node. Letting + // Spock replicate it produces cross-node races (e.g. CREATE OR REPLACE + // FUNCTION racing on pg_proc) and 42704 "publication does not exist" + // when one node's DROP PUBLICATION + CREATE PUBLICATION lands in + // another node's pgoutput stream mid-replay. + // + // Both GUCs are PGC_USERSET in Spock, so connection-level options + // override any cluster-level default. Both have dotted names, so on + // vanilla PG (no Spock loaded) PostgreSQL accepts them as placeholder + // custom variables and they have no effect — safe in dual-mode. + params["spock.enable_ddl_replication"] = "off" + params["spock.include_ddl_repset"] = "off" } func ensureRuntimeParams(params map[string]string) map[string]string { diff --git a/tests/integration/cdc_busy_table_test.go b/tests/integration/cdc_busy_table_test.go index 8a2771d..fb13b5c 100644 --- a/tests/integration/cdc_busy_table_test.go +++ b/tests/integration/cdc_busy_table_test.go @@ -252,7 +252,7 @@ func currentMetadataLSN(t *testing.T, ctx context.Context) pglogrepl.LSN { require.NoError(t, err) defer tx.Rollback(ctx) - _, lsnStr, _, err := queries.GetCDCMetadata(ctx, tx, config.Cfg.MTree.CDC.PublicationName) + _, lsnStr, _, _, err := queries.GetCDCMetadata(ctx, tx, config.Cfg.MTree.CDC.PublicationName) require.NoError(t, err) lsn, err := pglogrepl.ParseLSN(lsnStr) diff --git a/tests/integration/cdc_init_ordering_test.go b/tests/integration/cdc_init_ordering_test.go new file mode 100644 index 0000000..8a8dd51 --- /dev/null +++ b/tests/integration/cdc_init_ordering_test.go @@ -0,0 +1,163 @@ +// /////////////////////////////////////////////////////////////////////////// +// +// # ACE - Active Consistency Engine +// +// Copyright (C) 2023 - 2026, pgEdge (https://www.pgedge.com/) +// +// This software is released under the PostgreSQL License: +// https://opensource.org/license/postgresql +// +// /////////////////////////////////////////////////////////////////////////// + +// Regression coverage for SQLSTATE 42704 "publication \"ace_mtree_pub\" +// does not exist" during mtree table-diff. MtreeInit was creating the +// replication slot before the publication was committed, so the slot's +// consistent point preceded the publication. pgoutput's +// get_publication_oid then failed during replay. The fix splits init into +// three phases so the slot's consistent point sits at or beyond the +// publication's commit LSN, and adds a pub_commit_lsn guard in +// processReplicationStream that rejects streams whose start_lsn predates +// it. + +package integration + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/pgedge/ace/internal/infra/cdc" + "github.com/pgedge/ace/pkg/config" + "github.com/stretchr/testify/require" +) + +// cdcMetadataTable returns the safely-quoted "schema"."ace_cdc_metadata" +// identifier. Built once with pgx.Identifier.Sanitize() so call sites can +// concatenate it into SQL without fmt.Sprintf — matches the production +// convention in queries package and keeps Codacy/Semgrep's +// concat-sqli pattern quiet (schema is config-loaded, never user input). +func cdcMetadataTable() string { + return pgx.Identifier{config.Cfg.MTree.Schema, "ace_cdc_metadata"}.Sanitize() +} + +// TestMtreeInitSlotIsAfterPublication verifies the init-ordering fix: the replication +// slot's consistent point sits at or beyond the publication's commit LSN, so +// pgoutput can always find the publication in the historical catalog snapshot. +func TestMtreeInitSlotIsAfterPublication(t *testing.T) { + ctx := context.Background() + tableName := "customers_pub_ordering" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1, serviceN2}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("MtreeTeardown during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + for _, np := range []struct { + name string + pool *pgxpool.Pool + }{ + {serviceN1, pgCluster.Node1Pool}, + {serviceN2, pgCluster.Node2Pool}, + } { + // pub_commit_lsn must be populated by the new InitCDCMetadata path. + var pubCommitLSN string + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + err := np.pool.QueryRow(ctx, + "SELECT COALESCE(pub_commit_lsn, '') FROM "+cdcMetadataTable()+" WHERE publication_name = $1", + config.Cfg.MTree.CDC.PublicationName, + ).Scan(&pubCommitLSN) + require.NoError(t, err, "read pub_commit_lsn on %s", np.name) + require.NotEmpty(t, pubCommitLSN, "pub_commit_lsn must be set on %s (proves new InitCDCMetadata ran)", np.name) + + // The slot's confirmed_flush_lsn is the consistent point of the + // replication slot. It must be >= the publication's commit LSN, or + // pgoutput would replay from a snapshot that doesn't see the + // publication. This is the core ordering invariant the fix restores. + var slotConfirmed string + err = np.pool.QueryRow(ctx, + "SELECT confirmed_flush_lsn::text FROM pg_replication_slots WHERE slot_name = $1", + config.Cfg.MTree.CDC.SlotName, + ).Scan(&slotConfirmed) + require.NoError(t, err, "read slot confirmed_flush_lsn on %s", np.name) + + var aheadOrEqual bool + err = np.pool.QueryRow(ctx, + "SELECT $1::pg_lsn >= $2::pg_lsn", + slotConfirmed, pubCommitLSN, + ).Scan(&aheadOrEqual) + require.NoError(t, err, "compare LSNs on %s", np.name) + require.True(t, aheadOrEqual, + "slot consistent point %s must be >= publication commit LSN %s on %s", + slotConfirmed, pubCommitLSN, np.name) + } +} + +// TestProcessReplicationStreamRejectsStaleStartLSN verifies the new +// publication-commit guard in processReplicationStream. We corrupt n1's +// start_lsn to a value strictly older than pub_commit_lsn and assert that +// UpdateFromCDC returns an explicit, actionable error rather than the +// cryptic SQLSTATE 42704 the customer originally saw. +func TestProcessReplicationStreamRejectsStaleStartLSN(t *testing.T) { + ctx := context.Background() + tableName := "customers_stale_start_lsn" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("MtreeTeardown during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + // Read pub_commit_lsn and craft a stale start_lsn strictly below it. + var pubCommitLSN string + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + err := pgCluster.Node1Pool.QueryRow(ctx, + "SELECT pub_commit_lsn FROM "+cdcMetadataTable()+" WHERE publication_name = $1", + config.Cfg.MTree.CDC.PublicationName, + ).Scan(&pubCommitLSN) + require.NoError(t, err) + require.NotEmpty(t, pubCommitLSN, "fix relies on pub_commit_lsn being populated") + + var staleLSN string + err = pgCluster.Node1Pool.QueryRow(ctx, + "SELECT ($1::pg_lsn - 16)::text", pubCommitLSN, + ).Scan(&staleLSN) + require.NoError(t, err) + + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + _, err = pgCluster.Node1Pool.Exec(ctx, + "UPDATE "+cdcMetadataTable()+" SET start_lsn = $1 WHERE publication_name = $2", + staleLSN, config.Cfg.MTree.CDC.PublicationName, + ) + require.NoError(t, err, "inject stale start_lsn") + + nodeInfo := pgCluster.ClusterNodes[0] + err = cdc.UpdateFromCDC(context.Background(), nodeInfo) + require.Error(t, err, "UpdateFromCDC must refuse a stream whose start_lsn precedes the publication commit") + + msg := err.Error() + require.Contains(t, msg, "publication commit LSN", + "error must mention the publication-commit invariant, got: %s", msg) + require.NotContains(t, strings.ToLower(msg), "42704", + "caller should see the new guard's actionable error, not the raw 42704 from PostgreSQL: %s", msg) +}