From e05e0016b75f87ead98b78d295b92826a92badba Mon Sep 17 00:00:00 2001 From: Andrei Lepikhov Date: Sun, 24 May 2026 19:04:59 -0700 Subject: [PATCH 1/8] fix: mtree init publication ordering Fixes 'publication "ace_mtree_pub" does not exist' (SQLSTATE 42704) during 'ace mtree table-diff'. Init order: the replication slot was being created before the publication was committed to WAL. The slot's consistent point therefore preceded the publication, and pgoutput's get_publication_oid failed during change-callback replay. MtreeInit is now split into three phases per node: Phase A commits schema + metadata table + publication and captures the publication commit LSN; Phase B creates the slot (its consistent point is now strictly after the publication); Phase C persists slot/start_lsn/ pub_commit_lsn. A new pub_commit_lsn column is added to ace_cdc_metadata (additive, ALTER TABLE IF NOT EXISTS). processReplicationStream refuses to open a stream whose start_lsn is older than pub_commit_lsn, returning an actionable error instead of silently rewinding to a pre-publication LSN. Empty pub_commit_lsn (legacy metadata rows) skips the check with a warning. Existing broken installs are not auto-repaired; users must run 'ace mtree teardown' + 'ace mtree init' + 'ace mtree build' after upgrade. '--skip-cdc' remains a valid interim workaround for pre-upgrade clusters. --- db/queries/queries.go | 49 +++++++++-- db/queries/templates.go | 54 +++++++++++- internal/consistency/mtree/merkle.go | 107 +++++++++++++++++------ internal/infra/cdc/listen.go | 54 +++++++----- internal/infra/cdc/setup.go | 1 + tests/integration/cdc_busy_table_test.go | 2 +- 6 files changed, 204 insertions(+), 63 deletions(-) diff --git a/db/queries/queries.go b/db/queries/queries.go index 8dfe37c..a722c55 100644 --- a/db/queries/queries.go +++ b/db/queries/queries.go @@ -2913,18 +2913,53 @@ func DropCDCMetadataTable(ctx context.Context, db DBQuerier) error { return nil } -func GetCDCMetadata(ctx context.Context, db DBQuerier, publicationName string) (string, string, []string, error) { +// pubCommitLSN is empty for legacy metadata rows that pre-date the +// pub_commit_lsn column; callers must treat empty as "invariant +// uncheckable" and skip the publication-commit guard with a warning. +func GetCDCMetadata(ctx context.Context, db DBQuerier, publicationName string) (slotName, startLSN string, tables []string, pubCommitLSN string, err error) { sql, err := RenderSQL(SQLTemplates.GetCDCMetadata, nil) if err != nil { - return "", "", nil, err + return "", "", nil, "", err } - var slotName, startLSN string - var tables []string - err = db.QueryRow(ctx, sql, publicationName).Scan(&slotName, &startLSN, &tables) + err = db.QueryRow(ctx, sql, publicationName).Scan(&slotName, &startLSN, &tables, &pubCommitLSN) + if err != nil { + return "", "", nil, "", err + } + return slotName, startLSN, tables, pubCommitLSN, nil +} + +// Init path only. Ongoing flushes use UpdateCDCMetadata, which deliberately +// leaves pub_commit_lsn untouched so the listen.go guard always compares +// against the LSN captured at the matching init. +func InitCDCMetadata(ctx context.Context, db DBQuerier, publicationName, slotName, startLSN, pubCommitLSN string, tables []string) error { + sql, err := RenderSQL(SQLTemplates.InitCDCMetadata, nil) + if err != nil { + return fmt.Errorf("failed to render InitCDCMetadata SQL: %w", err) + } + if tables == nil { + tables = []string{} + } + _, err = db.Exec(ctx, sql, publicationName, slotName, startLSN, pubCommitLSN, tables) if err != nil { - return "", "", nil, err + return fmt.Errorf("query to init cdc metadata failed: %w", err) } - return slotName, startLSN, tables, nil + return nil +} + +// Called mid-Phase-A, after CREATE PUBLICATION, so the captured value is +// strictly less than Phase A's commit LSN. The slot created in Phase B +// has consistent_point >= that commit LSN, hence consistent_point > +// captured value: a safe lower bound for any valid replication start LSN. +func CurrentWalInsertLSN(ctx context.Context, db DBQuerier) (string, error) { + sql, err := RenderSQL(SQLTemplates.CurrentWalInsertLSN, nil) + if err != nil { + return "", fmt.Errorf("failed to render CurrentWalInsertLSN SQL: %w", err) + } + var lsn string + if err := db.QueryRow(ctx, sql).Scan(&lsn); err != nil { + return "", fmt.Errorf("failed to fetch current WAL insert LSN: %w", err) + } + return lsn, nil } func UpdateMtreeCounters(ctx context.Context, db DBQuerier, mtreeTable string, isComposite bool, compositeTypeName string, pkeyType string, inserts, deletes, updates []string) error { diff --git a/db/queries/templates.go b/db/queries/templates.go index 6a0ffb8..bec9e11 100644 --- a/db/queries/templates.go +++ b/db/queries/templates.go @@ -145,6 +145,9 @@ type Templates struct { ResetReplicationOriginSession *template.Template SetupReplicationOriginXact *template.Template ResetReplicationOriginXact *template.Template + + InitCDCMetadata *template.Template + CurrentWalInsertLSN *template.Template } var SQLTemplates = Templates{ @@ -221,6 +224,39 @@ var SQLTemplates = Templates{ last_updated = EXCLUDED.last_updated `)), + // On conflict every column including pub_commit_lsn is refreshed: + // reaching this query path means a re-init, which created a fresh + // publication with a new commit LSN, and listen.go's guard must + // compare against that current LSN — not a stale prior one. + InitCDCMetadata: template.Must(template.New("initCdcMetadata").Funcs(aceTemplateFuncs).Parse(` + INSERT INTO + {{aceSchema}}.ace_cdc_metadata ( + publication_name, + slot_name, + start_lsn, + pub_commit_lsn, + tables, + last_updated + ) + VALUES + ( + $1, + $2, + $3, + $4, + $5, + current_timestamp + ) + ON CONFLICT (publication_name) DO + UPDATE + SET + slot_name = EXCLUDED.slot_name, + start_lsn = EXCLUDED.start_lsn, + pub_commit_lsn = EXCLUDED.pub_commit_lsn, + tables = EXCLUDED.tables, + last_updated = EXCLUDED.last_updated + `)), + DropPublication: template.Must(template.New("dropPublication").Parse(` DROP PUBLICATION IF EXISTS {{.PublicationName}} `)), @@ -244,7 +280,8 @@ var SQLTemplates = Templates{ SELECT slot_name, start_lsn, - tables + tables, + COALESCE(pub_commit_lsn, '') AS pub_commit_lsn FROM {{aceSchema}}.ace_cdc_metadata WHERE @@ -338,9 +375,13 @@ var SQLTemplates = Templates{ publication_name text PRIMARY KEY, slot_name text, start_lsn text, + pub_commit_lsn text, tables text[], last_updated timestamptz - )`), + ); + -- Forward-compatible addition for clusters that ran older versions. + ALTER TABLE {{aceSchema}}.ace_cdc_metadata + ADD COLUMN IF NOT EXISTS pub_commit_lsn text;`), ), GetPrimaryKey: template.Must(template.New("getPrimaryKey").Parse(` SELECT @@ -1599,4 +1640,13 @@ var SQLTemplates = Templates{ ResetReplicationOriginXact: template.Must(template.New("resetReplicationOriginXact").Parse(` SELECT pg_replication_origin_xact_reset() `)), + + // CurrentWalInsertLSN returns pg_current_wal_insert_lsn() as text. Used + // at MtreeInit time, just after CREATE PUBLICATION, as the lower bound + // for any future replication start LSN. The slot's consistent point is + // guaranteed to be >= this value because the slot is created after this + // transaction commits. + CurrentWalInsertLSN: template.Must(template.New("currentWalInsertLSN").Parse(` + SELECT pg_current_wal_insert_lsn()::text + `)), } diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index 4b6bd03..201b13a 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -807,51 +807,100 @@ func (m *MerkleTreeTask) MtreeInit() (err error) { cfg := config.Get().MTree.CDC for _, nodeInfo := range m.ClusterNodes { - logger.Info("Initialising Merkle tree objects on node: %s", nodeInfo["Name"]) - - lsn, err := cdc.SetupReplicationSlot(m.Ctx, nodeInfo) - if err != nil { - return fmt.Errorf("failed to set up replication slot on node %s: %w", nodeInfo["Name"], err) + if err := m.initOneNode(nodeInfo, cfg.PublicationName, cfg.SlotName); err != nil { + return err } + } + return nil +} - pool, err := auth.GetClusterNodeConnection(m.Ctx, nodeInfo, m.connOpts()) - if err != nil { - return fmt.Errorf("failed to get connection pool for node %s: %w", nodeInfo["Name"], err) - } - defer pool.Close() +// initOneNode runs MtreeInit on a single node in three phases: +// +// 1. Phase A (tx 1): create schema, helpers, the CDC metadata table, +// create the publication, and capture the publication's commit LSN. +// Commit. +// 2. Phase B (no tx): create the logical replication slot via a fresh +// replication-mode connection. Its consistent point is now +// guaranteed to be > the publication's commit LSN, fixing the +// "publication does not exist" replay error. +// 3. Phase C (tx 2): persist slot/start_lsn/pub_commit_lsn into +// ace_cdc_metadata. +// +// Each iteration runs inside this function so deferred Close/Rollback +// calls fire per-node rather than at MtreeInit return. +func (m *MerkleTreeTask) initOneNode(nodeInfo map[string]any, publicationName, slotName string) error { + logger.Info("Initialising Merkle tree objects on node: %s", nodeInfo["Name"]) - if err = queries.CreateSchema(m.Ctx, pool, m.aceSchema()); err != nil { - return fmt.Errorf("failed to create schema '%s': %w", m.aceSchema(), err) - } + pool, err := auth.GetClusterNodeConnection(m.Ctx, nodeInfo, m.connOpts()) + if err != nil { + return fmt.Errorf("failed to get connection pool for node %s: %w", nodeInfo["Name"], err) + } + defer pool.Close() + + if err := queries.CreateSchema(m.Ctx, pool, m.aceSchema()); err != nil { + return fmt.Errorf("failed to create schema '%s' on node %s: %w", m.aceSchema(), nodeInfo["Name"], err) + } + var pubCommitLSN string + if err := func() (err error) { tx, err := pool.Begin(m.Ctx) if err != nil { - return fmt.Errorf("failed to begin transaction on node %s: %w", nodeInfo["Name"], err) + return err } defer tx.Rollback(m.Ctx) - if err = queries.CreateXORFunction(m.Ctx, tx); err != nil { - return fmt.Errorf("failed to create xor function: %w", err) + if err := queries.CreateXORFunction(m.Ctx, tx); err != nil { + return fmt.Errorf("create xor function: %w", err) } - - if err = queries.CreateCDCMetadataTable(m.Ctx, tx); err != nil { - return fmt.Errorf("failed to create cdc metadata table: %w", err) + if err := queries.CreateCDCMetadataTable(m.Ctx, tx); err != nil { + return fmt.Errorf("create cdc metadata table: %w", err) } - - if err := cdc.SetupPublication(m.Ctx, tx, cfg.PublicationName); err != nil { - return fmt.Errorf("failed to setup publication on node %s: %w", nodeInfo["Name"], err) + if err := cdc.SetupPublication(m.Ctx, tx, publicationName); err != nil { + return fmt.Errorf("setup publication: %w", err) } - - if err = queries.UpdateCDCMetadata(m.Ctx, tx, cfg.PublicationName, cfg.SlotName, lsn.String(), []string{}); err != nil { - return fmt.Errorf("failed to update cdc metadata: %w", err) + // Captured mid-tx after CREATE PUBLICATION, so the value is + // strictly less than this tx's commit LSN; Phase B's slot + // consistent_point is >= that commit LSN. listen.go's guard + // therefore catches any start LSN that predates the publication. + pubCommitLSN, err = queries.CurrentWalInsertLSN(m.Ctx, tx) + if err != nil { + return fmt.Errorf("capture publication commit LSN: %w", err) } + return tx.Commit(m.Ctx) + }(); err != nil { + return fmt.Errorf("phase A failed on node %s: %w", nodeInfo["Name"], err) + } - if err := tx.Commit(m.Ctx); err != nil { - return fmt.Errorf("failed to commit transaction on node %s: %w", nodeInfo["Name"], err) + slotLSN, err := cdc.SetupReplicationSlot(m.Ctx, nodeInfo) + if err != nil { + return fmt.Errorf("phase B (replication slot) failed on node %s: %w", nodeInfo["Name"], err) + } + + if err := func() (err error) { + tx, err := pool.Begin(m.Ctx) + if err != nil { + return err } + defer tx.Rollback(m.Ctx) - logger.Info("Merkle tree objects initialised on node: %s", nodeInfo["Name"]) + if err := queries.InitCDCMetadata(m.Ctx, tx, + publicationName, slotName, + slotLSN.String(), pubCommitLSN, []string{}); err != nil { + return fmt.Errorf("write cdc metadata: %w", err) + } + return tx.Commit(m.Ctx) + }(); err != nil { + // Slot leak mitigation: Phase B created a slot that we failed to + // record. A subsequent SetupReplicationSlot will drop any prior + // slot of the same name, so a re-run reaps it — but try a best + // effort drop here too to keep the cluster tidy. + if dropErr := queries.DropReplicationSlot(m.Ctx, pool, slotName); dropErr != nil { + logger.Warn("failed to drop orphaned slot %s on node %s: %v", slotName, nodeInfo["Name"], dropErr) + } + return fmt.Errorf("phase C failed on node %s: %w", nodeInfo["Name"], err) } + + logger.Info("Merkle tree objects initialised on node: %s", nodeInfo["Name"]) return nil } @@ -1427,7 +1476,7 @@ func (m *MerkleTreeTask) BuildMtree() (err error) { } defer tx.Rollback(m.Ctx) - slotName, startLSN, tables, err := queries.GetCDCMetadata(m.Ctx, tx, publicationName) + slotName, startLSN, tables, _, err := queries.GetCDCMetadata(m.Ctx, tx, publicationName) if err != nil { pool.Close() return fmt.Errorf("failed to get cdc metadata on node %s: %w", nodeInfo["Name"], err) diff --git a/internal/infra/cdc/listen.go b/internal/infra/cdc/listen.go index 0038dae..7454e19 100644 --- a/internal/infra/cdc/listen.go +++ b/internal/infra/cdc/listen.go @@ -65,6 +65,7 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont publication := cfg.PublicationName slotName := cfg.SlotName var startLSNStr string + var pubCommitLSNStr string var tables []string func() { tx, err := pool.Begin(ctx) @@ -74,13 +75,14 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont } defer tx.Rollback(ctx) - metaSlot, metaLSN, metaTables, err := queries.GetCDCMetadata(ctx, tx, publication) + metaSlot, metaLSN, metaTables, metaPubCommit, err := queries.GetCDCMetadata(ctx, tx, publication) if err != nil { logger.Error("failed to get cdc metadata: %v", err) startLSNStr = "" return } startLSNStr = metaLSN + pubCommitLSNStr = metaPubCommit tables = metaTables if metaSlot != "" { slotName = metaSlot @@ -98,10 +100,33 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont return fmt.Errorf("failed to parse lsn: %v", err) } - slotConfirmedLSN, err := getSlotConfirmedFlushLSN(ctx, pool, slotName) - if err != nil { - logger.Error("failed to get confirmed_flush_lsn for slot %s: %v", slotName, err) - return fmt.Errorf("failed to get confirmed_flush_lsn for slot %s: %w", slotName, err) + // Publication-commit guard. If metadata records the LSN at which the + // publication was committed during MtreeInit, refuse to start a stream + // from any LSN strictly older than that — pgoutput's historical + // catalog snapshot would not yet contain the publication and the + // stream would abort with "publication does not exist" (SQLSTATE + // 42704). + // + // Empty pubCommitLSNStr means the metadata row was written by a + // version of ACE that pre-dates this fix; we cannot check the + // invariant and fall through with a warning. + if pubCommitLSNStr == "" { + logger.Warn("ace_cdc_metadata has no pub_commit_lsn for publication %s; cannot verify slot/publication ordering. If you see SQLSTATE 42704, run 'ace mtree teardown' and 'ace mtree init' to recover", publication) + } else { + pubCommitLSN, err := pglogrepl.ParseLSN(pubCommitLSNStr) + if err != nil { + logger.Error("failed to parse pub_commit_lsn %s: %v", pubCommitLSNStr, err) + return fmt.Errorf("failed to parse pub_commit_lsn %s: %w", pubCommitLSNStr, err) + } + if startLSN < pubCommitLSN { + return fmt.Errorf( + "ace_cdc_metadata.start_lsn (%s) is older than publication commit LSN (%s) "+ + "for slot %s; this typically indicates ace_cdc_metadata was replicated "+ + "cross-node by Spock, or the slot/metadata is from a prior init. "+ + "Run 'ace mtree teardown' and 'ace mtree init' to recover", + startLSN, pubCommitLSN, slotName, + ) + } } targetFlushLSN := pglogrepl.LSN(0) @@ -119,9 +144,6 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont } lastLSN := startLSN - if slotConfirmedLSN != 0 && slotConfirmedLSN < lastLSN { - lastLSN = slotConfirmedLSN - } var lastFlushedLSN pglogrepl.LSN = lastLSN var lastLSNVal atomic.Uint64 lastLSNVal.Store(uint64(lastLSN)) @@ -472,22 +494,6 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont return nil } -func getSlotConfirmedFlushLSN(ctx context.Context, pool *pgxpool.Pool, slotName string) (pglogrepl.LSN, error) { - var lsnStr string - err := pool.QueryRow(ctx, "SELECT confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1", slotName).Scan(&lsnStr) - if err != nil { - return 0, fmt.Errorf("failed to fetch confirmed_flush_lsn for slot %s: %w", slotName, err) - } - if lsnStr == "" { - return 0, fmt.Errorf("confirmed_flush_lsn empty for slot %s", slotName) - } - lsn, err := pglogrepl.ParseLSN(lsnStr) - if err != nil { - return 0, fmt.Errorf("failed to parse confirmed_flush_lsn %s for slot %s: %w", lsnStr, slotName, err) - } - return lsn, nil -} - func getCurrentWalFlushLSN(ctx context.Context, pool *pgxpool.Pool) (pglogrepl.LSN, error) { var lsnStr string err := pool.QueryRow(ctx, "SELECT pg_current_wal_flush_lsn()").Scan(&lsnStr) diff --git a/internal/infra/cdc/setup.go b/internal/infra/cdc/setup.go index a135cb8..52dcc09 100644 --- a/internal/infra/cdc/setup.go +++ b/internal/infra/cdc/setup.go @@ -38,6 +38,7 @@ func SetupPublication(ctx context.Context, db queries.DBQuerier, publicationName return nil } + func SetupReplicationSlot(ctx context.Context, nodeInfo map[string]any) (pglogrepl.LSN, error) { cfg := config.Get().MTree.CDC slot := cfg.SlotName diff --git a/tests/integration/cdc_busy_table_test.go b/tests/integration/cdc_busy_table_test.go index 8a2771d..fb13b5c 100644 --- a/tests/integration/cdc_busy_table_test.go +++ b/tests/integration/cdc_busy_table_test.go @@ -252,7 +252,7 @@ func currentMetadataLSN(t *testing.T, ctx context.Context) pglogrepl.LSN { require.NoError(t, err) defer tx.Rollback(ctx) - _, lsnStr, _, err := queries.GetCDCMetadata(ctx, tx, config.Cfg.MTree.CDC.PublicationName) + _, lsnStr, _, _, err := queries.GetCDCMetadata(ctx, tx, config.Cfg.MTree.CDC.PublicationName) require.NoError(t, err) lsn, err := pglogrepl.ParseLSN(lsnStr) From 00b9d51afa325ebc2e966b2b0cfd740ceed50467 Mon Sep 17 00:00:00 2001 From: Andrei Lepikhov Date: Sun, 24 May 2026 19:07:14 -0700 Subject: [PATCH 2/8] test: regression coverage for mtree CDC init ordering Two integration tests against the spock cluster fixture: - TestMtreeInitSlotIsAfterPublication: after MtreeInit, every node's slot consistent_point must be >= the publication's commit LSN recorded in ace_cdc_metadata.pub_commit_lsn. This is the ordering invariant the 3-phase init split restores. - TestProcessReplicationStreamRejectsStaleStartLSN: corrupts n1's start_lsn to a value below pub_commit_lsn and asserts UpdateFromCDC returns the new guard's actionable error rather than the cryptic SQLSTATE 42704 that pgoutput would otherwise emit. --- tests/integration/cdc_init_ordering_test.go | 163 ++++++++++++++++++++ 1 file changed, 163 insertions(+) create mode 100644 tests/integration/cdc_init_ordering_test.go diff --git a/tests/integration/cdc_init_ordering_test.go b/tests/integration/cdc_init_ordering_test.go new file mode 100644 index 0000000..8a8dd51 --- /dev/null +++ b/tests/integration/cdc_init_ordering_test.go @@ -0,0 +1,163 @@ +// /////////////////////////////////////////////////////////////////////////// +// +// # ACE - Active Consistency Engine +// +// Copyright (C) 2023 - 2026, pgEdge (https://www.pgedge.com/) +// +// This software is released under the PostgreSQL License: +// https://opensource.org/license/postgresql +// +// /////////////////////////////////////////////////////////////////////////// + +// Regression coverage for SQLSTATE 42704 "publication \"ace_mtree_pub\" +// does not exist" during mtree table-diff. MtreeInit was creating the +// replication slot before the publication was committed, so the slot's +// consistent point preceded the publication. pgoutput's +// get_publication_oid then failed during replay. The fix splits init into +// three phases so the slot's consistent point sits at or beyond the +// publication's commit LSN, and adds a pub_commit_lsn guard in +// processReplicationStream that rejects streams whose start_lsn predates +// it. + +package integration + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/pgedge/ace/internal/infra/cdc" + "github.com/pgedge/ace/pkg/config" + "github.com/stretchr/testify/require" +) + +// cdcMetadataTable returns the safely-quoted "schema"."ace_cdc_metadata" +// identifier. Built once with pgx.Identifier.Sanitize() so call sites can +// concatenate it into SQL without fmt.Sprintf — matches the production +// convention in queries package and keeps Codacy/Semgrep's +// concat-sqli pattern quiet (schema is config-loaded, never user input). +func cdcMetadataTable() string { + return pgx.Identifier{config.Cfg.MTree.Schema, "ace_cdc_metadata"}.Sanitize() +} + +// TestMtreeInitSlotIsAfterPublication verifies the init-ordering fix: the replication +// slot's consistent point sits at or beyond the publication's commit LSN, so +// pgoutput can always find the publication in the historical catalog snapshot. +func TestMtreeInitSlotIsAfterPublication(t *testing.T) { + ctx := context.Background() + tableName := "customers_pub_ordering" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1, serviceN2}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("MtreeTeardown during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + for _, np := range []struct { + name string + pool *pgxpool.Pool + }{ + {serviceN1, pgCluster.Node1Pool}, + {serviceN2, pgCluster.Node2Pool}, + } { + // pub_commit_lsn must be populated by the new InitCDCMetadata path. + var pubCommitLSN string + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + err := np.pool.QueryRow(ctx, + "SELECT COALESCE(pub_commit_lsn, '') FROM "+cdcMetadataTable()+" WHERE publication_name = $1", + config.Cfg.MTree.CDC.PublicationName, + ).Scan(&pubCommitLSN) + require.NoError(t, err, "read pub_commit_lsn on %s", np.name) + require.NotEmpty(t, pubCommitLSN, "pub_commit_lsn must be set on %s (proves new InitCDCMetadata ran)", np.name) + + // The slot's confirmed_flush_lsn is the consistent point of the + // replication slot. It must be >= the publication's commit LSN, or + // pgoutput would replay from a snapshot that doesn't see the + // publication. This is the core ordering invariant the fix restores. + var slotConfirmed string + err = np.pool.QueryRow(ctx, + "SELECT confirmed_flush_lsn::text FROM pg_replication_slots WHERE slot_name = $1", + config.Cfg.MTree.CDC.SlotName, + ).Scan(&slotConfirmed) + require.NoError(t, err, "read slot confirmed_flush_lsn on %s", np.name) + + var aheadOrEqual bool + err = np.pool.QueryRow(ctx, + "SELECT $1::pg_lsn >= $2::pg_lsn", + slotConfirmed, pubCommitLSN, + ).Scan(&aheadOrEqual) + require.NoError(t, err, "compare LSNs on %s", np.name) + require.True(t, aheadOrEqual, + "slot consistent point %s must be >= publication commit LSN %s on %s", + slotConfirmed, pubCommitLSN, np.name) + } +} + +// TestProcessReplicationStreamRejectsStaleStartLSN verifies the new +// publication-commit guard in processReplicationStream. We corrupt n1's +// start_lsn to a value strictly older than pub_commit_lsn and assert that +// UpdateFromCDC returns an explicit, actionable error rather than the +// cryptic SQLSTATE 42704 the customer originally saw. +func TestProcessReplicationStreamRejectsStaleStartLSN(t *testing.T) { + ctx := context.Background() + tableName := "customers_stale_start_lsn" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("MtreeTeardown during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + // Read pub_commit_lsn and craft a stale start_lsn strictly below it. + var pubCommitLSN string + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + err := pgCluster.Node1Pool.QueryRow(ctx, + "SELECT pub_commit_lsn FROM "+cdcMetadataTable()+" WHERE publication_name = $1", + config.Cfg.MTree.CDC.PublicationName, + ).Scan(&pubCommitLSN) + require.NoError(t, err) + require.NotEmpty(t, pubCommitLSN, "fix relies on pub_commit_lsn being populated") + + var staleLSN string + err = pgCluster.Node1Pool.QueryRow(ctx, + "SELECT ($1::pg_lsn - 16)::text", pubCommitLSN, + ).Scan(&staleLSN) + require.NoError(t, err) + + // FP: identifier sanitised by pgx.Identifier; user values via $N. + // nosemgrep + _, err = pgCluster.Node1Pool.Exec(ctx, + "UPDATE "+cdcMetadataTable()+" SET start_lsn = $1 WHERE publication_name = $2", + staleLSN, config.Cfg.MTree.CDC.PublicationName, + ) + require.NoError(t, err, "inject stale start_lsn") + + nodeInfo := pgCluster.ClusterNodes[0] + err = cdc.UpdateFromCDC(context.Background(), nodeInfo) + require.Error(t, err, "UpdateFromCDC must refuse a stream whose start_lsn precedes the publication commit") + + msg := err.Error() + require.Contains(t, msg, "publication commit LSN", + "error must mention the publication-commit invariant, got: %s", msg) + require.NotContains(t, strings.ToLower(msg), "42704", + "caller should see the new guard's actionable error, not the raw 42704 from PostgreSQL: %s", msg) +} From b87723d0e9e216da6ef3b515fee9b7d6b67985c0 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Sun, 24 May 2026 18:27:58 -0700 Subject: [PATCH 3/8] fix: BuildMtree per-node error path deadlocked on pool.Close MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BuildMtree's per-node body opens a tx with defer tx.Rollback in the outer function scope, then on any error explicitly calls pool.Close() and returns the error. pool.Close() waits on puddle's WaitGroup for every acquired conn to be released — but the conn the tx holds will not be released until the deferred tx.Rollback runs, and that defer cannot run until the function returns, which is being blocked by pool.Close(). Classic Go defer-vs-explicit-close deadlock: the process hangs and the underlying error never surfaces. The shape repeats at every error site inside the per-iteration body (AlterPublicationAddTable, GetCDCMetadata, UpdateCDCMetadata, createMtreeObjects, insertBlockRanges, computeLeafHashes, buildParentNodes, tx.Commit). All eight are reachable; any one being hit hangs the build. Wrap the per-iteration body in a closure so defers are scoped to one iteration. defer pool.Close lands above defer tx.Rollback in registration order, so LIFO fires tx.Rollback first (releases the conn) then pool.Close (no acquired conns to wait for). The explicit pool.Close calls inside each error branch go away — defers handle it uniformly on success and failure. Confirmed via SIGQUIT goroutine dump showing main blocked in puddle.Pool.Close.deferwrap1 → WaitGroup.Wait, called from BuildMtree's createMtreeObjects error branch. The same defer-stacking pattern exists in the loop further down BuildMtree (around the UpdateMtree block) and in a few other *.Close sites elsewhere in this file. They don't deadlock today because either no tx is held or the order happens to be safe, but the closure pattern would make them uniformly correct. Left for a follow-up so this commit stays focused on the reproduced bug. --- internal/consistency/mtree/merkle.go | 135 ++++++++++++++------------- 1 file changed, 70 insertions(+), 65 deletions(-) diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index 201b13a..c475294 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -1451,83 +1451,88 @@ func (m *MerkleTreeTask) BuildMtree() (err error) { } for _, nodeInfo := range m.ClusterNodes { - logger.Info("Processing node: %s", nodeInfo["Name"]) - pool, ok := pools[nodeInfo["Name"].(string)] - if !ok { - return fmt.Errorf("could not find node %s in pools", nodeInfo["Name"]) - } - publicationName := cfg.PublicationName - err := queries.AlterPublicationAddTable(m.Ctx, pool, publicationName, m.QualifiedTableName) - if err != nil { - var pgErr *pgconn.PgError - if errors.As(err, &pgErr) && pgErr.Code == tableAlreadyInPublicationError { - logger.Info("Table %s is already in publication %s on node %s", m.QualifiedTableName, publicationName, nodeInfo["Name"]) + // Per-iteration body in a closure so defer pool.Close and defer + // tx.Rollback fire at end-of-iteration in the correct LIFO order. + // Without this scoping, hitting any error inside the body called + // pool.Close while the tx still held a pooled conn — pool.Close + // blocks waiting for that conn to be released, but the deferred + // tx.Rollback that would release it cannot run until the function + // returns, which cannot happen while pool.Close is blocking. The + // process then hangs and the underlying error never surfaces. + if err := func() error { + logger.Info("Processing node: %s", nodeInfo["Name"]) + pool, ok := pools[nodeInfo["Name"].(string)] + if !ok { + return fmt.Errorf("could not find node %s in pools", nodeInfo["Name"]) + } + defer pool.Close() + + publicationName := cfg.PublicationName + err := queries.AlterPublicationAddTable(m.Ctx, pool, publicationName, m.QualifiedTableName) + if err != nil { + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) && pgErr.Code == tableAlreadyInPublicationError { + logger.Info("Table %s is already in publication %s on node %s", m.QualifiedTableName, publicationName, nodeInfo["Name"]) + } else { + return fmt.Errorf("failed to add table to publication on node %s: %w", nodeInfo["Name"], err) + } } else { - pool.Close() - return fmt.Errorf("failed to add table to publication on node %s: %w", nodeInfo["Name"], err) + logger.Info("Added table %s to publication %s on node %s", m.QualifiedTableName, publicationName, nodeInfo["Name"]) } - } else { - logger.Info("Added table %s to publication %s on node %s", m.QualifiedTableName, publicationName, nodeInfo["Name"]) - } - tx, err := pool.Begin(m.Ctx) - if err != nil { - return fmt.Errorf("failed to begin transaction on node %s: %w", nodeInfo["Name"], err) - } - defer tx.Rollback(m.Ctx) + tx, err := pool.Begin(m.Ctx) + if err != nil { + return fmt.Errorf("failed to begin transaction on node %s: %w", nodeInfo["Name"], err) + } + defer tx.Rollback(m.Ctx) - slotName, startLSN, tables, _, err := queries.GetCDCMetadata(m.Ctx, tx, publicationName) - if err != nil { - pool.Close() - return fmt.Errorf("failed to get cdc metadata on node %s: %w", nodeInfo["Name"], err) - } + slotName, startLSN, tables, _, err := queries.GetCDCMetadata(m.Ctx, tx, publicationName) + if err != nil { + return fmt.Errorf("failed to get cdc metadata on node %s: %w", nodeInfo["Name"], err) + } - if !slices.Contains(tables, m.QualifiedTableName) { - tables = append(tables, m.QualifiedTableName) - } + if !slices.Contains(tables, m.QualifiedTableName) { + tables = append(tables, m.QualifiedTableName) + } - err = queries.UpdateCDCMetadata(m.Ctx, tx, publicationName, slotName, startLSN, tables) - if err != nil { - pool.Close() - return fmt.Errorf("failed to update cdc metadata on node %s: %w", nodeInfo["Name"], err) - } - logger.Info("Updated CDC metadata for table %s on node %s", m.QualifiedTableName, nodeInfo["Name"]) + err = queries.UpdateCDCMetadata(m.Ctx, tx, publicationName, slotName, startLSN, tables) + if err != nil { + return fmt.Errorf("failed to update cdc metadata on node %s: %w", nodeInfo["Name"], err) + } + logger.Info("Updated CDC metadata for table %s on node %s", m.QualifiedTableName, nodeInfo["Name"]) - logger.Info("Creating Merkle Tree objects on %s...", nodeInfo["Name"]) - err = m.createMtreeObjects(tx, maxRows, numBlocks) - if err != nil { - pool.Close() - return fmt.Errorf("failed to create mtree objects on node %s: %w", nodeInfo["Name"], err) - } + logger.Info("Creating Merkle Tree objects on %s...", nodeInfo["Name"]) + err = m.createMtreeObjects(tx, maxRows, numBlocks) + if err != nil { + return fmt.Errorf("failed to create mtree objects on node %s: %w", nodeInfo["Name"], err) + } - logger.Info("Inserting block ranges on %s...", nodeInfo["Name"]) - err = m.insertBlockRanges(tx, blockRanges) - if err != nil { - pool.Close() - return fmt.Errorf("failed to insert block ranges on node %s: %w", nodeInfo["Name"], err) - } + logger.Info("Inserting block ranges on %s...", nodeInfo["Name"]) + err = m.insertBlockRanges(tx, blockRanges) + if err != nil { + return fmt.Errorf("failed to insert block ranges on node %s: %w", nodeInfo["Name"], err) + } - logger.Info("Computing leaf hashes on %s...", nodeInfo["Name"]) - err = m.computeLeafHashes(pool, tx, blockRanges, numWorkers, "Computing leaf hashes:") - if err != nil { - pool.Close() - return fmt.Errorf("failed to compute leaf hashes on node %s: %w", nodeInfo["Name"], err) - } + logger.Info("Computing leaf hashes on %s...", nodeInfo["Name"]) + err = m.computeLeafHashes(pool, tx, blockRanges, numWorkers, "Computing leaf hashes:") + if err != nil { + return fmt.Errorf("failed to compute leaf hashes on node %s: %w", nodeInfo["Name"], err) + } - logger.Info("Building parent nodes on %s...", nodeInfo["Name"]) - err = m.buildParentNodes(tx) - if err != nil { - pool.Close() - return fmt.Errorf("failed to build parent nodes on node %s: %w", nodeInfo["Name"], err) - } + logger.Info("Building parent nodes on %s...", nodeInfo["Name"]) + err = m.buildParentNodes(tx) + if err != nil { + return fmt.Errorf("failed to build parent nodes on node %s: %w", nodeInfo["Name"], err) + } - logger.Info("Merkle tree built successfully on %s", nodeInfo["Name"]) - err = tx.Commit(m.Ctx) - if err != nil { - pool.Close() - return fmt.Errorf("failed to commit transaction on node %s: %w", nodeInfo["Name"], err) + logger.Info("Merkle tree built successfully on %s", nodeInfo["Name"]) + if err := tx.Commit(m.Ctx); err != nil { + return fmt.Errorf("failed to commit transaction on node %s: %w", nodeInfo["Name"], err) + } + return nil + }(); err != nil { + return err } - pool.Close() } resultCtx["max_rows"] = maxRows From 6fe84d8cd89a5d8592dbab4b098bf0ed8dad04df Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Sun, 24 May 2026 18:39:01 -0700 Subject: [PATCH 4/8] fix: drop redundant DDL from BuildMtree that races with Spock apply MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit createMtreeObjects has called CreateSchema and CreateXORFunction at the start of every node's build iteration since the original BuildMtree commit (a05445e, July 2025) — back when build was the entry point and had to set up the schema and XOR function itself. When CDC-based MtreeInit landed (fd02c30, August 2025), those calls became redundant: init's Phase A creates both, and BuildMtree implicitly requires init to have run (it consumes the CDC publication, slot, and metadata that init sets up). The build-side calls were never cleaned up. The redundancy was harmless on vanilla PG. On Spock-replicated clusters it surfaces as SQLSTATE XX000 "tuple concurrently updated": CREATE OR REPLACE FUNCTION unconditionally writes pg_proc, and Spock's DDL apply of n1's BuildMtree replicating to n2 races with n2's BuildMtree running the same statement. The race was masked before the previous commit by the BuildMtree pool.Close deadlock that swallowed the error and hung the process. CREATE TABLE IF NOT EXISTS for ace_mtree_metadata stays — IF NOT EXISTS short-circuits without touching pg_class when the relation already exists, so the same race doesn't apply. --- internal/consistency/mtree/merkle.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index c475294..8f0087b 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -2550,17 +2550,17 @@ func (m *MerkleTreeTask) insertBlockRanges(conn queries.DBQuerier, ranges []type func (m *MerkleTreeTask) createMtreeObjects(tx pgx.Tx, totalRows int64, numBlocks int) error { - err := queries.CreateSchema(m.Ctx, tx, m.aceSchema()) - if err != nil { - return fmt.Errorf("failed to create schema '%s': %w", m.aceSchema(), err) - } - - err = queries.CreateXORFunction(m.Ctx, tx) - if err != nil { - return fmt.Errorf("failed to create xor function: %w", err) - } - - err = queries.CreateMetadataTable(m.Ctx, tx) + // Schema and the bytea_xor function are created by MtreeInit's Phase A + // and are required for BuildMtree to make sense (build needs the CDC + // publication and metadata that init sets up — if init ran, schema and + // XOR function exist). Recreating them here is redundant, and on Spock + // clusters CREATE OR REPLACE FUNCTION unconditionally writes to + // pg_proc, so it races with Spock's DDL apply of the matching + // replicated DDL from n1's own build — producing SQLSTATE XX000 + // "tuple concurrently updated". The CREATE TABLE IF NOT EXISTS below + // is race-safe because IF NOT EXISTS short-circuits when the relation + // already exists. + err := queries.CreateMetadataTable(m.Ctx, tx) if err != nil { return fmt.Errorf("failed to create metadata table: %w", err) } From 93511c1a9d0fe4c2d85d0b5a7c167d7bdfc11a1b Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Sun, 24 May 2026 18:50:21 -0700 Subject: [PATCH 5/8] fix: suppress Spock DDL replication and repset auto-add on ACE conns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ACE issues DDL exclusively against its own pgedge_ace schema and the ace_mtree_pub publication — never against user objects — and every piece of that DDL is intentionally per-node. Letting Spock replicate it produces a chain of cross-node interactions: - ace_cdc_metadata gets auto-added to peer nodes' default repsets by Spock's CREATE TABLE event trigger, leaking per-node LSNs cross-node. - CREATE OR REPLACE FUNCTION on pgedge_ace.bytea_xor races on pg_proc when n1's build replicates to n2 while n2's build runs the same statement (SQLSTATE XX000). - n2's SetupPublication DROP+CREATE replicates back to n1, landing in n1's pgoutput stream mid-replay and producing the original SQLSTATE 42704 "publication does not exist" the PR was meant to fix. Set spock.enable_ddl_replication = off and spock.include_ddl_repset = off as connection startup options on every ACE pool. Both are PGC_USERSET in Spock, so connection-level options override cluster defaults. Both are dotted custom GUCs, so PostgreSQL accepts them on vanilla-PG instances without Spock loaded — the parameters become placeholder custom variables with no effect, keeping the dual-mode native-PG path intact. Once this is verified in production, the per-DDL workarounds in earlier commits (post-init repset cleanup sweep, SetSpockRepairMode wrapping at every metadata-write site, ExcludeMetadataFromSpockRepsets itself) all become removable. Leaving them in place for now as defense-in-depth. --- internal/infra/db/auth.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/internal/infra/db/auth.go b/internal/infra/db/auth.go index fe4ac8d..dbe407d 100644 --- a/internal/infra/db/auth.go +++ b/internal/infra/db/auth.go @@ -252,6 +252,22 @@ func applyRuntimeParams(params map[string]string, pgCfg config.PostgresConfig) { if pgCfg.TCPKeepalivesCount != nil { params["tcp_keepalives_count"] = strconv.Itoa(*pgCfg.TCPKeepalivesCount) } + + // Suppress Spock's DDL replication and auto-repset-add behaviour on + // every ACE connection. ACE only issues DDL against its own pgedge_ace + // schema and the ace_mtree_pub publication — never against user + // objects — and all of that DDL is intentionally per-node. Letting + // Spock replicate it produces cross-node races (e.g. CREATE OR REPLACE + // FUNCTION racing on pg_proc) and 42704 "publication does not exist" + // when one node's DROP PUBLICATION + CREATE PUBLICATION lands in + // another node's pgoutput stream mid-replay. + // + // Both GUCs are PGC_USERSET in Spock, so connection-level options + // override any cluster-level default. Both have dotted names, so on + // vanilla PG (no Spock loaded) PostgreSQL accepts them as placeholder + // custom variables and they have no effect — safe in dual-mode. + params["spock.enable_ddl_replication"] = "off" + params["spock.include_ddl_repset"] = "off" } func ensureRuntimeParams(params map[string]string) map[string]string { From ed692d5065a43407d8dca384551cbe3ea16fb3f5 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Sun, 24 May 2026 19:28:55 -0700 Subject: [PATCH 6/8] fix: close all node pools in BuildMtree on early return MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit BuildMtree pre-builds a pool per node up front in the row-estimate loop, then hands the populated map to the per-node build loop which closes each pool only as its own iteration completes. If a later iteration errors out and exits, pools for nodes that loop never reached stay open for the rest of the process lifetime — a real leak in HTTP API server invocations, mostly invisible under CLI invocation where the process exits anyway. Add a defer at the top of the function that closes every entry in the pools map on exit. pgxpool.Pool.Close uses sync.Once internally, so the per-iteration closes still take effect promptly and this defer is a no-op for pools already closed by them. Pre-existing leak; the closure refactor for the pool.Close deadlock didn't introduce or remove it. --- internal/consistency/mtree/merkle.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/internal/consistency/mtree/merkle.go b/internal/consistency/mtree/merkle.go index 8f0087b..4ef6300 100644 --- a/internal/consistency/mtree/merkle.go +++ b/internal/consistency/mtree/merkle.go @@ -1330,6 +1330,21 @@ func (m *MerkleTreeTask) BuildMtree() (err error) { cfg := config.Get().MTree.CDC pools := make(map[string]*pgxpool.Pool, len(m.ClusterNodes)) + // Safety net for early returns. The row-estimate loop closes pools + // itself on its error path, and the per-node build loop closes each + // node's pool via its inner deferred close, but neither covers + // pools belonging to nodes the build loop never reaches when an + // earlier iteration errors out. pgxpool.Pool.Close is idempotent + // (sync.Once internally), so the per-iteration closes still take + // effect promptly and this defer is a no-op for pools already closed. + defer func() { + for _, p := range pools { + if p != nil { + p.Close() + } + } + }() + numWorkers := int(math.Ceil(float64(runtime.NumCPU()) * m.MaxCpuRatio * 2)) if numWorkers < 1 { numWorkers = 1 From f21096da9abb5f432cc95bf6794b77f671c7b394 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Sun, 24 May 2026 19:31:56 -0700 Subject: [PATCH 7/8] fix: GetCDCMetadata tolerates pre-migration ace_cdc_metadata MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pub_commit_lsn column is added by MtreeInit's Phase A via the additive ALTER TABLE in CreateCDCMetadataTable. On clusters upgraded to this binary without re-running init — say, going straight to mtree build or mtree table-diff — the table still has the old 3-column layout. The previous SELECT referenced pub_commit_lsn as a column, so PostgreSQL raised SQLSTATE 42703 ("column does not exist") before the listen.go fallback for an empty pub_commit_lsn could fire. CDC startup / build aborted instead of warning and continuing. Extract pub_commit_lsn via to_jsonb(row) ->> 'pub_commit_lsn' instead of a direct column reference. The SQL parses against any schema version; on legacy rows the JSON object has no key for the column, ->> returns NULL, COALESCE produces the empty string the listen.go guard already treats as "invariant uncheckable — warn and skip". The next MtreeInit still backfills the real column via the additive ALTER TABLE in CreateCDCMetadataTable. --- db/queries/templates.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/db/queries/templates.go b/db/queries/templates.go index bec9e11..88a2a9f 100644 --- a/db/queries/templates.go +++ b/db/queries/templates.go @@ -276,16 +276,25 @@ var SQLTemplates = Templates{ DROP TABLE IF EXISTS {{aceSchema}}.ace_cdc_metadata `)), + // pub_commit_lsn is extracted via to_jsonb(row) ->> 'pub_commit_lsn' + // instead of a direct column reference so the query parses and runs + // against pre-migration ace_cdc_metadata tables that lack the column. + // On legacy 3-column rows the JSON object has no pub_commit_lsn key, + // ->> returns NULL, and COALESCE produces the empty string the + // listen.go guard treats as "invariant uncheckable, warn and skip". + // The additive ALTER TABLE in CreateCDCMetadataTable still backfills + // the column on the next MtreeInit so post-init reads use the real + // column path. GetCDCMetadata: template.Must(template.New("getCDCMetadata").Funcs(aceTemplateFuncs).Parse(` SELECT - slot_name, - start_lsn, - tables, - COALESCE(pub_commit_lsn, '') AS pub_commit_lsn + m.slot_name, + m.start_lsn, + m.tables, + COALESCE(to_jsonb(m) ->> 'pub_commit_lsn', '') AS pub_commit_lsn FROM - {{aceSchema}}.ace_cdc_metadata + {{aceSchema}}.ace_cdc_metadata AS m WHERE - publication_name = $1 + m.publication_name = $1 `)), UpdateMtreeCounters: template.Must(template.New("updateMtreeCounters").Parse(` From 5766ab508fc7410ad9ae5068202498cd8bcab9ea Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Sun, 24 May 2026 19:34:34 -0700 Subject: [PATCH 8/8] docs: include 'ace mtree build' in the 42704 recovery guidance The two recovery messages in processReplicationStream told users to run 'ace mtree teardown' and 'ace mtree init' but stopped there. After init, the merkle tree itself is empty; a subsequent table-diff would either fail or produce incorrect results. 'ace mtree build' is the final step that repopulates the tree and makes table-diff usable again. Updated both messages to spell out the full three-step sequence. --- internal/infra/cdc/listen.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/infra/cdc/listen.go b/internal/infra/cdc/listen.go index 7454e19..f1fd2d4 100644 --- a/internal/infra/cdc/listen.go +++ b/internal/infra/cdc/listen.go @@ -111,7 +111,7 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont // version of ACE that pre-dates this fix; we cannot check the // invariant and fall through with a warning. if pubCommitLSNStr == "" { - logger.Warn("ace_cdc_metadata has no pub_commit_lsn for publication %s; cannot verify slot/publication ordering. If you see SQLSTATE 42704, run 'ace mtree teardown' and 'ace mtree init' to recover", publication) + logger.Warn("ace_cdc_metadata has no pub_commit_lsn for publication %s; cannot verify slot/publication ordering. If you see SQLSTATE 42704, run 'ace mtree teardown', 'ace mtree init', and 'ace mtree build' to recover", publication) } else { pubCommitLSN, err := pglogrepl.ParseLSN(pubCommitLSNStr) if err != nil { @@ -123,7 +123,7 @@ func processReplicationStream(ctx context.Context, nodeInfo map[string]any, cont "ace_cdc_metadata.start_lsn (%s) is older than publication commit LSN (%s) "+ "for slot %s; this typically indicates ace_cdc_metadata was replicated "+ "cross-node by Spock, or the slot/metadata is from a prior init. "+ - "Run 'ace mtree teardown' and 'ace mtree init' to recover", + "Run 'ace mtree teardown', 'ace mtree init', and 'ace mtree build' to recover", startLSN, pubCommitLSN, slotName, ) }