From 4a16399e8e406a4a567ef1fa2a52fce342f0a9be Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Wed, 3 Jun 2026 22:22:41 +0500 Subject: [PATCH 1/3] spock_apply: do not send local LSN as publisher feedback on sync-standby The synchronous-standby hold queue stored the subscriber's local XactLastCommitEnd in RemoteSyncPosition.recvpos and later sent it to the publisher as the slot's confirmed flush position. The two LSNs belong to different WAL streams, so the publisher wrote a meaningless value into confirmed_flush_lsn -- typically far ahead of its own pg_current_wal_lsn. After a CNPG failover the promoted standby resumed from that bogus LSN, the publisher waited until its own WAL caught up, then started streaming from there. Every commit between the real origin position and the bogus LSN was silently dropped. Split RemoteSyncPosition into local_commit_lsn (gating) and remote_recvpos / remote_writepos / remote_flushpos (feedback). append_feedback_position now takes both LSNs explicitly. get_feedback_position gates the release on local_commit_lsn <= WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH] and returns the remote positions, so the publisher sees values from its own WAL stream. --- src/spock_apply.c | 145 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 100 insertions(+), 45 deletions(-) diff --git a/src/spock_apply.c b/src/spock_apply.c index 25730c62..dbe4e959 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -233,15 +233,37 @@ static ActionErrCallbackArg errcallback_arg; TransactionId remote_xid; /* - * Structure of RemoteSyncPosition to Save the LSN in case of - * Synchronous replica is attached + * Per-transaction feedback hold for the synchronous-replica path. 
+ * + * When a local synchronous standby is configured (synchronous_standby_names + * is set), we cannot report a remote commit LSN back to the publisher until + * the corresponding local commit has been flushed on that sync standby -- + * otherwise the publisher could advance its slot past changes that a + * fail-over of THIS node has not yet preserved. + * + * The two LSNs are not interchangeable: + * - local_commit_lsn is the subscriber's XactLastCommitEnd, used ONLY to + * compare against WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH] + * to decide when the hold can be released. + * - remote_recvpos / remote_writepos / remote_flushpos are positions in + * the publisher's WAL stream. These are what we send + * in the standby-status feedback message; the + * publisher writes flushpos into the slot's + * confirmed_flush_lsn. + * + * A prior version of this struct stored only "recvpos" and reused it for + * both purposes -- the local LSN ended up being sent to the publisher, + * which then poisoned the slot's confirmed_flush_lsn with a value from a + * different WAL space. After a failover, the new primary would resume from + * that bogus LSN and silently drop every transaction in the window. 
*/ typedef struct RemoteSyncPosition { - dlist_node node; - XLogRecPtr recvpos; - XLogRecPtr flushpos; - XLogRecPtr writepos; + dlist_node node; + XLogRecPtr local_commit_lsn; /* gating: subscriber's XactLastCommitEnd */ + XLogRecPtr remote_recvpos; /* feedback: publisher write LSN */ + XLogRecPtr remote_flushpos; /* feedback: publisher flush LSN */ + XLogRecPtr remote_writepos; /* feedback: publisher apply LSN */ } RemoteSyncPosition; /* @@ -305,7 +327,8 @@ static void apply_replay_entry_free(ApplyReplayEntry *entry); static void apply_replay_queue_reset(void); static void maybe_send_feedback(PGconn *applyconn, XLogRecPtr lsn_to_send, TimestampTz *last_receive_timestamp); -static void append_feedback_position(XLogRecPtr recvpos); +static void append_feedback_position(XLogRecPtr local_commit_lsn, + XLogRecPtr remote_commit_lsn); static void get_feedback_position(XLogRecPtr *recvpos, XLogRecPtr *writepos, XLogRecPtr *flushpos, XLogRecPtr *max_recvpos); @@ -1082,7 +1105,7 @@ handle_commit(StringInfo s) CommitTransactionCommand(); if (WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) - append_feedback_position(XactLastCommitEnd); + append_feedback_position(XactLastCommitEnd, end_lsn); remoteTransactionStopTimestamp = 0; @@ -2768,30 +2791,50 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush) } /* - * If a synchronous replica is attached, in code we set synchronous commit off. - * This can be dangerous because feedback might be sent before receiving - * acknowledgment from the remote synchronous replica. To handle this, - * a list of locally committed LSNs is maintained. Feedback is delayed - * until acknowledgment is received from the remote synchronous replica, - * thus avoiding blocking the transaction while ensuring data consistency. 
+ * If a synchronous replica is attached, we cannot send feedback to the + * publisher the instant the apply transaction commits locally: the local + * commit is not yet durable until the sync standby has flushed it, and + * if we tell the publisher we have flushed the change it may advance its + * slot past changes that a fail-over of THIS node would lose. + * + * To bridge that gap without blocking apply (we run with synchronous_commit + * off internally), we record a hold entry per transaction with two + * independent positions: + * + * local_commit_lsn -- subscriber's XactLastCommitEnd; compared against + * WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH] in + * get_feedback_position() to know when the sync + * standby has the commit. + * remote_* -- the publisher-side positions we will eventually + * report (recv = write position in the publisher's + * WAL stream we have processed, flush/write are the + * equivalent positions returned by get_flush_position + * once the local commit is durable). + * + * Sending the local LSN as the remote position (the previous behaviour of + * this function) poisons the publisher's slot->confirmed_flush_lsn with a + * value from a different WAL space and silently drops every transaction + * after a fail-over until the publisher's own WAL grows past the bogus + * value. */ static void -append_feedback_position(XLogRecPtr recvpos) +append_feedback_position(XLogRecPtr local_commit_lsn, XLogRecPtr remote_commit_lsn) { - XLogRecPtr writepos; - XLogRecPtr flushpos; + XLogRecPtr remote_writepos; + XLogRecPtr remote_flushpos; RemoteSyncPosition *syncpos; MemoryContext oldctx; Assert(WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED); - if (get_flush_position(&writepos, &flushpos)) + if (get_flush_position(&remote_writepos, &remote_flushpos)) { /* - * No outstanding transactions to flush, we can report the latest - * received position. This is important for synchronous replication. 
+ * Nothing else outstanding -- the position we just committed is the + * tip of what we have applied from the publisher. This must be the + * REMOTE LSN, not the local one. */ - flushpos = writepos = recvpos; + remote_flushpos = remote_writepos = remote_commit_lsn; } /* Ensure that we are allocating in the top memory context */ @@ -2799,24 +2842,30 @@ append_feedback_position(XLogRecPtr recvpos) syncpos = (RemoteSyncPosition *) palloc0(sizeof(RemoteSyncPosition)); MemoryContextSwitchTo(oldctx); - syncpos->recvpos = recvpos; - syncpos->writepos = writepos; - syncpos->flushpos = flushpos; + syncpos->local_commit_lsn = local_commit_lsn; + syncpos->remote_recvpos = remote_commit_lsn; + syncpos->remote_writepos = remote_writepos; + syncpos->remote_flushpos = remote_flushpos; dlist_push_tail(&sync_replica_lsn, &syncpos->node); - elog(DEBUG2, "SPOCK %s: appended feedback to list %X/%X, write %X/%X, flush %X/%X", + elog(DEBUG2, + "SPOCK %s: queued sync-hold local=%X/%X remote_recv=%X/%X remote_write=%X/%X remote_flush=%X/%X", MySubscription->name, - (uint32) (recvpos >> 32), (uint32) recvpos, - (uint32) (writepos >> 32), (uint32) writepos, - (uint32) (flushpos >> 32), (uint32) flushpos - ); + LSN_FORMAT_ARGS(local_commit_lsn), + LSN_FORMAT_ARGS(remote_commit_lsn), + LSN_FORMAT_ARGS(remote_writepos), + LSN_FORMAT_ARGS(remote_flushpos)); } /* - * As we have maintained a list of LSNs that are waiting for - * acknowledgment from the synchronous replica, we need to get the - * feedback position from the list and send it to the Spock node attached to it. - * This ensures that we only send feedback that is committed and acknowledged - * by the synchronous replica. + * Walk the queue of sync-replica holds populated by append_feedback_position() + * and release any whose LOCAL commit has been flushed on the local sync + * standby. 
For each released entry we overwrite the caller's *recvpos / + * *writepos / *flushpos with the entry's REMOTE positions -- those are + * what go into the standby-status reply to the publisher. Mixing the two + * LSN spaces (the previous behaviour) caused the publisher to write a + * subscriber-local LSN into the slot's confirmed_flush_lsn, which after a + * fail-over made the new primary resume from a non-existent publisher + * position and silently drop every transaction in the window. */ static void get_feedback_position(XLogRecPtr *recvpos, XLogRecPtr *writepos, XLogRecPtr *flushpos, XLogRecPtr *max_recvpos) @@ -2838,24 +2887,30 @@ get_feedback_position(XLogRecPtr *recvpos, XLogRecPtr *writepos, XLogRecPtr *flu if (syncpos == NULL) break; - if (syncpos->recvpos <= WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH]) + /* + * Gate on the LOCAL commit reaching the sync standby's flush LSN + * (WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH] is a LOCAL LSN). Once that + * holds, it is safe to tell the publisher about the corresponding + * REMOTE position. 
+ */ + if (syncpos->local_commit_lsn <= WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH]) { - *recvpos = syncpos->recvpos; - *writepos = syncpos->writepos; - *flushpos = syncpos->flushpos; - elog(DEBUG2, "SPOCK %s: received feedback %X/%X, " - "write %X/%X, flush %X/%X", + *recvpos = syncpos->remote_recvpos; + *writepos = syncpos->remote_writepos; + *flushpos = syncpos->remote_flushpos; + elog(DEBUG2, + "SPOCK %s: releasing sync-hold local=%X/%X -> remote recv=%X/%X write=%X/%X flush=%X/%X", MySubscription->name, - (uint32) (*recvpos >> 32), (uint32) *recvpos, - (uint32) (*writepos >> 32), (uint32) *writepos, - (uint32) (*flushpos >> 32), (uint32) *flushpos - ); + LSN_FORMAT_ARGS(syncpos->local_commit_lsn), + LSN_FORMAT_ARGS(*recvpos), + LSN_FORMAT_ARGS(*writepos), + LSN_FORMAT_ARGS(*flushpos)); dlist_delete(iter1.cur); pfree(syncpos); syncpos = NULL; } if (syncpos != NULL) - *max_recvpos = syncpos->recvpos; + *max_recvpos = syncpos->remote_recvpos; } /* Release the lock */ LWLockRelease(SyncRepLock); From c5b82da36ec04794f5d1d97802d765273ec2b8c6 Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Thu, 4 Jun 2026 15:26:37 +0500 Subject: [PATCH 2/3] Tighten sync-standby feedback comments and add regression test Drop the trailing paragraph and sentence that described the pre-fix behaviour in the comments before append_feedback_position() and get_feedback_position(); they belong in the commit message, not the source. Add tests/tap/t/020_sync_standby_feedback_lsn.pl, which attaches a synchronous physical standby to the subscriber and asserts the publisher's confirmed_flush_lsn never exceeds its own pg_current_wal_lsn. 
--- src/spock_apply.c | 12 +- tests/tap/t/020_sync_standby_feedback_lsn.pl | 260 +++++++++++++++++++ 2 files changed, 261 insertions(+), 11 deletions(-) create mode 100644 tests/tap/t/020_sync_standby_feedback_lsn.pl diff --git a/src/spock_apply.c b/src/spock_apply.c index dbe4e959..7e364c87 100644 --- a/src/spock_apply.c +++ b/src/spock_apply.c @@ -2810,12 +2810,6 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush) * WAL stream we have processed, flush/write are the * equivalent positions returned by get_flush_position * once the local commit is durable). - * - * Sending the local LSN as the remote position (the previous behaviour of - * this function) poisons the publisher's slot->confirmed_flush_lsn with a - * value from a different WAL space and silently drops every transaction - * after a fail-over until the publisher's own WAL grows past the bogus - * value. */ static void append_feedback_position(XLogRecPtr local_commit_lsn, XLogRecPtr remote_commit_lsn) @@ -2861,11 +2855,7 @@ append_feedback_position(XLogRecPtr local_commit_lsn, XLogRecPtr remote_commit_l * and release any whose LOCAL commit has been flushed on the local sync * standby. For each released entry we overwrite the caller's *recvpos / * *writepos / *flushpos with the entry's REMOTE positions -- those are - * what go into the standby-status reply to the publisher. Mixing the two - * LSN spaces (the previous behaviour) caused the publisher to write a - * subscriber-local LSN into the slot's confirmed_flush_lsn, which after a - * fail-over made the new primary resume from a non-existent publisher - * position and silently drop every transaction in the window. + * what go into the standby-status reply to the publisher. 
*/ static void get_feedback_position(XLogRecPtr *recvpos, XLogRecPtr *writepos, XLogRecPtr *flushpos, XLogRecPtr *max_recvpos) diff --git a/tests/tap/t/020_sync_standby_feedback_lsn.pl b/tests/tap/t/020_sync_standby_feedback_lsn.pl new file mode 100644 index 00000000..6d914b96 --- /dev/null +++ b/tests/tap/t/020_sync_standby_feedback_lsn.pl @@ -0,0 +1,260 @@ +use strict; +use warnings; +use Test::More; +use lib '.'; +use lib 't'; +use SpockTest qw( + create_cluster destroy_cluster + get_test_config system_or_bail command_ok system_maybe + psql_or_bail scalar_query +); +use Time::HiRes qw(time); + +# ============================================================================= +# Test: 020_sync_standby_feedback_lsn.pl +# +# Regression test for "spock_apply: do not send local LSN as publisher +# feedback on sync-standby". +# +# Topology: +# n1 (publisher) ---logical---> n2 (subscriber) +# ^ +# | physical streaming +# | synchronous_standby_names +# n2_standby +# +# Bug: +# When the subscriber had a synchronous physical standby attached, the +# apply worker queued the subscriber's local XactLastCommitEnd as the +# "remote" position in the feedback message and later sent it to the +# publisher as the slot's confirmed flush position. The two LSNs are +# from different WAL streams, so the publisher wrote a meaningless value +# into the slot's confirmed_flush_lsn -- typically far ahead of the +# publisher's own pg_current_wal_lsn. +# +# Assertion (symptom-level, sufficient to catch regression): +# Publisher's pg_replication_slots.confirmed_flush_lsn must never exceed +# the publisher's pg_current_wal_lsn while replication is healthy. 
+# ============================================================================= + +# -------------------------------------------------------------------------- +# Helper: query on an arbitrary port +# -------------------------------------------------------------------------- +sub qport { + my ($pg_bin, $host, $port, $dbname, $user, $sql) = @_; + my $out = `$pg_bin/psql -X -h $host -p $port -d $dbname -U $user -t -c "$sql" 2>/dev/null`; + $out //= ''; + $out =~ s/^\s+|\s+$//g; + return $out; +} + +# -------------------------------------------------------------------------- +# Helper: poll until condition or timeout +# -------------------------------------------------------------------------- +sub wait_until { + my ($timeout, $poll, $cond) = @_; + my $deadline = time() + $timeout; + while (time() < $deadline) { + return 1 if $cond->(); + sleep($poll); + } + return 0; +} + +# ========================================================================== +# 1. Create 2-node Spock cluster +# ========================================================================== +create_cluster(2, 'Create 2-node Spock cluster'); + +my $config = get_test_config(); +my $host = $config->{host}; +my $dbname = $config->{db_name}; +my $db_user = $config->{db_user}; +my $db_password = $config->{db_password}; +my $pg_bin = $config->{pg_bin}; +my $node_ports = $config->{node_ports}; +my $node_dirs = $config->{node_datadirs}; +my $publisher_port = $node_ports->[0]; # n1 +my $subscriber_port = $node_ports->[1]; # n2 +my $subscriber_dir = $node_dirs->[1]; + +# ========================================================================== +# 2. 
Create subscription n2 -> n1 +# ========================================================================== +psql_or_bail(2, "SELECT spock.sub_create( + 'sub_n2_n1', + 'host=$host dbname=$dbname port=$publisher_port user=$db_user password=$db_password', + ARRAY['default','default_insert_only','ddl_sql'], + true, true +)"); + +my $sub_active = wait_until(60, 3, sub { + my $s = scalar_query(2, + "SELECT sub_enabled FROM spock.subscription WHERE sub_name = 'sub_n2_n1'"); + $s =~ s/\s+//g; + return $s eq 't'; +}); +ok($sub_active, 'Subscription sub_n2_n1 active on n2'); + +# ========================================================================== +# 3. Create a physical replication slot on n2 for the sync standby +# ========================================================================== +psql_or_bail(2, + "SELECT pg_create_physical_replication_slot('n2_sync_slot')"); +pass('Physical replication slot created on n2 for sync standby'); + +# ========================================================================== +# 4. Build a physical standby of n2 via pg_basebackup +# ========================================================================== +my $standby_port = $subscriber_port + 20; +my $standby_datadir = '/tmp/tmp_spock_sync_standby'; +my $standby_logdir = "$standby_datadir/pg_log"; + +system("rm -rf $standby_datadir 2>/dev/null"); +system_or_bail("$pg_bin/pg_basebackup", + '-D', $standby_datadir, + '-h', $host, '-p', $subscriber_port, '-U', $db_user, + '-X', 'stream', '-R'); +pass('Physical standby of n2 created via pg_basebackup'); + +# ========================================================================== +# 5. 
Configure standby +# ========================================================================== +system_or_bail('mkdir', '-p', $standby_logdir); +{ + open(my $conf, '>>', "$standby_datadir/postgresql.conf") + or die "Cannot open standby postgresql.conf: $!"; + print $conf "\n# ---- sync standby overrides ----\n"; + print $conf "port = $standby_port\n"; + print $conf "hot_standby = on\n"; + print $conf "hot_standby_feedback = on\n"; + print $conf "primary_slot_name = 'n2_sync_slot'\n"; + print $conf "log_directory = '$standby_logdir'\n"; + print $conf "log_filename = 'standby.log'\n"; + close($conf); +} + +system_or_bail("$pg_bin/pg_ctl", 'start', + '-D', $standby_datadir, '-l', "$standby_datadir/startup.log", '-w'); +command_ok(["$pg_bin/pg_isready", '-h', $host, '-p', $standby_port], + 'Sync standby is accepting connections'); + +my $in_recovery = qport($pg_bin, $host, $standby_port, + $dbname, $db_user, "SELECT pg_is_in_recovery()"); +is($in_recovery, 't', 'Sync standby is in recovery (streaming from n2)'); + +# ========================================================================== +# 6. Configure n2 with synchronous_standby_names pointing at the standby +# ========================================================================== +# Wait for the standby walreceiver to register so synchronous_standby_names +# actually has someone to wait on (otherwise commits block forever). +my $standby_registered = wait_until(30, 2, sub { + my $c = scalar_query(2, + "SELECT count(*) FROM pg_stat_replication WHERE application_name = 'walreceiver'"); + $c =~ s/\s+//g; + return $c > 0; +}); +ok($standby_registered, 'n2_standby walreceiver is registered on n2'); + +psql_or_bail(2, "ALTER SYSTEM SET synchronous_standby_names = 'walreceiver'"); +# synchronous_commit = 'on' is the default and already means "wait for the +# named standbys to flush"; we only need to set it explicitly if the cluster +# config has overridden it. 
+psql_or_bail(2, "ALTER SYSTEM SET synchronous_commit = 'on'"); +psql_or_bail(2, "SELECT pg_reload_conf()"); +sleep(2); + +my $ssn = scalar_query(2, "SHOW synchronous_standby_names"); +$ssn =~ s/\s+//g; +is($ssn, 'walreceiver', + 'n2 synchronous_standby_names = walreceiver'); + +# ========================================================================== +# 7. Generate traffic from n1 and let it flow through n2 (with sync wait) +# ========================================================================== +psql_or_bail(1, + "CREATE TABLE IF NOT EXISTS feedback_test (id int PRIMARY KEY, v text)"); +sleep(3); + +# Insert a batch of rows on the publisher; n2's apply worker holds feedback +# for each commit until the sync standby has flushed it (the hold queue). +for my $i (1 .. 20) { + psql_or_bail(1, "INSERT INTO feedback_test VALUES ($i, 'row_$i')"); +} + +# Wait until the last row is visible on n2 +my $applied = wait_until(60, 2, sub { + my $v = scalar_query(2, "SELECT v FROM feedback_test WHERE id = 20"); + $v =~ s/\s+//g; + return $v eq 'row_20'; +}); +ok($applied, 'All 20 rows replicated n1 -> n2'); + +# ========================================================================== +# 8. Force the apply worker to send a feedback message so the publisher's +# confirmed_flush_lsn is up-to-date with whatever the subscriber is +# reporting (good or bad). A keepalive cycle is enough; sleeping +# longer than the default wal_receiver_status_interval covers it. +# ========================================================================== +sleep(12); + +# ========================================================================== +# 9. ASSERTION: publisher's slot confirmed_flush_lsn must not exceed +# publisher's current_wal_lsn. Before the fix the subscriber sent its +# local commit LSN, which lives in a different WAL stream and was +# typically far ahead of n1's own WAL position.
+# ========================================================================== +my $cmp = scalar_query(1, " + SELECT + s.slot_name, + pg_wal_lsn_diff(pg_current_wal_lsn(), s.confirmed_flush_lsn) AS slack_bytes, + (s.confirmed_flush_lsn <= pg_current_wal_lsn()) AS ok + FROM pg_replication_slots s + WHERE s.slot_type = 'logical' + AND s.slot_name LIKE 'spk_${dbname}_n1_%' +"); +diag(" publisher slot vs current_wal_lsn: $cmp"); + +my $slot_ok = scalar_query(1, " + SELECT bool_and(s.confirmed_flush_lsn <= pg_current_wal_lsn()) + FROM pg_replication_slots s + WHERE s.slot_type = 'logical' + AND s.slot_name LIKE 'spk_${dbname}_n1_%' +"); +$slot_ok =~ s/\s+//g; +is($slot_ok, 't', + "publisher's confirmed_flush_lsn <= pg_current_wal_lsn (sync-standby feedback " + . "uses REMOTE LSN, not local)"); + +# Additional sanity: confirmed_flush_lsn must be > 0 (we did apply work) +my $cf_nonzero = scalar_query(1, " + SELECT bool_and(s.confirmed_flush_lsn > '0/0'::pg_lsn) + FROM pg_replication_slots s + WHERE s.slot_type = 'logical' + AND s.slot_name LIKE 'spk_${dbname}_n1_%' +"); +$cf_nonzero =~ s/\s+//g; +is($cf_nonzero, 't', + "publisher's confirmed_flush_lsn advanced past 0/0 (feedback is flowing)"); + +# ========================================================================== +# Cleanup +# ========================================================================== + +# Undo synchronous_standby_names on n2 so destroy_cluster can stop n2 cleanly +# even if we kill the standby first. 
+system_maybe("$pg_bin/psql", '-h', $host, '-p', $subscriber_port, + '-d', $dbname, '-U', $db_user, + '-c', "ALTER SYSTEM RESET synchronous_standby_names"); +system_maybe("$pg_bin/psql", '-h', $host, '-p', $subscriber_port, + '-d', $dbname, '-U', $db_user, + '-c', "ALTER SYSTEM RESET synchronous_commit"); +system_maybe("$pg_bin/psql", '-h', $host, '-p', $subscriber_port, + '-d', $dbname, '-U', $db_user, + '-c', "SELECT pg_reload_conf()"); + +system("$pg_bin/pg_ctl stop -D $standby_datadir -m immediate >> /dev/null 2>&1"); +system("rm -rf $standby_datadir 2>/dev/null"); + +destroy_cluster('Destroy test cluster'); +done_testing(); From 0cb6e0f1dc12352b2934f65a3fc0b75e97b61cce Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Fri, 5 Jun 2026 17:05:58 +0500 Subject: [PATCH 3/3] tests: cover sync-standby failover path in feedback LSN regression Promote the subscriber's sync standby after the publisher commits and assert every publisher row is present on the new primary. The original bug surfaced as silently dropped writes after promotion; the LSN symptom check alone does not exercise that path. --- tests/tap/t/020_sync_standby_feedback_lsn.pl | 74 ++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/tests/tap/t/020_sync_standby_feedback_lsn.pl b/tests/tap/t/020_sync_standby_feedback_lsn.pl index 6d914b96..a85b7027 100644 --- a/tests/tap/t/020_sync_standby_feedback_lsn.pl +++ b/tests/tap/t/020_sync_standby_feedback_lsn.pl @@ -237,6 +237,80 @@ sub wait_until { is($cf_nonzero, 't', "publisher's confirmed_flush_lsn advanced past 0/0 (feedback is flowing)"); +# ========================================================================== +# 10. Failover scenario the original bug surfaced through: stop the +# subscriber, promote its sync standby, and verify no writes from the +# publisher are missing. 
Before the fix the publisher's slot was +# poisoned with a subscriber-local LSN, so after promotion the new +# primary would resume from a wrong position and silently drop the +# window of recent commits. +# ========================================================================== +psql_or_bail(1, "INSERT INTO feedback_test SELECT g, 'pre_failover_'||g " + . "FROM generate_series(21, 40) g"); + +my $pre_failover_applied = wait_until(60, 2, sub { + my $v = scalar_query(2, "SELECT v FROM feedback_test WHERE id = 40"); + $v =~ s/\s+//g; + return $v eq 'pre_failover_40'; +}); +ok($pre_failover_applied, 'pre-failover batch (ids 21..40) replicated to n2'); + +my $expected_count = scalar_query(1, + "SELECT count(*) FROM feedback_test"); +$expected_count =~ s/\s+//g; + +# Wait briefly for n2_standby to flush the latest commits (synchronous +# replication makes this almost immediate, but a small grace handles the +# walreceiver round-trip). +sleep(2); + +# Stop the subscriber (n2). Use fast shutdown so the apply worker exits +# cleanly before the standby is promoted. +diag("Stopping n2 (subscriber) to simulate failover..."); +system("$pg_bin/pg_ctl stop -D $node_dirs->[1] -m fast >> /dev/null 2>&1"); +sleep(3); + +diag("Promoting n2_standby..."); +system("$pg_bin/pg_ctl promote -D $standby_datadir >> /dev/null 2>&1"); + +my $promoted = wait_until(30, 2, sub { + my $r = qport($pg_bin, $host, $standby_port, + $dbname, $db_user, "SELECT pg_is_in_recovery()"); + $r =~ s/\s+//g; + return $r eq 'f'; +}); +ok($promoted, 'n2_standby promoted to primary (no longer in recovery)'); + +# The decisive check: every row the publisher committed before the failover +# must be present on the promoted standby. With the bug, the publisher's +# slot.confirmed_flush_lsn pointed at a subscriber-local LSN; after +# promotion the new primary would resume from there and miss recent +# commits. 
With the fix, confirmed_flush_lsn tracks the publisher's WAL, +# so every committed row is durable on the standby before failover. +my $standby_count = qport($pg_bin, $host, $standby_port, + $dbname, $db_user, + "SELECT count(*) FROM feedback_test"); +$standby_count =~ s/\s+//g; +is($standby_count, $expected_count, + "promoted standby has all $expected_count publisher rows " + . "(no writes lost across failover)"); + +# Spot-check the boundary rows so a wrong count is not masked by an off-by-one +my $boundary_ok = qport($pg_bin, $host, $standby_port, + $dbname, $db_user, + "SELECT bool_and(v IS NOT NULL) FROM feedback_test " + . "WHERE id IN (1, 20, 21, 40)"); +$boundary_ok =~ s/\s+//g; +is($boundary_ok, 't', + 'promoted standby has the first, last, and failover-boundary rows'); + +# Stop the promoted standby so destroy_cluster can run; n2 is already down. +system("$pg_bin/pg_ctl stop -D $standby_datadir -m immediate >> /dev/null 2>&1"); + +# Restart n2 so destroy_cluster can connect cleanly (matches 018's pattern). +system("$pg_bin/postgres -D $node_dirs->[1] >> /dev/null 2>&1 &"); +sleep(10); + # ========================================================================== # Cleanup # ==========================================================================