Skip to content
49 changes: 42 additions & 7 deletions db/queries/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2913,18 +2913,53 @@ func DropCDCMetadataTable(ctx context.Context, db DBQuerier) error {
return nil
}

func GetCDCMetadata(ctx context.Context, db DBQuerier, publicationName string) (string, string, []string, error) {
// pubCommitLSN is empty for legacy metadata rows that pre-date the
// pub_commit_lsn column; callers must treat empty as "invariant
// uncheckable" and skip the publication-commit guard with a warning.
func GetCDCMetadata(ctx context.Context, db DBQuerier, publicationName string) (slotName, startLSN string, tables []string, pubCommitLSN string, err error) {
sql, err := RenderSQL(SQLTemplates.GetCDCMetadata, nil)
if err != nil {
return "", "", nil, err
return "", "", nil, "", err
}
var slotName, startLSN string
var tables []string
err = db.QueryRow(ctx, sql, publicationName).Scan(&slotName, &startLSN, &tables)
err = db.QueryRow(ctx, sql, publicationName).Scan(&slotName, &startLSN, &tables, &pubCommitLSN)
if err != nil {
return "", "", nil, "", err
}
return slotName, startLSN, tables, pubCommitLSN, nil
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// Init path only. Ongoing flushes use UpdateCDCMetadata, which deliberately
// leaves pub_commit_lsn untouched so the listen.go guard always compares
// against the LSN captured at the matching init.
func InitCDCMetadata(ctx context.Context, db DBQuerier, publicationName, slotName, startLSN, pubCommitLSN string, tables []string) error {
sql, err := RenderSQL(SQLTemplates.InitCDCMetadata, nil)
if err != nil {
return fmt.Errorf("failed to render InitCDCMetadata SQL: %w", err)
}
if tables == nil {
tables = []string{}
}
_, err = db.Exec(ctx, sql, publicationName, slotName, startLSN, pubCommitLSN, tables)
if err != nil {
return "", "", nil, err
return fmt.Errorf("query to init cdc metadata failed: %w", err)
}
return slotName, startLSN, tables, nil
return nil
}

// Called mid-Phase-A, after CREATE PUBLICATION, so the captured value is
// strictly less than Phase A's commit LSN. The slot created in Phase B
// has consistent_point >= that commit LSN, hence consistent_point >
// captured value: a safe lower bound for any valid replication start LSN.
func CurrentWalInsertLSN(ctx context.Context, db DBQuerier) (string, error) {
sql, err := RenderSQL(SQLTemplates.CurrentWalInsertLSN, nil)
if err != nil {
return "", fmt.Errorf("failed to render CurrentWalInsertLSN SQL: %w", err)
}
var lsn string
if err := db.QueryRow(ctx, sql).Scan(&lsn); err != nil {
return "", fmt.Errorf("failed to fetch current WAL insert LSN: %w", err)
}
return lsn, nil
}

func UpdateMtreeCounters(ctx context.Context, db DBQuerier, mtreeTable string, isComposite bool, compositeTypeName string, pkeyType string, inserts, deletes, updates []string) error {
Expand Down
71 changes: 65 additions & 6 deletions db/queries/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ type Templates struct {
ResetReplicationOriginSession *template.Template
SetupReplicationOriginXact *template.Template
ResetReplicationOriginXact *template.Template

InitCDCMetadata *template.Template
CurrentWalInsertLSN *template.Template
}

var SQLTemplates = Templates{
Expand Down Expand Up @@ -221,6 +224,39 @@ var SQLTemplates = Templates{
last_updated = EXCLUDED.last_updated
`)),

// On conflict every column including pub_commit_lsn is refreshed:
// reaching this query path means a re-init, which created a fresh
// publication with a new commit LSN, and listen.go's guard must
// compare against that current LSN — not a stale prior one.
InitCDCMetadata: template.Must(template.New("initCdcMetadata").Funcs(aceTemplateFuncs).Parse(`
INSERT INTO
{{aceSchema}}.ace_cdc_metadata (
publication_name,
slot_name,
start_lsn,
pub_commit_lsn,
tables,
last_updated
)
VALUES
(
$1,
$2,
$3,
$4,
$5,
current_timestamp
)
ON CONFLICT (publication_name) DO
UPDATE
SET
slot_name = EXCLUDED.slot_name,
start_lsn = EXCLUDED.start_lsn,
pub_commit_lsn = EXCLUDED.pub_commit_lsn,
tables = EXCLUDED.tables,
last_updated = EXCLUDED.last_updated
`)),

DropPublication: template.Must(template.New("dropPublication").Parse(`
DROP PUBLICATION IF EXISTS {{.PublicationName}}
`)),
Expand All @@ -240,15 +276,25 @@ var SQLTemplates = Templates{
DROP TABLE IF EXISTS {{aceSchema}}.ace_cdc_metadata
`)),

// pub_commit_lsn is extracted via to_jsonb(row) ->> 'pub_commit_lsn'
// instead of a direct column reference so the query parses and runs
// against pre-migration ace_cdc_metadata tables that lack the column.
// On legacy 3-column rows the JSON object has no pub_commit_lsn key,
// ->> returns NULL, and COALESCE produces the empty string the
// listen.go guard treats as "invariant uncheckable, warn and skip".
// The additive ALTER TABLE in CreateCDCMetadataTable still backfills
// the column on the next MtreeInit so post-init reads use the real
// column path.
GetCDCMetadata: template.Must(template.New("getCDCMetadata").Funcs(aceTemplateFuncs).Parse(`
SELECT
slot_name,
start_lsn,
tables
m.slot_name,
m.start_lsn,
m.tables,
COALESCE(to_jsonb(m) ->> 'pub_commit_lsn', '') AS pub_commit_lsn
FROM
{{aceSchema}}.ace_cdc_metadata
{{aceSchema}}.ace_cdc_metadata AS m
WHERE
publication_name = $1
m.publication_name = $1
`)),

UpdateMtreeCounters: template.Must(template.New("updateMtreeCounters").Parse(`
Expand Down Expand Up @@ -338,9 +384,13 @@ var SQLTemplates = Templates{
publication_name text PRIMARY KEY,
slot_name text,
start_lsn text,
pub_commit_lsn text,
tables text[],
last_updated timestamptz
)`),
);
-- Forward-compatible addition for clusters that ran older versions.
ALTER TABLE {{aceSchema}}.ace_cdc_metadata
ADD COLUMN IF NOT EXISTS pub_commit_lsn text;`),
),
GetPrimaryKey: template.Must(template.New("getPrimaryKey").Parse(`
SELECT
Expand Down Expand Up @@ -1599,4 +1649,13 @@ var SQLTemplates = Templates{
ResetReplicationOriginXact: template.Must(template.New("resetReplicationOriginXact").Parse(`
SELECT pg_replication_origin_xact_reset()
`)),

// CurrentWalInsertLSN returns pg_current_wal_insert_lsn() as text. Used
// at MtreeInit time, just after CREATE PUBLICATION, as the lower bound
// for any future replication start LSN. The slot's consistent point is
// guaranteed to be >= this value because the slot is created after this
// transaction commits.
CurrentWalInsertLSN: template.Must(template.New("currentWalInsertLSN").Parse(`
SELECT pg_current_wal_insert_lsn()::text
`)),
}
Loading
Loading