diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index dc08b33..47e6588 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -34,6 +34,9 @@ jobs: - name: Run Merkle Tree composite pkey tests run: go test -count=1 -v ./tests/integration -run 'TestMerkleTreeCompositePK' + - name: Run mtree regression tests + run: go test -count=1 -v ./tests/integration -run 'TestBuildMtree|TestUpdateMtree|TestMtreeInit|TestACEConn' + - name: Run CDC regression tests run: go test -count=1 -v ./tests/integration -run 'CDC' diff --git a/tests/integration/cdc_busy_table_test.go b/tests/integration/cdc_busy_table_test.go index fb13b5c..d09e448 100644 --- a/tests/integration/cdc_busy_table_test.go +++ b/tests/integration/cdc_busy_table_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/jackc/pglogrepl" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/pgedge/ace/db/queries" "github.com/pgedge/ace/internal/infra/cdc" @@ -316,6 +317,15 @@ func setupCDCTestTable(t *testing.T, ctx context.Context, tableName string) { require.NoError(t, err) _, err = pool.Exec(ctx, fmt.Sprintf("INSERT INTO %s.%s SELECT * FROM %s.customers", testSchema, tableName, testSchema)) require.NoError(t, err) + // Without ANALYZE, GetRowCountEstimate falls back to pg_class.reltuples + // (~0 for a just-INSERTed table), and BuildMtree calculates + // numBlocks = ceil(estimate / blockSize) = 1, producing a degenerate + // 1-leaf tree where root and leaf hashes are identical. ANALYZE + // populates n_live_tup so BuildMtree sees the real row count and + // builds a realistic multi-leaf tree. pgx.Identifier.Sanitize + // avoids raw-string interpolation in SQL. 
+ _, err = pool.Exec(ctx, "ANALYZE "+pgx.Identifier{testSchema, tableName}.Sanitize()) // nosemgrep + require.NoError(t, err) } } @@ -328,3 +338,73 @@ func dropCDCTestTable(t *testing.T, tableName string) { } } } + +// TestGetCDCMetadataLegacySchema simulates a cluster that ran an older ACE +// version: ace_cdc_metadata exists but lacks the pub_commit_lsn column. The +// query must still parse (no SQLSTATE 42703) and return an empty +// pub_commit_lsn via the to_jsonb(m) ->> 'pub_commit_lsn' fallback, and +// cdc.UpdateFromCDC must warn-and-skip rather than fail. Regression guard +// for b5a7bf7. +func TestGetCDCMetadataLegacySchema(t *testing.T) { + ctx := context.Background() + tableName := "customers_cdc_legacy" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + require.NoError(t, mtreeTask.BuildMtree()) + + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + _, _, _, pubCommitBefore, err := getCDCMetadataInTx(ctx, pgCluster.Node1Pool) + require.NoError(t, err) + require.NotEmpty(t, pubCommitBefore, "fresh mtree init should populate pub_commit_lsn") + + // Real upgrades reach this state when an operator upgrades the binary + // against a database initialised by an older ACE that never had the + // column. pgx.Identifier.Sanitize avoids interpolating a config string + // into SQL. 
+ metadataTbl := pgx.Identifier{config.Cfg.MTree.Schema, "ace_cdc_metadata"}.Sanitize() + _, err = pgCluster.Node1Pool.Exec(ctx, "ALTER TABLE "+metadataTbl+" DROP COLUMN pub_commit_lsn") // nosemgrep + require.NoError(t, err) + + slot, startLSN, tables, pubCommitAfter, err := getCDCMetadataInTx(ctx, pgCluster.Node1Pool) + require.NoError(t, err, "GetCDCMetadata must tolerate missing pub_commit_lsn column") + require.Equal(t, "", pubCommitAfter, "pub_commit_lsn must be empty on legacy schema") + require.NotEmpty(t, slot, "slot_name should still be returned") + require.NotEmpty(t, startLSN, "start_lsn should still be returned") + require.NotEmpty(t, tables, "tables should still be returned") + + startBefore := metadataStartLSN(t, ctx) + + safeTbl := pgx.Identifier{testSchema, tableName}.Sanitize() + _, err = pgCluster.Node1Pool.Exec(ctx, "UPDATE "+safeTbl+" SET email = email || '.legacy' WHERE index = 1") // nosemgrep + require.NoError(t, err) + + targetFlush := walFlushLSN(t, ctx, pgCluster.Node1Pool) + + nodeInfo := pgCluster.ClusterNodes[0] + require.NoError(t, cdc.UpdateFromCDC(context.Background(), nodeInfo), + "UpdateFromCDC must warn-and-skip past the missing pub_commit_lsn column") + + startAfter := metadataStartLSN(t, ctx) + require.True(t, startAfter > startBefore, "start_lsn should advance even on legacy schema") + require.True(t, startAfter >= targetFlush, "start_lsn should catch up to wal_flush_lsn") +} + +func getCDCMetadataInTx(ctx context.Context, pool *pgxpool.Pool) (string, string, []string, string, error) { + tx, err := pool.Begin(ctx) + if err != nil { + return "", "", nil, "", err + } + defer tx.Rollback(ctx) + return queries.GetCDCMetadata(ctx, tx, config.Cfg.MTree.CDC.PublicationName) +} diff --git a/tests/integration/main_test.go b/tests/integration/main_test.go index e6374cf..2860c85 100644 --- a/tests/integration/main_test.go +++ b/tests/integration/main_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/docker/go-connections/nat" + 
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/pgedge/ace/pkg/config" "github.com/pgedge/ace/pkg/types" @@ -466,6 +467,17 @@ func setupSharedCustomersTable(tableName string) error { err, ) } + // ANALYZE so pg_class.reltuples / pg_stat_user_tables.n_live_tup + // reflect the real row count before any test reads + // GetRowCountEstimate. Without it, BuildMtree sees ~0 rows on a + // just-loaded table and builds a degenerate 1-leaf tree. + // pgx.Identifier.Sanitize quotes and escapes the schema/table so + // names like customers_1M survive and no SQL is interpolated from + // raw strings. + tbl := pgx.Identifier{testSchema, tableName}.Sanitize() + if _, err := pool.Exec(ctx, "ANALYZE "+tbl); err != nil { // nosemgrep + return fmt.Errorf("failed to ANALYZE %s on node %s: %w", qualifiedTableName, nodeName, err) + } } return nil } diff --git a/tests/integration/merkle_tree_test.go b/tests/integration/merkle_tree_test.go index a273d2d..f1b8bef 100644 --- a/tests/integration/merkle_tree_test.go +++ b/tests/integration/merkle_tree_test.go @@ -28,6 +28,7 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "github.com/pgedge/ace/internal/infra/cdc" + "github.com/pgedge/ace/internal/infra/db" "github.com/pgedge/ace/pkg/config" "github.com/pgedge/ace/pkg/types" "github.com/stretchr/testify/require" @@ -1324,3 +1325,548 @@ func replicationSlotExists(t *testing.T, ctx context.Context, pool *pgxpool.Pool require.NoError(t, err) return exists } + +// TestBuildMtreePoolLeakOnNodeError verifies that when BuildMtree errors in +// the middle of its per-node build loop, pools belonging to nodes the loop +// never reached are still closed. The row-estimate loop pre-builds a pool +// per node up front; each iteration's inner defer pool.Close only fires for +// nodes the loop actually iterates over, so pools for later nodes used to +// leak for the rest of the process lifetime on a mid-loop error. Regression +// guard for ff67d12. 
+func TestBuildMtreePoolLeakOnNodeError(t *testing.T) {
+	ctx := context.Background()
+	tableName := "customers_mtree_leak"
+	qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName)
+
+	setupCDCTestTable(t, ctx, tableName)
+
+	mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1, serviceN2})
+	require.NoError(t, mtreeTask.RunChecks(false))
+
+	// Register cleanup before MtreeInit so that a partially completed init
+	// is still torn down and the test table is still dropped. Teardown
+	// errors are only logged, so running it after a failed init cannot mask
+	// the original failure.
+	t.Cleanup(func() {
+		if err := mtreeTask.MtreeTeardown(); err != nil {
+			t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err)
+		}
+		dropCDCTestTable(t, tableName)
+	})
+
+	require.NoError(t, mtreeTask.MtreeInit())
+
+	// Force BuildMtree's first iteration (n1) to fail at GetCDCMetadata,
+	// so the build loop never reaches n2. n2's pool was created by
+	// BuildMtree's row-estimate loop and has no per-iteration defer to
+	// close it. Without the outer defer added in ff67d12, n2's pool leaks.
+	metaTbl := pgx.Identifier{config.Cfg.MTree.Schema, "ace_cdc_metadata"}.Sanitize()
+	_, err := pgCluster.Node1Pool.Exec(ctx, "DELETE FROM "+metaTbl+" WHERE publication_name = $1", config.Cfg.MTree.CDC.PublicationName) // nosemgrep
+	require.NoError(t, err)
+
+	// Anchor the baseline at 0 by waiting for MtreeInit's n2 backend to
+	// fully exit pg_stat_activity. Without this wait the baseline can
+	// race with the closing MtreeInit conn — a regression that leaks a
+	// single n2 conn would then match the racy baseline and pass.
+	require.Eventually(t, func() bool {
+		return countACEConnections(t, ctx, pgCluster.Node2Pool) == 0
+	}, 5*time.Second, 100*time.Millisecond,
+		"n2 ACE connections from MtreeInit should drain to 0 before measuring the leak baseline")
+
+	err = mtreeTask.BuildMtree()
+	require.Error(t, err, "BuildMtree should fail when n1 metadata is missing")
+	require.Contains(t, err.Error(), "cdc metadata", "error should surface from GetCDCMetadata on n1")
+
+	// pg_stat_activity reflects backend disconnects asynchronously; poll
+	// with a generous timeout so a slow runner doesn't flake. Anchored to
+	// the same 0 baseline so a single leaked conn surfaces.
+	require.Eventually(t, func() bool {
+		return countACEConnections(t, ctx, pgCluster.Node2Pool) == 0
+	}, 5*time.Second, 100*time.Millisecond,
+		"n2 pool leaked after BuildMtree errored on n1 iter (ff67d12 regressed)")
+}
+
+// countACEConnections returns the number of pg_stat_activity rows, as seen
+// through pool, whose application_name is 'ACE' and whose datname equals
+// the package-level dbName.
+//
+// It is used inside require.Eventually conditions, which testify evaluates
+// on a goroutine other than the one running the test. require.* helpers end
+// in t.FailNow, which the testing package only permits on the test
+// goroutine, so a query failure is reported with t.Errorf (safe from any
+// goroutine) and -1 is returned: the test is still marked failed, and a
+// "== 0" poll can never be satisfied by a failed query.
+func countACEConnections(t *testing.T, ctx context.Context, pool *pgxpool.Pool) int {
+	t.Helper()
+	var n int
+	err := pool.QueryRow(ctx, `
+		SELECT count(*) FROM pg_stat_activity
+		WHERE application_name = 'ACE' AND datname = $1`, dbName).Scan(&n)
+	if err != nil {
+		t.Errorf("counting ACE connections in pg_stat_activity: %v", err)
+		return -1
+	}
+	return n
+}
+
+// TestBuildMtreeAvoidsRedundantDDL verifies that BuildMtree does not re-emit
+// CREATE SCHEMA or CREATE OR REPLACE FUNCTION bytea_xor — these are owned
+// by MtreeInit's Phase A, and redundant emission from build races with
+// Spock DDL apply (SQLSTATE XX000 "tuple concurrently updated") on
+// replicated clusters. We assert this catalog-side: pg_namespace.xmin and
+// pg_proc.xmin for the offending objects must be unchanged across a
+// BuildMtree call, since CREATE OR REPLACE FUNCTION unconditionally
+// rewrites pg_proc (bumping xmin) and CREATE SCHEMA (without IF NOT EXISTS
+// short-circuit miss) would do the same for pg_namespace. Regression guard
+// for 2752ea5.
+func TestBuildMtreeAvoidsRedundantDDL(t *testing.T) {
+	// Catalog probes run once before and once after BuildMtree; keeping a
+	// single copy guarantees both snapshots use the same query text.
+	const (
+		schemaXminSQL = `SELECT xmin::text FROM pg_namespace WHERE nspname = $1`
+		funcXminSQL   = `SELECT p.xmin::text FROM pg_proc p
+			JOIN pg_namespace n ON n.oid = p.pronamespace
+			WHERE n.nspname = $1 AND p.proname = 'bytea_xor'`
+	)
+
+	ctx := context.Background()
+	tableName := "customers_mtree_idempotent"
+	qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName)
+
+	setupCDCTestTable(t, ctx, tableName)
+
+	mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1})
+	require.NoError(t, mtreeTask.RunChecks(false))
+
+	// Register cleanup before MtreeInit so a failed or partial init does
+	// not leave the test table (or whatever init created) behind for later
+	// tests. Teardown errors are only logged.
+	t.Cleanup(func() {
+		if err := mtreeTask.MtreeTeardown(); err != nil {
+			t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err)
+		}
+		dropCDCTestTable(t, tableName)
+	})
+
+	require.NoError(t, mtreeTask.MtreeInit())
+
+	aceSchema := config.Cfg.MTree.Schema
+
+	schemaXminBefore := scanString(t, ctx, pgCluster.Node1Pool, schemaXminSQL, aceSchema)
+	funcXminBefore := scanString(t, ctx, pgCluster.Node1Pool, funcXminSQL, aceSchema)
+	require.NotEmpty(t, schemaXminBefore, "ace schema should exist after MtreeInit")
+	require.NotEmpty(t, funcXminBefore, "bytea_xor should exist after MtreeInit")
+
+	require.NoError(t, mtreeTask.BuildMtree())
+
+	schemaXminAfter := scanString(t, ctx, pgCluster.Node1Pool, schemaXminSQL, aceSchema)
+	funcXminAfter := scanString(t, ctx, pgCluster.Node1Pool, funcXminSQL, aceSchema)
+
+	require.Equal(t, schemaXminBefore, schemaXminAfter,
+		"BuildMtree must not re-CREATE the ace schema")
+	require.Equal(t, funcXminBefore, funcXminAfter,
+		"BuildMtree must not CREATE OR REPLACE bytea_xor (races with Spock DDL apply)")
+}
+
+// scanString runs query with args on pool, scans the first column of the
+// returned row into a string, and returns it. Any error from the query or
+// the scan fails the test immediately, so it must only be called from the
+// test goroutine.
+func scanString(t *testing.T, ctx context.Context, pool *pgxpool.Pool, query string, args ...any) string {
+	t.Helper()
+	var s string
+	err := pool.QueryRow(ctx, query, args...).Scan(&s)
+	require.NoError(t, err)
+	return s
+}
+
+// TestUpdateMtreeReflectsInserts exercises the full UpdateMtree pipeline
+// end-to-end: CDC drain picks up new WAL, counters mark affected leaves
+// dirty, UpdateMtree recomputes those leaves and rolls the change up to
+// the root. Closes a complete-zero-coverage gap on UpdateMtree
+// (merkle.go:1560), which until now was only exercised as a black box by
+// the CDC drain tests.
+func TestUpdateMtreeReflectsInserts(t *testing.T) {
+	ctx := context.Background()
+	tableName := "customers_mtree_update"
+	qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName)
+
+	setupCDCTestTable(t, ctx, tableName)
+
+	mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1})
+	require.NoError(t, mtreeTask.RunChecks(false))
+
+	// Register cleanup before MtreeInit/BuildMtree: if either fails, the
+	// test table and anything init created must still be removed so later
+	// tests start from a clean cluster. Teardown errors are only logged.
+	t.Cleanup(func() {
+		if err := mtreeTask.MtreeTeardown(); err != nil {
+			t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err)
+		}
+		dropCDCTestTable(t, tableName)
+	})
+
+	require.NoError(t, mtreeTask.MtreeInit())
+	require.NoError(t, mtreeTask.BuildMtree())
+
+	mtreeTable := pgx.Identifier{
+		config.Cfg.MTree.Schema,
+		fmt.Sprintf("ace_mtree_%s_%s", testSchema, tableName),
+	}.Sanitize()
+
+	rootBefore := rootNodeHash(t, ctx, mtreeTable)
+	require.NotEmpty(t, rootBefore, "BuildMtree should populate the root hash")
+	require.Equal(t, 0, dirtyLeafCount(t, ctx, mtreeTable),
+		"no leaves should be dirty immediately after BuildMtree")
+
+	nodesBefore := mtreeNodeHashes(t, ctx, mtreeTable)
+	require.Greater(t, len(nodesBefore), 2,
+		"expected a multi-level tree (>2 nodes); a single-leaf tree means ANALYZE didn't take effect and parent-rollup isn't exercised")
+
+	// Append 25 rows past the current maximum index so the inserts cannot
+	// collide with existing primary keys.
+	safeTbl := pgx.Identifier{testSchema, tableName}.Sanitize()
+	var maxIndex int64
+	require.NoError(t, pgCluster.Node1Pool.QueryRow(ctx,
+		"SELECT COALESCE(max(index), 0) FROM "+safeTbl). // nosemgrep
+		Scan(&maxIndex))
+
+	_, err := pgCluster.Node1Pool.Exec(ctx, // nosemgrep
+		"INSERT INTO "+safeTbl+" (index, first_name, last_name, email) "+
+			"SELECT $1 + g, 'Mtree', 'Update', 'mtree-update-' || g || '@test.com' "+
+			"FROM generate_series(1, 25) g", maxIndex)
+	require.NoError(t, err)
+
+	require.NoError(t, mtreeTask.UpdateMtree(false))
+
+	rootAfter := rootNodeHash(t, ctx, mtreeTable)
+	require.NotEqual(t, rootBefore, rootAfter,
+		"root node_hash must change after inserts are absorbed")
+	require.Equal(t, 0, dirtyLeafCount(t, ctx, mtreeTable),
+		"no leaves should remain dirty after UpdateMtree")
+	require.Equal(t, int64(0), pendingInsertCounter(t, ctx, mtreeTable),
+		"inserts_since_tree_update should reset to 0 after UpdateMtree")
+
+	// Multi-level rollup proof: at least one descendant of root must also
+	// have changed. A single-leaf tree would have only root changing.
+	nodesAfter := mtreeNodeHashes(t, ctx, mtreeTable)
+	require.GreaterOrEqual(t, countChangedNodes(nodesBefore, nodesAfter), 2,
+		"expected change to propagate to root AND at least one ancestor node")
+}
+
+// mtreeTable arguments to these helpers must be sanitized via
+// pgx.Identifier{...}.Sanitize() at the call site so the interpolation
+// below is safe; the // nosemgrep markers tell the static analyzer that
+// we've enforced the contract.
+// rootNodeHash reads node_hash from mtreeTable on node 1 for the row at
+// node_position 0 on the highest node_level present in the table. Any
+// query or scan error fails the test.
+func rootNodeHash(t *testing.T, ctx context.Context, mtreeTable string) []byte {
+	t.Helper()
+	var rootHash []byte
+	row := pgCluster.Node1Pool.QueryRow(ctx, // nosemgrep
+		"SELECT node_hash FROM "+mtreeTable+
+			" WHERE node_level = (SELECT max(node_level) FROM "+mtreeTable+")"+
+			" AND node_position = 0")
+	require.NoError(t, row.Scan(&rootHash))
+	return rootHash
+}
+
+// dirtyLeafCount counts the level-0 rows of mtreeTable on node 1 whose
+// dirty flag is true. Any query or scan error fails the test.
+func dirtyLeafCount(t *testing.T, ctx context.Context, mtreeTable string) int {
+	t.Helper()
+	var dirty int
+	row := pgCluster.Node1Pool.QueryRow(ctx, // nosemgrep
+		"SELECT count(*) FROM "+mtreeTable+" WHERE node_level = 0 AND dirty = true")
+	require.NoError(t, row.Scan(&dirty))
+	return dirty
+}
+
+// pendingInsertCounter sums inserts_since_tree_update over the level-0
+// rows of mtreeTable on node 1, yielding 0 when the sum is NULL. Any query
+// or scan error fails the test.
+func pendingInsertCounter(t *testing.T, ctx context.Context, mtreeTable string) int64 {
+	t.Helper()
+	var pending int64
+	row := pgCluster.Node1Pool.QueryRow(ctx, // nosemgrep
+		"SELECT COALESCE(sum(inserts_since_tree_update), 0) FROM "+mtreeTable+
+			" WHERE node_level = 0")
+	require.NoError(t, row.Scan(&pending))
+	return pending
+}
+
+// pendingDeleteCounter sums deletes_since_tree_update over the level-0
+// rows of mtreeTable on node 1, yielding 0 when the sum is NULL. Any query
+// or scan error fails the test.
+func pendingDeleteCounter(t *testing.T, ctx context.Context, mtreeTable string) int64 {
+	t.Helper()
+	var pending int64
+	row := pgCluster.Node1Pool.QueryRow(ctx, // nosemgrep
+		"SELECT COALESCE(sum(deletes_since_tree_update), 0) FROM "+mtreeTable+
+			" WHERE node_level = 0")
+	require.NoError(t, row.Scan(&pending))
+	return pending
+}
+
+// mtreeNodeHashes snapshots every node in the mtree leaf table keyed by
+// "level:position" with the hex-encoded node_hash. Snapshotting both sides
+// of an UpdateMtree lets us verify the change propagated past root to at
+// least one internal node — otherwise a degenerate single-leaf tree (e.g.
+// missing ANALYZE) would let the root-hash-changed assertion pass without
+// exercising parent rollup.
+func mtreeNodeHashes(t *testing.T, ctx context.Context, mtreeTable string) map[string]string {
+	t.Helper()
+	rows, err := pgCluster.Node1Pool.Query(ctx, // nosemgrep
+		"SELECT node_level, node_position, encode(node_hash, 'hex') FROM "+mtreeTable+
+			" ORDER BY node_level, node_position")
+	require.NoError(t, err)
+	defer rows.Close()
+
+	snapshot := make(map[string]string)
+	for rows.Next() {
+		var (
+			level    int
+			position int64
+			hexHash  string
+		)
+		require.NoError(t, rows.Scan(&level, &position, &hexHash))
+		key := fmt.Sprintf("%d:%d", level, position)
+		snapshot[key] = hexHash
+	}
+	// Surface any error that ended the iteration early.
+	require.NoError(t, rows.Err())
+	return snapshot
+}
+
+// countChangedNodes reports how many keys are present in both snapshots
+// with different values. Keys that exist in only one of the two maps are
+// not counted.
+func countChangedNodes(before, after map[string]string) int {
+	changed := 0
+	for key, oldHash := range before {
+		newHash, present := after[key]
+		if !present || newHash == oldHash {
+			continue
+		}
+		changed++
+	}
+	return changed
+}
+
+// TestUpdateMtreeReflectsDeletes covers the deletion path through the
+// UpdateMtree pipeline: deletes_since_tree_update counters increment via
+// CDC, the affected leaves are recomputed, and the counters reset.
+func TestUpdateMtreeReflectsDeletes(t *testing.T) {
+	ctx := context.Background()
+	tableName := "customers_mtree_delete"
+	qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName)
+
+	setupCDCTestTable(t, ctx, tableName)
+
+	mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1})
+	require.NoError(t, mtreeTask.RunChecks(false))
+
+	// Register cleanup before MtreeInit/BuildMtree: if either fails, the
+	// test table and anything init created must still be removed so later
+	// tests start from a clean cluster. Teardown errors are only logged.
+	t.Cleanup(func() {
+		if err := mtreeTask.MtreeTeardown(); err != nil {
+			t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err)
+		}
+		dropCDCTestTable(t, tableName)
+	})
+
+	require.NoError(t, mtreeTask.MtreeInit())
+	require.NoError(t, mtreeTask.BuildMtree())
+
+	mtreeTable := pgx.Identifier{
+		config.Cfg.MTree.Schema,
+		fmt.Sprintf("ace_mtree_%s_%s", testSchema, tableName),
+	}.Sanitize()
+
+	rootBefore := rootNodeHash(t, ctx, mtreeTable)
+	require.NotEmpty(t, rootBefore)
+	nodesBefore := mtreeNodeHashes(t, ctx, mtreeTable)
+	require.Greater(t, len(nodesBefore), 2,
+		"expected a multi-level tree (>2 nodes); ANALYZE may not have run")
+
+	// Delete a fixed 25-row window; the RowsAffected check below guards
+	// against the fixture not containing those index values.
+	safeTbl := pgx.Identifier{testSchema, tableName}.Sanitize()
+	res, err := pgCluster.Node1Pool.Exec(ctx, // nosemgrep
+		"DELETE FROM "+safeTbl+" WHERE index BETWEEN 100 AND 124")
+	require.NoError(t, err)
+	require.Equal(t, int64(25), res.RowsAffected(), "expected to delete 25 customer rows")
+
+	require.NoError(t, mtreeTask.UpdateMtree(false))
+
+	rootAfter := rootNodeHash(t, ctx, mtreeTable)
+	require.NotEqual(t, rootBefore, rootAfter,
+		"root node_hash must change after deletes are absorbed")
+	require.Equal(t, 0, dirtyLeafCount(t, ctx, mtreeTable),
+		"no leaves should remain dirty after UpdateMtree")
+	require.Equal(t, int64(0), pendingDeleteCounter(t, ctx, mtreeTable),
+		"deletes_since_tree_update should reset to 0 after UpdateMtree")
+
+	// At least two nodes must differ between the snapshots: the root plus
+	// one more node beneath it.
+	nodesAfter := mtreeNodeHashes(t, ctx, mtreeTable)
+	require.GreaterOrEqual(t, countChangedNodes(nodesBefore, nodesAfter), 2,
+		"expected change to propagate to root AND at least one ancestor node")
+}
+
+// 
TestACEConnSuppressesSpockDDLReplication checks that a connection pool
+// obtained through auth.GetClusterNodeConnection reports both
+// spock.enable_ddl_replication and spock.include_ddl_repset as "off".
+// Those settings suppress Spock's cross-node DDL apply and repset
+// auto-add, which previously leaked the ace_cdc_metadata table into the
+// default repset and raced CREATE OR REPLACE FUNCTION across nodes.
+// Regression guard for 113b0bc.
+//
+// Spock must actually be installed: without it, PostgreSQL accepts the
+// dotted-name GUCs as placeholder customs and SHOW returns 'off'
+// regardless, which would let this test silently pass on a misconfigured
+// (Spock-less) cluster.
+func TestACEConnSuppressesSpockDDLReplication(t *testing.T) {
+	if env := newSpockEnv(); !env.HasSpock {
+		t.Skip("requires Spock-enabled cluster")
+	}
+
+	ctx := context.Background()
+
+	pool, err := auth.GetClusterNodeConnection(ctx, pgCluster.ClusterNodes[0], auth.ConnectionOptions{})
+	require.NoError(t, err)
+	defer pool.Close()
+
+	// Guard against a Spock-less server before trusting the SHOW results.
+	var spockInstalled bool
+	extRow := pool.QueryRow(ctx,
+		`SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'spock')`)
+	require.NoError(t, extRow.Scan(&spockInstalled))
+	require.True(t, spockInstalled,
+		"Spock extension must be installed; the suppression GUCs are no-ops without it")
+
+	gucs := []struct {
+		showSQL string
+		failMsg string
+	}{
+		{"SHOW spock.enable_ddl_replication", "ACE pool must boot with spock.enable_ddl_replication=off"},
+		{"SHOW spock.include_ddl_repset", "ACE pool must boot with spock.include_ddl_repset=off"},
+	}
+
+	// Read every setting first, then assert, so a wrong value is only
+	// reported once all SHOW statements have executed successfully.
+	values := make([]string, len(gucs))
+	for i, g := range gucs {
+		require.NoError(t, pool.QueryRow(ctx, g.showSQL).Scan(&values[i]))
+	}
+	for i, g := range gucs {
+		require.Equal(t, "off", values[i], g.failMsg)
+	}
+}
+
+// TestMtreeInitReapsOrphanedSlot exercises the slot-leak recovery path
+// described at merkle.go:893-901. If a prior MtreeInit died between
+// Phase B (slot creation) and Phase C (metadata write), the slot is
+// orphaned with no metadata row pointing at it. 
SetupReplicationSlot
+// (cdc/setup.go:53) must drop the orphan before creating a fresh slot,
+// otherwise re-init fails with "slot already exists" and the user is
+// stuck. We simulate the orphan with pg_create_logical_replication_slot
+// and assert the next MtreeInit succeeds.
+//
+// The seeded slot gets its own best-effort cleanup so that it cannot
+// outlive the test when a step fails before MtreeTeardown is registered.
+func TestMtreeInitReapsOrphanedSlot(t *testing.T) {
+	ctx := context.Background()
+	tableName := "customers_mtree_orphan"
+	qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName)
+
+	setupCDCTestTable(t, ctx, tableName)
+
+	slotName := config.Cfg.MTree.CDC.SlotName
+
+	_, err := pgCluster.Node1Pool.Exec(ctx,
+		"SELECT pg_create_logical_replication_slot($1, 'pgoutput')", slotName)
+	require.NoError(t, err, "could not seed an orphan replication slot")
+
+	// Cleanups run last-in, first-out, so this one runs after the
+	// MtreeTeardown cleanup registered below. It drops the slot only if it
+	// still exists and is inactive, so it is a no-op when teardown already
+	// removed it, and it keeps a failed run from leaving a slot behind
+	// that would make the next run's seeding step fail.
+	t.Cleanup(func() {
+		_, dropErr := pgCluster.Node1Pool.Exec(ctx,
+			"SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots"+
+				" WHERE slot_name = $1 AND NOT active", slotName)
+		if dropErr != nil {
+			t.Logf("Warning: dropping leftover replication slot %q failed during cleanup: %v", slotName, dropErr)
+		}
+	})
+
+	require.True(t,
+		replicationSlotExists(t, ctx, pgCluster.Node1Pool, slotName),
+		"orphan slot should be present before MtreeInit")
+
+	mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1})
+	require.NoError(t, mtreeTask.RunChecks(false))
+
+	t.Cleanup(func() {
+		if err := mtreeTask.MtreeTeardown(); err != nil {
+			t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err)
+		}
+		dropCDCTestTable(t, tableName)
+	})
+
+	require.NoError(t, mtreeTask.MtreeInit(),
+		"MtreeInit must reap the orphan slot rather than fail with slot-exists")
+
+	require.True(t,
+		replicationSlotExists(t, ctx, pgCluster.Node1Pool, slotName),
+		"a fresh slot should exist after MtreeInit")
+
+	var restartLSN string
+	err = pgCluster.Node1Pool.QueryRow(ctx,
+		"SELECT restart_lsn::text FROM pg_replication_slots WHERE slot_name = $1",
+		slotName).Scan(&restartLSN)
+	require.NoError(t, err)
+	require.NotEmpty(t, restartLSN, "fresh slot should have a restart_lsn")
+}
+
+// TestBuildMtreeConcurrentSpockNodes reproduces the original production
+// scenario for 2752ea5 + 113b0bc: BuildMtree fired concurrently from two
+// nodes of a Spock cluster. 
Pre-fix, n1's CREATE OR REPLACE FUNCTION
+// bytea_xor was caught by Spock's DDL replication, sent to n2, and
+// raced n2's own identical DDL — producing SQLSTATE XX000 "tuple
+// concurrently updated" on pg_proc. The fix removed the DDL from
+// BuildMtree entirely and additionally set spock.enable_ddl_replication
+// =off on all ACE connections. We assert that two concurrent
+// single-node BuildMtree calls both succeed, and that
+// ace_cdc_metadata was not auto-added to a Spock replication set —
+// the latter being the second symptom of 113b0bc.
+func TestBuildMtreeConcurrentSpockNodes(t *testing.T) {
+	env := newSpockEnv()
+	if !env.HasSpock {
+		t.Skip("requires Spock-enabled cluster")
+	}
+
+	ctx := context.Background()
+	tableName := "customers_mtree_concurrent"
+	qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName)
+
+	setupCDCTestTable(t, ctx, tableName)
+
+	initTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1, serviceN2})
+	require.NoError(t, initTask.RunChecks(false))
+
+	// Register cleanup before MtreeInit so a failed or partial two-node
+	// init is still torn down and the test table is dropped. Teardown
+	// errors are only logged.
+	t.Cleanup(func() {
+		if err := initTask.MtreeTeardown(); err != nil {
+			t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err)
+		}
+		dropCDCTestTable(t, tableName)
+	})
+
+	require.NoError(t, initTask.MtreeInit())
+
+	taskN1 := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1})
+	taskN2 := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN2})
+	require.NoError(t, taskN1.RunChecks(false))
+	require.NoError(t, taskN2.RunChecks(false))
+
+	// Each goroutine writes only its own error variable, and wg.Wait
+	// orders those writes before the reads below. No require call is made
+	// off the test goroutine.
+	var wg sync.WaitGroup
+	var err1, err2 error
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		err1 = taskN1.BuildMtree()
+	}()
+	go func() {
+		defer wg.Done()
+		err2 = taskN2.BuildMtree()
+	}()
+	wg.Wait()
+
+	require.NoError(t, err1, "BuildMtree on n1 must not race with concurrent build on n2")
+	require.NoError(t, err2, "BuildMtree on n2 must not race with concurrent build on n1")
+
+	// spock.tables lists every Spock-eligible table; set_name IS NULL means
+	// the table is NOT in any replication set. User tables (customers,
+	// customers_1M) appear here with set_name NULL on this test cluster
+	// because spock.enable_ddl_replication defaults to off — so a plain
+	// EXISTS check would false-positive against any table at all.
+	for i, pool := range []*pgxpool.Pool{pgCluster.Node1Pool, pgCluster.Node2Pool} {
+		var inRepset bool
+		err := pool.QueryRow(ctx, `
+			SELECT EXISTS (
+				SELECT 1 FROM spock.tables
+				WHERE nspname = $1 AND relname = 'ace_cdc_metadata'
+				AND set_name IS NOT NULL
+			)`, config.Cfg.MTree.Schema).Scan(&inRepset)
+		require.NoError(t, err, "node%d: query spock.tables", i+1)
+		require.False(t, inRepset,
+			"node%d: ace_cdc_metadata must not be auto-added to a Spock repset", i+1)
+	}
+}
+
+// TestUpdateMtreeReflectsUpdates covers the UPDATE path: a non-PK column
+// change must still flip the row hash, mark the containing leaf dirty,
+// and propagate the new leaf hash up to the root. The schema has no
+// updates_since_tree_update counter (only insert/delete), so we only
+// assert the externally observable invariants: root hash changes, no
+// leaves remain dirty.
+func TestUpdateMtreeReflectsUpdates(t *testing.T) {
+	ctx := context.Background()
+	tableName := "customers_mtree_upd"
+	qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName)
+
+	setupCDCTestTable(t, ctx, tableName)
+
+	mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1})
+	require.NoError(t, mtreeTask.RunChecks(false))
+
+	// Register cleanup before MtreeInit/BuildMtree: if either fails, the
+	// test table and anything init created must still be removed so later
+	// tests start from a clean cluster. Teardown errors are only logged.
+	t.Cleanup(func() {
+		if err := mtreeTask.MtreeTeardown(); err != nil {
+			t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err)
+		}
+		dropCDCTestTable(t, tableName)
+	})
+
+	require.NoError(t, mtreeTask.MtreeInit())
+	require.NoError(t, mtreeTask.BuildMtree())
+
+	mtreeTable := pgx.Identifier{
+		config.Cfg.MTree.Schema,
+		fmt.Sprintf("ace_mtree_%s_%s", testSchema, tableName),
+	}.Sanitize()
+
+	rootBefore := rootNodeHash(t, ctx, mtreeTable)
+	require.NotEmpty(t, rootBefore)
+	nodesBefore := mtreeNodeHashes(t, ctx, mtreeTable)
+	require.Greater(t, len(nodesBefore), 2,
+		"expected a multi-level tree (>2 nodes); ANALYZE may not have run")
+
+	// Touch only a non-key column on a fixed 25-row window; the
+	// RowsAffected check below guards against the fixture not containing
+	// those index values.
+	safeTbl := pgx.Identifier{testSchema, tableName}.Sanitize()
+	res, err := pgCluster.Node1Pool.Exec(ctx, // nosemgrep
+		"UPDATE "+safeTbl+" SET email = email || '.updated' WHERE index BETWEEN 200 AND 224")
+	require.NoError(t, err)
+	require.Equal(t, int64(25), res.RowsAffected(), "expected to update 25 customer rows")
+
+	require.NoError(t, mtreeTask.UpdateMtree(false))
+
+	rootAfter := rootNodeHash(t, ctx, mtreeTable)
+	require.NotEqual(t, rootBefore, rootAfter,
+		"root node_hash must change after non-PK column updates")
+	require.Equal(t, 0, dirtyLeafCount(t, ctx, mtreeTable),
+		"no leaves should remain dirty after UpdateMtree")
+
+	// At least two nodes must differ between the snapshots: the root plus
+	// one more node beneath it.
+	nodesAfter := mtreeNodeHashes(t, ctx, mtreeTable)
+	require.GreaterOrEqual(t, countChangedNodes(nodesBefore, nodesAfter), 2,
+		"expected change to propagate to root AND at least one ancestor node")
+}