From 2034ce82ef2d8b8172028a388deaf80f158fd2db Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Thu, 28 May 2026 15:07:40 -0700 Subject: [PATCH 1/4] test: regression coverage for recent mtree fixes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add integration tests guarding the recent mtree bug fixes that landed without dedicated coverage, plus close a zero-coverage gap on UpdateMtree. Wire them into the CI workflow under a new "mtree regression tests" step. Tests added (all in tests/integration/, package integration): * TestGetCDCMetadataLegacySchema — simulates a pre-migration cluster whose ace_cdc_metadata lacks pub_commit_lsn. The to_jsonb(m) ->> fallback must return empty rather than SQLSTATE 42703, and UpdateFromCDC must warn-and-skip rather than fail. Guards b5a7bf7. * TestBuildMtreePoolLeakOnNodeError — forces the first per-node iteration of BuildMtree to fail at GetCDCMetadata so the build loop never reaches n2, then asserts via pg_stat_activity that n2's ACE-flagged connections return to 0. The baseline is anchored with require.Eventually so a still-draining MtreeInit conn can't mask a regression. Guards ff67d12. * TestBuildMtreeAvoidsRedundantDDL — snapshots pg_namespace.xmin and pg_proc.xmin for pgedge_ace and bytea_xor across a BuildMtree call; CREATE OR REPLACE FUNCTION unconditionally bumps pg_proc.xmin, so unchanged xmin proves BuildMtree no longer recreates them. Catalog-side guard for 2752ea5 (vanilla-PG, deterministic). * TestBuildMtreeConcurrentSpockNodes — fires two single-node BuildMtree calls concurrently against the Spock cluster, asserting both succeed (no XX000 "tuple concurrently updated") and that ace_cdc_metadata is not in any actual Spock replication set (set_name IS NOT NULL). End-to-end guard for 2752ea5 + 113b0bc. * TestACEConnSuppressesSpockDDLReplication — opens a pool via auth.GetClusterNodeConnection and asserts SHOW returns 'off' for both spock.enable_ddl_replication and spock.include_ddl_repset. Includes a pg_extension probe so the test fails loudly on a Spock-less cluster instead of silently passing on PG's placeholder-GUC behavior. Guards 113b0bc. * TestMtreeInitReapsOrphanedSlot — seeds an orphan pg_create_logical_replication_slot then asserts MtreeInit succeeds and leaves a fresh slot with a valid restart_lsn. Exercises the drop-then-create recovery path at cdc/setup.go:53 referenced by the slot-leak mitigation comment in merkle.go:893-901. * TestUpdateMtreeReflectsInserts / Deletes / Updates — close the zero-coverage gap on UpdateMtree (merkle.go:1560). Each builds a tree, performs DML, runs UpdateMtree, then asserts root node_hash changed, no leaves remain dirty, and the relevant CDC counter reset. CI: new "Run mtree regression tests" step runs go test with 'TestBuildMtree|TestUpdateMtree|TestMtreeInit|TestACEConn' so future mtree regression tests under these prefixes are picked up automatically. TestGetCDCMetadataLegacySchema already matches the existing CDC step. --- .github/workflows/test.yml | 3 + tests/integration/cdc_busy_table_test.go | 70 ++++ tests/integration/merkle_tree_test.go | 476 +++++++++++++++++++++++ 3 files changed, 549 insertions(+) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index dc08b33..47e6588 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -34,6 +34,9 @@ jobs: - name: Run Merkle Tree composite pkey tests run: go test -count=1 -v ./tests/integration -run 'TestMerkleTreeCompositePK' + - name: Run mtree regression tests + run: go test -count=1 -v ./tests/integration -run 'TestBuildMtree|TestUpdateMtree|TestMtreeInit|TestACEConn' + - name: Run CDC regression tests run: go test -count=1 -v ./tests/integration -run 'CDC' diff --git a/tests/integration/cdc_busy_table_test.go b/tests/integration/cdc_busy_table_test.go index fb13b5c..037dafa 100644 --- a/tests/integration/cdc_busy_table_test.go +++ b/tests/integration/cdc_busy_table_test.go @@ -328,3 +328,73 @@ func dropCDCTestTable(t *testing.T, tableName string) { } } } + +// TestGetCDCMetadataLegacySchema simulates a cluster that ran an older ACE +// version: ace_cdc_metadata exists but lacks the pub_commit_lsn column. The +// query must still parse (no SQLSTATE 42703) and return an empty +// pub_commit_lsn via the to_jsonb(m) ->> 'pub_commit_lsn' fallback, and +// cdc.UpdateFromCDC must warn-and-skip rather than fail. Regression guard +// for b5a7bf7. +func TestGetCDCMetadataLegacySchema(t *testing.T) { + ctx := context.Background() + tableName := "customers_cdc_legacy" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + require.NoError(t, mtreeTask.BuildMtree()) + + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + _, _, _, pubCommitBefore, err := getCDCMetadataInTx(ctx, pgCluster.Node1Pool) + require.NoError(t, err) + require.NotEmpty(t, pubCommitBefore, "fresh mtree init should populate pub_commit_lsn") + + // Real upgrades reach this state when an operator upgrades the binary + // against a database initialised by an older ACE that never had the + // column. + _, err = pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( + "ALTER TABLE %s.ace_cdc_metadata DROP COLUMN pub_commit_lsn", + config.Cfg.MTree.Schema)) + require.NoError(t, err) + + slot, startLSN, tables, pubCommitAfter, err := getCDCMetadataInTx(ctx, pgCluster.Node1Pool) + require.NoError(t, err, "GetCDCMetadata must tolerate missing pub_commit_lsn column") + require.Equal(t, "", pubCommitAfter, "pub_commit_lsn must be empty on legacy schema") + require.NotEmpty(t, slot, "slot_name should still be returned") + require.NotEmpty(t, startLSN, "start_lsn should still be returned") + require.NotEmpty(t, tables, "tables should still be returned") + + startBefore := metadataStartLSN(t, ctx) + + _, err = pgCluster.Node1Pool.Exec(ctx, + fmt.Sprintf("UPDATE %s SET email = email || '.legacy' WHERE index = 1", qualifiedTableName)) + require.NoError(t, err) + + targetFlush := walFlushLSN(t, ctx, pgCluster.Node1Pool) + + nodeInfo := pgCluster.ClusterNodes[0] + require.NoError(t, cdc.UpdateFromCDC(context.Background(), nodeInfo), + "UpdateFromCDC must warn-and-skip past the missing pub_commit_lsn column") + + startAfter := metadataStartLSN(t, ctx) + require.True(t, startAfter > startBefore, "start_lsn should advance even on legacy schema") + require.True(t, startAfter >= targetFlush, "start_lsn should catch up to wal_flush_lsn") +} + +func getCDCMetadataInTx(ctx context.Context, pool *pgxpool.Pool) (string, string, []string, string, error) { + tx, err := pool.Begin(ctx) + if err != nil { + return "", "", nil, "", err + } + defer tx.Rollback(ctx) + return queries.GetCDCMetadata(ctx, tx, config.Cfg.MTree.CDC.PublicationName) +} diff --git a/tests/integration/merkle_tree_test.go b/tests/integration/merkle_tree_test.go index a273d2d..77e7da7 100644 --- a/tests/integration/merkle_tree_test.go +++ b/tests/integration/merkle_tree_test.go @@ -28,6 +28,7 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" "github.com/pgedge/ace/internal/infra/cdc" + "github.com/pgedge/ace/internal/infra/db" "github.com/pgedge/ace/pkg/config" "github.com/pgedge/ace/pkg/types" "github.com/stretchr/testify/require" @@ -1324,3 +1325,478 @@ func replicationSlotExists(t *testing.T, ctx context.Context, pool *pgxpool.Pool require.NoError(t, err) return exists } + +// TestBuildMtreePoolLeakOnNodeError verifies that when BuildMtree errors in +// the middle of its per-node build loop, pools belonging to nodes the loop +// never reached are still closed. The row-estimate loop pre-builds a pool +// per node up front; each iteration's inner defer pool.Close only fires for +// nodes the loop actually iterates over, so pools for later nodes used to +// leak for the rest of the process lifetime on a mid-loop error. Regression +// guard for ff67d12. +func TestBuildMtreePoolLeakOnNodeError(t *testing.T) { + ctx := context.Background() + tableName := "customers_mtree_leak" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1, serviceN2}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + // Force BuildMtree's first iteration (n1) to fail at GetCDCMetadata, + // so the build loop never reaches n2. n2's pool was created by the + // row-estimate loop above and has no per-iteration defer to close it. + // Without the outer defer added in ff67d12, n2's pool leaks. + _, err := pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( + "DELETE FROM %s.ace_cdc_metadata WHERE publication_name = $1", + config.Cfg.MTree.Schema), config.Cfg.MTree.CDC.PublicationName) + require.NoError(t, err) + + // Anchor the baseline at 0 by waiting for MtreeInit's n2 backend to + // fully exit pg_stat_activity. Without this wait the baseline can + // race with the closing MtreeInit conn — a regression that leaks a + // single n2 conn would then match the racy baseline and pass. + require.Eventually(t, func() bool { + return countACEConnections(t, ctx, pgCluster.Node2Pool) == 0 + }, 5*time.Second, 100*time.Millisecond, + "n2 ACE connections from MtreeInit should drain to 0 before measuring the leak baseline") + + err = mtreeTask.BuildMtree() + require.Error(t, err, "BuildMtree should fail when n1 metadata is missing") + require.Contains(t, err.Error(), "cdc metadata", "error should surface from GetCDCMetadata on n1") + + // pg_stat_activity reflects backend disconnects asynchronously; poll + // with a generous timeout so a slow runner doesn't flake. Anchored to + // the same 0 baseline so a single leaked conn surfaces. + require.Eventually(t, func() bool { + return countACEConnections(t, ctx, pgCluster.Node2Pool) == 0 + }, 5*time.Second, 100*time.Millisecond, + "n2 pool leaked after BuildMtree errored on n1 iter (ff67d12 regressed)") +} + +func countACEConnections(t *testing.T, ctx context.Context, pool *pgxpool.Pool) int { + t.Helper() + var n int + err := pool.QueryRow(ctx, ` + SELECT count(*) FROM pg_stat_activity + WHERE application_name = 'ACE' AND datname = $1`, dbName).Scan(&n) + require.NoError(t, err) + return n +} + +// TestBuildMtreeAvoidsRedundantDDL verifies that BuildMtree does not re-emit +// CREATE SCHEMA or CREATE OR REPLACE FUNCTION bytea_xor — these are owned +// by MtreeInit's Phase A, and redundant emission from build races with +// Spock DDL apply (SQLSTATE XX000 "tuple concurrently updated") on +// replicated clusters. We assert this catalog-side: pg_namespace.xmin and +// pg_proc.xmin for the offending objects must be unchanged across a +// BuildMtree call, since CREATE OR REPLACE FUNCTION unconditionally +// rewrites pg_proc (bumping xmin) and CREATE SCHEMA (without IF NOT EXISTS +// short-circuit miss) would do the same for pg_namespace. Regression guard +// for 2752ea5. +func TestBuildMtreeAvoidsRedundantDDL(t *testing.T) { + ctx := context.Background() + tableName := "customers_mtree_idempotent" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + aceSchema := config.Cfg.MTree.Schema + + schemaXminBefore := scanString(t, ctx, pgCluster.Node1Pool, + `SELECT xmin::text FROM pg_namespace WHERE nspname = $1`, aceSchema) + funcXminBefore := scanString(t, ctx, pgCluster.Node1Pool, + `SELECT p.xmin::text FROM pg_proc p + JOIN pg_namespace n ON n.oid = p.pronamespace + WHERE n.nspname = $1 AND p.proname = 'bytea_xor'`, aceSchema) + require.NotEmpty(t, schemaXminBefore, "ace schema should exist after MtreeInit") + require.NotEmpty(t, funcXminBefore, "bytea_xor should exist after MtreeInit") + + require.NoError(t, mtreeTask.BuildMtree()) + + schemaXminAfter := scanString(t, ctx, pgCluster.Node1Pool, + `SELECT xmin::text FROM pg_namespace WHERE nspname = $1`, aceSchema) + funcXminAfter := scanString(t, ctx, pgCluster.Node1Pool, + `SELECT p.xmin::text FROM pg_proc p + JOIN pg_namespace n ON n.oid = p.pronamespace + WHERE n.nspname = $1 AND p.proname = 'bytea_xor'`, aceSchema) + + require.Equal(t, schemaXminBefore, schemaXminAfter, + "BuildMtree must not re-CREATE the ace schema") + require.Equal(t, funcXminBefore, funcXminAfter, + "BuildMtree must not CREATE OR REPLACE bytea_xor (races with Spock DDL apply)") +} + +func scanString(t *testing.T, ctx context.Context, pool *pgxpool.Pool, query string, args ...any) string { + t.Helper() + var s string + err := pool.QueryRow(ctx, query, args...).Scan(&s) + require.NoError(t, err) + return s +} + +// TestUpdateMtreeReflectsInserts exercises the full UpdateMtree pipeline +// end-to-end: CDC drain picks up new WAL, counters mark affected leaves +// dirty, UpdateMtree recomputes those leaves and rolls the change up to +// the root. Closes a complete-zero-coverage gap on UpdateMtree +// (merkle.go:1560), which until now was only exercised as a black box by +// the CDC drain tests. +func TestUpdateMtreeReflectsInserts(t *testing.T) { + ctx := context.Background() + tableName := "customers_mtree_update" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + require.NoError(t, mtreeTask.BuildMtree()) + + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + mtreeTable := fmt.Sprintf("%s.ace_mtree_%s_%s", + config.Cfg.MTree.Schema, testSchema, tableName) + + rootBefore := rootNodeHash(t, ctx, mtreeTable) + require.NotEmpty(t, rootBefore, "BuildMtree should populate the root hash") + require.Equal(t, 0, dirtyLeafCount(t, ctx, mtreeTable), + "no leaves should be dirty immediately after BuildMtree") + + var maxIndex int64 + require.NoError(t, pgCluster.Node1Pool.QueryRow(ctx, + fmt.Sprintf("SELECT COALESCE(max(index), 0) FROM %s", qualifiedTableName)). + Scan(&maxIndex)) + + _, err := pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf(` + INSERT INTO %s (index, first_name, last_name, email) + SELECT $1 + g, 'Mtree', 'Update', 'mtree-update-' || g || '@test.com' + FROM generate_series(1, 25) g`, qualifiedTableName), maxIndex) + require.NoError(t, err) + + require.NoError(t, mtreeTask.UpdateMtree(false)) + + rootAfter := rootNodeHash(t, ctx, mtreeTable) + require.NotEqual(t, rootBefore, rootAfter, + "root node_hash must change after inserts are absorbed") + require.Equal(t, 0, dirtyLeafCount(t, ctx, mtreeTable), + "no leaves should remain dirty after UpdateMtree") + require.Equal(t, int64(0), pendingInsertCounter(t, ctx, mtreeTable), + "inserts_since_tree_update should reset to 0 after UpdateMtree") +} + +func rootNodeHash(t *testing.T, ctx context.Context, mtreeTable string) []byte { + t.Helper() + var h []byte + err := pgCluster.Node1Pool.QueryRow(ctx, fmt.Sprintf(` + SELECT node_hash FROM %s + WHERE node_level = (SELECT max(node_level) FROM %s) + AND node_position = 0`, mtreeTable, mtreeTable)).Scan(&h) + require.NoError(t, err) + return h +} + +func dirtyLeafCount(t *testing.T, ctx context.Context, mtreeTable string) int { + t.Helper() + var n int + err := pgCluster.Node1Pool.QueryRow(ctx, fmt.Sprintf(` + SELECT count(*) FROM %s WHERE node_level = 0 AND dirty = true`, + mtreeTable)).Scan(&n) + require.NoError(t, err) + return n +} + +func pendingInsertCounter(t *testing.T, ctx context.Context, mtreeTable string) int64 { + t.Helper() + var n int64 + err := pgCluster.Node1Pool.QueryRow(ctx, fmt.Sprintf(` + SELECT COALESCE(sum(inserts_since_tree_update), 0) FROM %s + WHERE node_level = 0`, mtreeTable)).Scan(&n) + require.NoError(t, err) + return n +} + +func pendingDeleteCounter(t *testing.T, ctx context.Context, mtreeTable string) int64 { + t.Helper() + var n int64 + err := pgCluster.Node1Pool.QueryRow(ctx, fmt.Sprintf(` + SELECT COALESCE(sum(deletes_since_tree_update), 0) FROM %s + WHERE node_level = 0`, mtreeTable)).Scan(&n) + require.NoError(t, err) + return n +} + +// TestUpdateMtreeReflectsDeletes covers the deletion path through the +// UpdateMtree pipeline: deletes_since_tree_update counters increment via +// CDC, the affected leaves are recomputed, and the counters reset. +func TestUpdateMtreeReflectsDeletes(t *testing.T) { + ctx := context.Background() + tableName := "customers_mtree_delete" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + require.NoError(t, mtreeTask.BuildMtree()) + + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + mtreeTable := fmt.Sprintf("%s.ace_mtree_%s_%s", + config.Cfg.MTree.Schema, testSchema, tableName) + + rootBefore := rootNodeHash(t, ctx, mtreeTable) + require.NotEmpty(t, rootBefore) + + res, err := pgCluster.Node1Pool.Exec(ctx, + fmt.Sprintf("DELETE FROM %s WHERE index BETWEEN 100 AND 124", qualifiedTableName)) + require.NoError(t, err) + require.Equal(t, int64(25), res.RowsAffected(), "expected to delete 25 customer rows") + + require.NoError(t, mtreeTask.UpdateMtree(false)) + + rootAfter := rootNodeHash(t, ctx, mtreeTable) + require.NotEqual(t, rootBefore, rootAfter, + "root node_hash must change after deletes are absorbed") + require.Equal(t, 0, dirtyLeafCount(t, ctx, mtreeTable), + "no leaves should remain dirty after UpdateMtree") + require.Equal(t, int64(0), pendingDeleteCounter(t, ctx, mtreeTable), + "deletes_since_tree_update should reset to 0 after UpdateMtree") +} + +// TestACEConnSuppressesSpockDDLReplication verifies that every connection +// pool opened via the ACE auth layer applies the spock.enable_ddl_replication +// and spock.include_ddl_repset = off RuntimeParams. These suppress Spock's +// cross-node DDL apply and repset auto-add, which previously leaked the +// ace_cdc_metadata table into the default repset and races CREATE OR +// REPLACE FUNCTION across nodes. Regression guard for 113b0bc. +// +// We require Spock to actually be installed: without it, PostgreSQL accepts +// the dotted-name GUCs as placeholder customs and SHOW returns 'off' +// regardless, which would let this test silently pass on a misconfigured +// (Spock-less) cluster. +func TestACEConnSuppressesSpockDDLReplication(t *testing.T) { + if !newSpockEnv().HasSpock { + t.Skip("requires Spock-enabled cluster") + } + + ctx := context.Background() + + pool, err := auth.GetClusterNodeConnection(ctx, pgCluster.ClusterNodes[0], auth.ConnectionOptions{}) + require.NoError(t, err) + defer pool.Close() + + var spockInstalled bool + require.NoError(t, pool.QueryRow(ctx, + `SELECT EXISTS(SELECT 1 FROM pg_extension WHERE extname = 'spock')`).Scan(&spockInstalled)) + require.True(t, spockInstalled, + "Spock extension must be installed; the suppression GUCs are no-ops without it") + + var ddlRepl, ddlRepset string + require.NoError(t, pool.QueryRow(ctx, "SHOW spock.enable_ddl_replication").Scan(&ddlRepl)) + require.NoError(t, pool.QueryRow(ctx, "SHOW spock.include_ddl_repset").Scan(&ddlRepset)) + + require.Equal(t, "off", ddlRepl, + "ACE pool must boot with spock.enable_ddl_replication=off") + require.Equal(t, "off", ddlRepset, + "ACE pool must boot with spock.include_ddl_repset=off") +} + +// TestMtreeInitReapsOrphanedSlot exercises the slot-leak recovery path +// described at merkle.go:893-901. If a prior MtreeInit died between +// Phase B (slot creation) and Phase C (metadata write), the slot is +// orphaned with no metadata row pointing at it. SetupReplicationSlot +// (cdc/setup.go:53) must drop the orphan before creating a fresh slot, +// otherwise re-init fails with "slot already exists" and the user is +// stuck. We simulate the orphan with pg_create_logical_replication_slot +// and assert the next MtreeInit succeeds. +func TestMtreeInitReapsOrphanedSlot(t *testing.T) { + ctx := context.Background() + tableName := "customers_mtree_orphan" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + slotName := config.Cfg.MTree.CDC.SlotName + + _, err := pgCluster.Node1Pool.Exec(ctx, + "SELECT pg_create_logical_replication_slot($1, 'pgoutput')", slotName) + require.NoError(t, err, "could not seed an orphan replication slot") + require.True(t, + replicationSlotExists(t, ctx, pgCluster.Node1Pool, slotName), + "orphan slot should be present before MtreeInit") + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1}) + require.NoError(t, mtreeTask.RunChecks(false)) + + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + require.NoError(t, mtreeTask.MtreeInit(), + "MtreeInit must reap the orphan slot rather than fail with slot-exists") + + require.True(t, + replicationSlotExists(t, ctx, pgCluster.Node1Pool, slotName), + "a fresh slot should exist after MtreeInit") + + var restartLSN string + err = pgCluster.Node1Pool.QueryRow(ctx, + "SELECT restart_lsn::text FROM pg_replication_slots WHERE slot_name = $1", + slotName).Scan(&restartLSN) + require.NoError(t, err) + require.NotEmpty(t, restartLSN, "fresh slot should have a restart_lsn") +} + +// TestBuildMtreeConcurrentSpockNodes reproduces the original production +// scenario for 2752ea5 + 113b0bc: BuildMtree fired concurrently from two +// nodes of a Spock cluster. Pre-fix, n1's CREATE OR REPLACE FUNCTION +// bytea_xor was caught by Spock's DDL replication, sent to n2, and +// raced n2's own identical DDL — producing SQLSTATE XX000 "tuple +// concurrently updated" on pg_proc. The fix removed the DDL from +// BuildMtree entirely and additionally set spock.enable_ddl_replication +// =off on all ACE connections. We assert that two concurrent +// single-node BuildMtree calls both succeed, and that +// ace_cdc_metadata was not auto-added to a Spock replication set — +// the latter being the second symptom of 113b0bc. +func TestBuildMtreeConcurrentSpockNodes(t *testing.T) { + env := newSpockEnv() + if !env.HasSpock { + t.Skip("requires Spock-enabled cluster") + } + + ctx := context.Background() + tableName := "customers_mtree_concurrent" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + initTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1, serviceN2}) + require.NoError(t, initTask.RunChecks(false)) + require.NoError(t, initTask.MtreeInit()) + + t.Cleanup(func() { + if err := initTask.MtreeTeardown(); err != nil { + t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + taskN1 := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1}) + taskN2 := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN2}) + require.NoError(t, taskN1.RunChecks(false)) + require.NoError(t, taskN2.RunChecks(false)) + + var wg sync.WaitGroup + var err1, err2 error + wg.Add(2) + go func() { + defer wg.Done() + err1 = taskN1.BuildMtree() + }() + go func() { + defer wg.Done() + err2 = taskN2.BuildMtree() + }() + wg.Wait() + + require.NoError(t, err1, "BuildMtree on n1 must not race with concurrent build on n2") + require.NoError(t, err2, "BuildMtree on n2 must not race with concurrent build on n1") + + // spock.tables lists every Spock-eligible table; set_name IS NULL means + // the table is NOT in any replication set. User tables (customers, + // customers_1M) appear here with set_name NULL on this test cluster + // because spock.enable_ddl_replication defaults to off — so a plain + // EXISTS check would false-positive against any table at all. + for i, pool := range []*pgxpool.Pool{pgCluster.Node1Pool, pgCluster.Node2Pool} { + var inRepset bool + err := pool.QueryRow(ctx, ` + SELECT EXISTS ( + SELECT 1 FROM spock.tables + WHERE nspname = $1 AND relname = 'ace_cdc_metadata' + AND set_name IS NOT NULL + )`, config.Cfg.MTree.Schema).Scan(&inRepset) + require.NoError(t, err, "node%d: query spock.tables", i+1) + require.False(t, inRepset, + "node%d: ace_cdc_metadata must not be auto-added to a Spock repset", i+1) + } +} + +// TestUpdateMtreeReflectsUpdates covers the UPDATE path: a non-PK column +// change must still flip the row hash, mark the containing leaf dirty, +// and propagate the new leaf hash up to the root. The schema has no +// updates_since_tree_update counter (only insert/delete), so we only +// assert the externally observable invariants: root hash changes, no +// leaves remain dirty. +func TestUpdateMtreeReflectsUpdates(t *testing.T) { + ctx := context.Background() + tableName := "customers_mtree_upd" + qualifiedTableName := fmt.Sprintf("%s.%s", testSchema, tableName) + + setupCDCTestTable(t, ctx, tableName) + + mtreeTask := newTestMerkleTreeTask(t, qualifiedTableName, []string{serviceN1}) + require.NoError(t, mtreeTask.RunChecks(false)) + require.NoError(t, mtreeTask.MtreeInit()) + require.NoError(t, mtreeTask.BuildMtree()) + + t.Cleanup(func() { + if err := mtreeTask.MtreeTeardown(); err != nil { + t.Logf("Warning: MtreeTeardown failed during cleanup: %v", err) + } + dropCDCTestTable(t, tableName) + }) + + mtreeTable := fmt.Sprintf("%s.ace_mtree_%s_%s", + config.Cfg.MTree.Schema, testSchema, tableName) + + rootBefore := rootNodeHash(t, ctx, mtreeTable) + require.NotEmpty(t, rootBefore) + + res, err := pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( + "UPDATE %s SET email = email || '.updated' WHERE index BETWEEN 200 AND 224", + qualifiedTableName)) + require.NoError(t, err) + require.Equal(t, int64(25), res.RowsAffected(), "expected to update 25 customer rows") + + require.NoError(t, mtreeTask.UpdateMtree(false)) + + rootAfter := rootNodeHash(t, ctx, mtreeTable) + require.NotEqual(t, rootBefore, rootAfter, + "root node_hash must change after non-PK column updates") + require.Equal(t, 0, dirtyLeafCount(t, ctx, mtreeTable), + "no leaves should remain dirty after UpdateMtree") +} From 0fba98137873b72261574e9941de10b0ac757c22 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Thu, 28 May 2026 15:08:16 -0700 Subject: [PATCH 2/4] test: ANALYZE seeded data so mtree tests build realistic trees MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Without ANALYZE, queries.GetRowCountEstimate reads pg_class.reltuples / pg_stat_user_tables.n_live_tup, both of which lag behind INSERTs until autovacuum or ANALYZE runs. On a freshly-loaded test table this returned ~357 for a 10,000-row customers copy, and BuildMtree computed numBlocks = ceil(357/1000) = 1 — every mtree integration test in the suite built a degenerate 1-leaf tree where root == leaf, silently weakening any "root hash changed" assertion (trivially true when root is the only node) and never exercising parent rollup. This commit makes two changes: 1. Add ANALYZE to setupCDCTestTable and setupSharedCustomersTable after every data load. Quoted identifier preserves mixed-case table names like customers_1M. 2. Strengthen the TestUpdateMtreeReflects{Inserts,Deletes,Updates} assertions now that multi-leaf trees actually exist: * mtreeNodeHashes helper snapshots every (node_level, node_position) -> hex(node_hash) tuple * countChangedNodes helper computes the delta between two snapshots * each test asserts len(nodesBefore) > 2 so a future regression that re-introduces degenerate trees fails loudly * after UpdateMtree, each test asserts countChangedNodes >= 2 — proof that the leaf change propagated to at least one ancestor in addition to root Live-verified against the local podman cluster: row estimate now logs "Calculating block ranges for ~10000 rows" (was ~357), and the parent-rollup assertion holds for INSERTs, DELETEs, and UPDATEs. Benefit applies broadly: every existing mtree integration test that uses the shared customers / customers_1M tables now sees the real row count too, not just the new regression tests. --- tests/integration/cdc_busy_table_test.go | 9 ++++ tests/integration/main_test.go | 8 ++++ tests/integration/merkle_tree_test.go | 61 ++++++++++++++++++++++++ 3 files changed, 78 insertions(+) diff --git a/tests/integration/cdc_busy_table_test.go b/tests/integration/cdc_busy_table_test.go index 037dafa..1331285 100644 --- a/tests/integration/cdc_busy_table_test.go +++ b/tests/integration/cdc_busy_table_test.go @@ -316,6 +316,15 @@ func setupCDCTestTable(t *testing.T, ctx context.Context, tableName string) { require.NoError(t, err) _, err = pool.Exec(ctx, fmt.Sprintf("INSERT INTO %s.%s SELECT * FROM %s.customers", testSchema, tableName, testSchema)) require.NoError(t, err) + // Without ANALYZE, GetRowCountEstimate falls back to pg_class.reltuples + // (~0 for a just-INSERTed table), and BuildMtree calculates + // numBlocks = ceil(estimate / blockSize) = 1, producing a degenerate + // 1-leaf tree where root and leaf hashes are identical. ANALYZE + // populates n_live_tup so BuildMtree sees the real row count and + // builds a realistic multi-leaf tree. Quoted identifier preserves + // case for table names that include capital letters. + _, err = pool.Exec(ctx, fmt.Sprintf(`ANALYZE "%s"."%s"`, testSchema, tableName)) + require.NoError(t, err) } } diff --git a/tests/integration/main_test.go b/tests/integration/main_test.go index e6374cf..c7d4506 100644 --- a/tests/integration/main_test.go +++ b/tests/integration/main_test.go @@ -466,6 +466,14 @@ func setupSharedCustomersTable(tableName string) error { err, ) } + // ANALYZE so pg_class.reltuples / pg_stat_user_tables.n_live_tup + // reflect the real row count before any test reads + // GetRowCountEstimate. Without it, BuildMtree sees ~0 rows on a + // just-loaded table and builds a degenerate 1-leaf tree. + // Quoted identifier preserves the case of names like customers_1M. + if _, err := pool.Exec(ctx, fmt.Sprintf(`ANALYZE "%s"."%s"`, testSchema, tableName)); err != nil { + return fmt.Errorf("failed to ANALYZE %s on node %s: %w", qualifiedTableName, nodeName, err) + } } return nil } diff --git a/tests/integration/merkle_tree_test.go b/tests/integration/merkle_tree_test.go index 77e7da7..ad76ae2 100644 --- a/tests/integration/merkle_tree_test.go +++ b/tests/integration/merkle_tree_test.go @@ -1487,6 +1487,10 @@ func TestUpdateMtreeReflectsInserts(t *testing.T) { require.Equal(t, 0, dirtyLeafCount(t, ctx, mtreeTable), "no leaves should be dirty immediately after BuildMtree") + nodesBefore := mtreeNodeHashes(t, ctx, mtreeTable) + require.Greater(t, len(nodesBefore), 2, + "expected a multi-level tree (>2 nodes); a single-leaf tree means ANALYZE didn't take effect and parent-rollup isn't exercised") + var maxIndex int64 require.NoError(t, pgCluster.Node1Pool.QueryRow(ctx, fmt.Sprintf("SELECT COALESCE(max(index), 0) FROM %s", qualifiedTableName)). @@ -1507,6 +1511,12 @@ func TestUpdateMtreeReflectsInserts(t *testing.T) { "no leaves should remain dirty after UpdateMtree") require.Equal(t, int64(0), pendingInsertCounter(t, ctx, mtreeTable), "inserts_since_tree_update should reset to 0 after UpdateMtree") + + // Multi-level rollup proof: at least one descendant of root must also + // have changed. A single-leaf tree would have only root changing. + nodesAfter := mtreeNodeHashes(t, ctx, mtreeTable) + require.GreaterOrEqual(t, countChangedNodes(nodesBefore, nodesAfter), 2, + "expected change to propagate to root AND at least one ancestor node") } func rootNodeHash(t *testing.T, ctx context.Context, mtreeTable string) []byte { @@ -1550,6 +1560,43 @@ func pendingDeleteCounter(t *testing.T, ctx context.Context, mtreeTable string) return n } +// mtreeNodeHashes snapshots every node in the mtree leaf table keyed by +// "level:position" with the hex-encoded node_hash. Snapshotting both sides +// of an UpdateMtree lets us verify the change propagated past root to at +// least one internal node — otherwise a degenerate single-leaf tree (e.g. +// missing ANALYZE) would let the root-hash-changed assertion pass without +// exercising parent rollup. +func mtreeNodeHashes(t *testing.T, ctx context.Context, mtreeTable string) map[string]string { + t.Helper() + rows, err := pgCluster.Node1Pool.Query(ctx, fmt.Sprintf(` + SELECT node_level, node_position, encode(node_hash, 'hex') + FROM %s + ORDER BY node_level, node_position`, mtreeTable)) + require.NoError(t, err) + defer rows.Close() + + out := map[string]string{} + for rows.Next() { + var level int + var pos int64 + var hash string + require.NoError(t, rows.Scan(&level, &pos, &hash)) + out[fmt.Sprintf("%d:%d", level, pos)] = hash + } + require.NoError(t, rows.Err()) + return out +} + +func countChangedNodes(before, after map[string]string) int { + n := 0 + for k, beforeHash := range before { + if afterHash, ok := after[k]; ok && afterHash != beforeHash { + n++ + } + } + return n +} + // TestUpdateMtreeReflectsDeletes covers the deletion path through the // UpdateMtree pipeline: deletes_since_tree_update counters increment via // CDC, the affected leaves are recomputed, and the counters reset. @@ -1577,6 +1624,9 @@ func TestUpdateMtreeReflectsDeletes(t *testing.T) { rootBefore := rootNodeHash(t, ctx, mtreeTable) require.NotEmpty(t, rootBefore) + nodesBefore := mtreeNodeHashes(t, ctx, mtreeTable) + require.Greater(t, len(nodesBefore), 2, + "expected a multi-level tree (>2 nodes); ANALYZE may not have run") res, err := pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf("DELETE FROM %s WHERE index BETWEEN 100 AND 124", qualifiedTableName)) @@ -1592,6 +1642,10 @@ func TestUpdateMtreeReflectsDeletes(t *testing.T) { "no leaves should remain dirty after UpdateMtree") require.Equal(t, int64(0), pendingDeleteCounter(t, ctx, mtreeTable), "deletes_since_tree_update should reset to 0 after UpdateMtree") + + nodesAfter := mtreeNodeHashes(t, ctx, mtreeTable) + require.GreaterOrEqual(t, countChangedNodes(nodesBefore, nodesAfter), 2, + "expected change to propagate to root AND at least one ancestor node") } // TestACEConnSuppressesSpockDDLReplication verifies that every connection @@ -1785,6 +1839,9 @@ func TestUpdateMtreeReflectsUpdates(t *testing.T) { rootBefore := rootNodeHash(t, ctx, mtreeTable) require.NotEmpty(t, rootBefore) + nodesBefore := mtreeNodeHashes(t, ctx, mtreeTable) + require.Greater(t, len(nodesBefore), 2, + "expected a multi-level tree (>2 nodes); ANALYZE may not have run") res, err := pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( "UPDATE %s SET email = email || '.updated' WHERE index BETWEEN 200 AND 224", @@ -1799,4 +1856,8 @@ func TestUpdateMtreeReflectsUpdates(t *testing.T) { "root node_hash must change after non-PK column updates") require.Equal(t, 0, dirtyLeafCount(t, ctx, mtreeTable), "no leaves should remain dirty after UpdateMtree") + + nodesAfter := mtreeNodeHashes(t, ctx, mtreeTable) + require.GreaterOrEqual(t, countChangedNodes(nodesBefore, nodesAfter), 2, + "expected change to propagate to root AND at least one ancestor node") } From 58acad045e3c3af0cddef58e23bc84d8e7da82a2 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Sun, 31 May 2026 17:54:40 -0700 Subject: [PATCH 3/4] test: sanitize SQL identifiers via pgx.Identifier in test setup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Codacy flagged two CRITICAL SQL Injection findings against fmt.Sprintf-into-SQL patterns I introduced for the mtree regression tests: * tests/integration/main_test.go:474 — ANALYZE in setupSharedCustomersTable * tests/integration/cdc_busy_table_test.go:326 — ANALYZE in setupCDCTestTable * tests/integration/cdc_busy_table_test.go:373 — ALTER TABLE DROP COLUMN in TestGetCDCMetadataLegacySchema In each case the interpolated value was a package constant or a config-sourced schema name in practice, but the pattern is unsafe. Replace all three with pgx.Identifier{...}.Sanitize() + string concatenation, matching the idiom used throughout the production code (e.g. internal/consistency/mtree/merkle.go:2588). Live-verified against the local podman cluster after the ANALYZE fixes: all 10 tests in the mtree regression batch still pass. --- tests/integration/cdc_busy_table_test.go | 15 ++++++++------- tests/integration/main_test.go | 8 ++++++-- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/tests/integration/cdc_busy_table_test.go b/tests/integration/cdc_busy_table_test.go index 1331285..2e7387d 100644 --- a/tests/integration/cdc_busy_table_test.go +++ b/tests/integration/cdc_busy_table_test.go @@ -20,6 +20,7 @@ import ( "time" "github.com/jackc/pglogrepl" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/pgedge/ace/db/queries" "github.com/pgedge/ace/internal/infra/cdc" @@ -321,9 +322,9 @@ func setupCDCTestTable(t *testing.T, ctx context.Context, tableName string) { // numBlocks = ceil(estimate / blockSize) = 1, producing a degenerate // 1-leaf tree where root and leaf hashes are identical. ANALYZE // populates n_live_tup so BuildMtree sees the real row count and - // builds a realistic multi-leaf tree. Quoted identifier preserves - // case for table names that include capital letters. - _, err = pool.Exec(ctx, fmt.Sprintf(`ANALYZE "%s"."%s"`, testSchema, tableName)) + // builds a realistic multi-leaf tree. pgx.Identifier.Sanitize + // avoids raw-string interpolation in SQL. + _, err = pool.Exec(ctx, "ANALYZE "+pgx.Identifier{testSchema, tableName}.Sanitize()) // nosemgrep require.NoError(t, err) } } @@ -369,10 +370,10 @@ func TestGetCDCMetadataLegacySchema(t *testing.T) { // Real upgrades reach this state when an operator upgrades the binary // against a database initialised by an older ACE that never had the - // column. - _, err = pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( - "ALTER TABLE %s.ace_cdc_metadata DROP COLUMN pub_commit_lsn", - config.Cfg.MTree.Schema)) + // column. pgx.Identifier.Sanitize avoids interpolating a config string + // into SQL. + metadataTbl := pgx.Identifier{config.Cfg.MTree.Schema, "ace_cdc_metadata"}.Sanitize() + _, err = pgCluster.Node1Pool.Exec(ctx, "ALTER TABLE "+metadataTbl+" DROP COLUMN pub_commit_lsn") // nosemgrep require.NoError(t, err) slot, startLSN, tables, pubCommitAfter, err := getCDCMetadataInTx(ctx, pgCluster.Node1Pool) diff --git a/tests/integration/main_test.go b/tests/integration/main_test.go index c7d4506..2860c85 100644 --- a/tests/integration/main_test.go +++ b/tests/integration/main_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/docker/go-connections/nat" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/pgedge/ace/pkg/config" "github.com/pgedge/ace/pkg/types" @@ -470,8 +471,11 @@ func setupSharedCustomersTable(tableName string) error { // reflect the real row count before any test reads // GetRowCountEstimate. Without it, BuildMtree sees ~0 rows on a // just-loaded table and builds a degenerate 1-leaf tree. - // Quoted identifier preserves the case of names like customers_1M. - if _, err := pool.Exec(ctx, fmt.Sprintf(`ANALYZE "%s"."%s"`, testSchema, tableName)); err != nil { + // pgx.Identifier.Sanitize quotes and escapes the schema/table so + // names like customers_1M survive and no SQL is interpolated from + // raw strings. + tbl := pgx.Identifier{testSchema, tableName}.Sanitize() + if _, err := pool.Exec(ctx, "ANALYZE "+tbl); err != nil { // nosemgrep return fmt.Errorf("failed to ANALYZE %s on node %s: %w", qualifiedTableName, nodeName, err) } } From ebdd9b36472041c561d17eaf4c938ed55cfb8400 Mon Sep 17 00:00:00 2001 From: Mason Sharp Date: Sun, 31 May 2026 18:28:47 -0700 Subject: [PATCH 4/4] test: sanitize remaining SQL identifier interpolations in new tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sweep the rest of my fmt.Sprintf-into-SQL patterns that Codacy was flagging one-by-one. PostgreSQL bind parameters work only for values, not for table/schema identifiers, so the safe pattern is pgx.Identifier{schema,table}.Sanitize() followed by string concatenation and // nosemgrep on the Exec/Query line. This matches the established repo pattern in db/queries/queries.go. Touched sites (all in new code I introduced for the mtree regression suite): * TestGetCDCMetadataLegacySchema — UPDATE statement on the legacy test table. * TestBuildMtreePoolLeakOnNodeError — DELETE on ace_cdc_metadata. * TestUpdateMtreeReflectsInserts — SELECT max(index) + INSERT INTO. * TestUpdateMtreeReflectsDeletes — DELETE FROM. * TestUpdateMtreeReflectsUpdates — UPDATE SET email. * mtreeTable construction in three tests now uses pgx.Identifier with a single fmt.Sprintf only for the leaf-table-name suffix. * rootNodeHash / dirtyLeafCount / pendingInsertCounter / pendingDeleteCounter / mtreeNodeHashes helpers — interpolate the mtreeTable parameter via string concat with // nosemgrep, since every caller now passes a Sanitize()'d value. Real-value bind parameters ($1, $2) are still used for values like publication_name and the generate_series offset. --- tests/integration/cdc_busy_table_test.go | 4 +- tests/integration/merkle_tree_test.go | 81 +++++++++++++----------- 2 files changed, 47 insertions(+), 38 deletions(-) diff --git a/tests/integration/cdc_busy_table_test.go b/tests/integration/cdc_busy_table_test.go index 2e7387d..d09e448 100644 --- a/tests/integration/cdc_busy_table_test.go +++ b/tests/integration/cdc_busy_table_test.go @@ -385,8 +385,8 @@ func TestGetCDCMetadataLegacySchema(t *testing.T) { startBefore := metadataStartLSN(t, ctx) - _, err = pgCluster.Node1Pool.Exec(ctx, - fmt.Sprintf("UPDATE %s SET email = email || '.legacy' WHERE index = 1", qualifiedTableName)) + safeTbl := pgx.Identifier{testSchema, tableName}.Sanitize() + _, err = pgCluster.Node1Pool.Exec(ctx, "UPDATE "+safeTbl+" SET email = email || '.legacy' WHERE index = 1") // nosemgrep require.NoError(t, err) targetFlush := walFlushLSN(t, ctx, pgCluster.Node1Pool) diff --git a/tests/integration/merkle_tree_test.go b/tests/integration/merkle_tree_test.go index ad76ae2..f1b8bef 100644 --- a/tests/integration/merkle_tree_test.go +++ b/tests/integration/merkle_tree_test.go @@ -1355,9 +1355,8 @@ func TestBuildMtreePoolLeakOnNodeError(t *testing.T) { // so the build loop never reaches n2. n2's pool was created by the // row-estimate loop above and has no per-iteration defer to close it. // Without the outer defer added in ff67d12, n2's pool leaks. - _, err := pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( - "DELETE FROM %s.ace_cdc_metadata WHERE publication_name = $1", - config.Cfg.MTree.Schema), config.Cfg.MTree.CDC.PublicationName) + metaTbl := pgx.Identifier{config.Cfg.MTree.Schema, "ace_cdc_metadata"}.Sanitize() + _, err := pgCluster.Node1Pool.Exec(ctx, "DELETE FROM "+metaTbl+" WHERE publication_name = $1", config.Cfg.MTree.CDC.PublicationName) // nosemgrep require.NoError(t, err) // Anchor the baseline at 0 by waiting for MtreeInit's n2 backend to @@ -1479,8 +1478,10 @@ func TestUpdateMtreeReflectsInserts(t *testing.T) { dropCDCTestTable(t, tableName) }) - mtreeTable := fmt.Sprintf("%s.ace_mtree_%s_%s", - config.Cfg.MTree.Schema, testSchema, tableName) + mtreeTable := pgx.Identifier{ + config.Cfg.MTree.Schema, + fmt.Sprintf("ace_mtree_%s_%s", testSchema, tableName), + }.Sanitize() rootBefore := rootNodeHash(t, ctx, mtreeTable) require.NotEmpty(t, rootBefore, "BuildMtree should populate the root hash") @@ -1491,15 +1492,16 @@ func TestUpdateMtreeReflectsInserts(t *testing.T) { require.Greater(t, len(nodesBefore), 2, "expected a multi-level tree (>2 nodes); a single-leaf tree means ANALYZE didn't take effect and parent-rollup isn't exercised") + safeTbl := pgx.Identifier{testSchema, tableName}.Sanitize() var maxIndex int64 require.NoError(t, pgCluster.Node1Pool.QueryRow(ctx, - fmt.Sprintf("SELECT COALESCE(max(index), 0) FROM %s", qualifiedTableName)). + "SELECT COALESCE(max(index), 0) FROM "+safeTbl). // nosemgrep Scan(&maxIndex)) - _, err := pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf(` - INSERT INTO %s (index, first_name, last_name, email) - SELECT $1 + g, 'Mtree', 'Update', 'mtree-update-' || g || '@test.com' - FROM generate_series(1, 25) g`, qualifiedTableName), maxIndex) + _, err := pgCluster.Node1Pool.Exec(ctx, // nosemgrep + "INSERT INTO "+safeTbl+" (index, first_name, last_name, email) "+ + "SELECT $1 + g, 'Mtree', 'Update', 'mtree-update-' || g || '@test.com' "+ + "FROM generate_series(1, 25) g", maxIndex) require.NoError(t, err) require.NoError(t, mtreeTask.UpdateMtree(false)) @@ -1519,13 +1521,17 @@ func TestUpdateMtreeReflectsInserts(t *testing.T) { "expected change to propagate to root AND at least one ancestor node") } +// mtreeTable arguments to these helpers must be sanitized via +// pgx.Identifier{...}.Sanitize() at the call site so the interpolation +// below is safe; the // nosemgrep markers tell the static analyzer that +// we've enforced the contract. func rootNodeHash(t *testing.T, ctx context.Context, mtreeTable string) []byte { t.Helper() var h []byte - err := pgCluster.Node1Pool.QueryRow(ctx, fmt.Sprintf(` - SELECT node_hash FROM %s - WHERE node_level = (SELECT max(node_level) FROM %s) - AND node_position = 0`, mtreeTable, mtreeTable)).Scan(&h) + err := pgCluster.Node1Pool.QueryRow(ctx, // nosemgrep + "SELECT node_hash FROM "+mtreeTable+ + " WHERE node_level = (SELECT max(node_level) FROM "+mtreeTable+")"+ + " AND node_position = 0").Scan(&h) require.NoError(t, err) return h } @@ -1533,9 +1539,8 @@ func rootNodeHash(t *testing.T, ctx context.Context, mtreeTable string) []byte { func dirtyLeafCount(t *testing.T, ctx context.Context, mtreeTable string) int { t.Helper() var n int - err := pgCluster.Node1Pool.QueryRow(ctx, fmt.Sprintf(` - SELECT count(*) FROM %s WHERE node_level = 0 AND dirty = true`, - mtreeTable)).Scan(&n) + err := pgCluster.Node1Pool.QueryRow(ctx, // nosemgrep + "SELECT count(*) FROM "+mtreeTable+" WHERE node_level = 0 AND dirty = true").Scan(&n) require.NoError(t, err) return n } @@ -1543,9 +1548,9 @@ func dirtyLeafCount(t *testing.T, ctx context.Context, mtreeTable string) int { func pendingInsertCounter(t *testing.T, ctx context.Context, mtreeTable string) int64 { t.Helper() var n int64 - err := pgCluster.Node1Pool.QueryRow(ctx, fmt.Sprintf(` - SELECT COALESCE(sum(inserts_since_tree_update), 0) FROM %s - WHERE node_level = 0`, mtreeTable)).Scan(&n) + err := pgCluster.Node1Pool.QueryRow(ctx, // nosemgrep + "SELECT COALESCE(sum(inserts_since_tree_update), 0) FROM "+mtreeTable+ + " WHERE node_level = 0").Scan(&n) require.NoError(t, err) return n } @@ -1553,9 +1558,9 @@ func pendingInsertCounter(t *testing.T, ctx context.Context, mtreeTable string) func pendingDeleteCounter(t *testing.T, ctx context.Context, mtreeTable string) int64 { t.Helper() var n int64 - err := pgCluster.Node1Pool.QueryRow(ctx, fmt.Sprintf(` - SELECT COALESCE(sum(deletes_since_tree_update), 0) FROM %s - WHERE node_level = 0`, mtreeTable)).Scan(&n) + err := pgCluster.Node1Pool.QueryRow(ctx, // nosemgrep + "SELECT COALESCE(sum(deletes_since_tree_update), 0) FROM "+mtreeTable+ + " WHERE node_level = 0").Scan(&n) require.NoError(t, err) return n } @@ -1568,10 +1573,9 @@ func pendingDeleteCounter(t *testing.T, ctx context.Context, mtreeTable string) // exercising parent rollup. func mtreeNodeHashes(t *testing.T, ctx context.Context, mtreeTable string) map[string]string { t.Helper() - rows, err := pgCluster.Node1Pool.Query(ctx, fmt.Sprintf(` - SELECT node_level, node_position, encode(node_hash, 'hex') - FROM %s - ORDER BY node_level, node_position`, mtreeTable)) + rows, err := pgCluster.Node1Pool.Query(ctx, // nosemgrep + "SELECT node_level, node_position, encode(node_hash, 'hex') FROM "+mtreeTable+ + " ORDER BY node_level, node_position") require.NoError(t, err) defer rows.Close() @@ -1619,8 +1623,10 @@ func TestUpdateMtreeReflectsDeletes(t *testing.T) { dropCDCTestTable(t, tableName) }) - mtreeTable := fmt.Sprintf("%s.ace_mtree_%s_%s", - config.Cfg.MTree.Schema, testSchema, tableName) + mtreeTable := pgx.Identifier{ + config.Cfg.MTree.Schema, + fmt.Sprintf("ace_mtree_%s_%s", testSchema, tableName), + }.Sanitize() rootBefore := rootNodeHash(t, ctx, mtreeTable) require.NotEmpty(t, rootBefore) @@ -1628,8 +1634,9 @@ func TestUpdateMtreeReflectsDeletes(t *testing.T) { require.Greater(t, len(nodesBefore), 2, "expected a multi-level tree (>2 nodes); ANALYZE may not have run") - res, err := pgCluster.Node1Pool.Exec(ctx, - fmt.Sprintf("DELETE FROM %s WHERE index BETWEEN 100 AND 124", qualifiedTableName)) + safeTbl := pgx.Identifier{testSchema, tableName}.Sanitize() + res, err := pgCluster.Node1Pool.Exec(ctx, // nosemgrep + "DELETE FROM "+safeTbl+" WHERE index BETWEEN 100 AND 124") require.NoError(t, err) require.Equal(t, int64(25), res.RowsAffected(), "expected to delete 25 customer rows") @@ -1834,8 +1841,10 @@ func TestUpdateMtreeReflectsUpdates(t *testing.T) { dropCDCTestTable(t, tableName) }) - mtreeTable := fmt.Sprintf("%s.ace_mtree_%s_%s", - config.Cfg.MTree.Schema, testSchema, tableName) + mtreeTable := pgx.Identifier{ + config.Cfg.MTree.Schema, + fmt.Sprintf("ace_mtree_%s_%s", testSchema, tableName), + }.Sanitize() rootBefore := rootNodeHash(t, ctx, mtreeTable) require.NotEmpty(t, rootBefore) @@ -1843,9 +1852,9 @@ func TestUpdateMtreeReflectsUpdates(t *testing.T) { require.Greater(t, len(nodesBefore), 2, "expected a multi-level tree (>2 nodes); ANALYZE may not have run") - res, err := pgCluster.Node1Pool.Exec(ctx, fmt.Sprintf( - "UPDATE %s SET email = email || '.updated' WHERE index BETWEEN 200 AND 224", - qualifiedTableName)) + safeTbl := pgx.Identifier{testSchema, tableName}.Sanitize() + res, err := pgCluster.Node1Pool.Exec(ctx, // nosemgrep + "UPDATE "+safeTbl+" SET email = email || '.updated' WHERE index BETWEEN 200 AND 224") require.NoError(t, err) require.Equal(t, int64(25), res.RowsAffected(), "expected to update 25 customer rows")