Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 90 additions & 45 deletions src/spock_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,37 @@ static ActionErrCallbackArg errcallback_arg;
TransactionId remote_xid;

/*
* Structure of RemoteSyncPosition to Save the LSN in case of
* Synchronous replica is attached
* Per-transaction feedback hold for the synchronous-replica path.
*
* When a local synchronous standby is configured (synchronous_standby_names
* is set), we cannot report a remote commit LSN back to the publisher until
* the corresponding local commit has been flushed on that sync standby --
* otherwise the publisher could advance its slot past changes that a
* fail-over of THIS node has not yet preserved.
*
* The two LSNs are not interchangeable:
* - local_commit_lsn is the subscriber's XactLastCommitEnd, used ONLY to
* compare against WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH]
* to decide when the hold can be released.
* - remote_recvpos / remote_writepos / remote_flushpos are positions in
* the publisher's WAL stream. These are what we send
* in the standby-status feedback message; the
* publisher writes flushpos into the slot's
* confirmed_flush_lsn.
*
* A prior version of this struct stored only "recvpos" and reused it for
* both purposes -- the local LSN ended up being sent to the publisher,
* which then poisoned the slot's confirmed_flush_lsn with a value from a
* different WAL space. After a failover, the new primary would resume from
* that bogus LSN and silently drop every transaction in the window.
*/
typedef struct RemoteSyncPosition
{
dlist_node node;
XLogRecPtr recvpos;
XLogRecPtr flushpos;
XLogRecPtr writepos;
dlist_node node;
XLogRecPtr local_commit_lsn; /* gating: subscriber's XactLastCommitEnd */
XLogRecPtr remote_recvpos; /* feedback: publisher write LSN */
XLogRecPtr remote_flushpos; /* feedback: publisher flush LSN */
XLogRecPtr remote_writepos; /* feedback: publisher apply LSN */
} RemoteSyncPosition;

/*
Expand Down Expand Up @@ -305,7 +327,8 @@ static void apply_replay_entry_free(ApplyReplayEntry *entry);
static void apply_replay_queue_reset(void);
static void maybe_send_feedback(PGconn *applyconn, XLogRecPtr lsn_to_send,
TimestampTz *last_receive_timestamp);
static void append_feedback_position(XLogRecPtr recvpos);
static void append_feedback_position(XLogRecPtr local_commit_lsn,
XLogRecPtr remote_commit_lsn);
static void get_feedback_position(XLogRecPtr *recvpos, XLogRecPtr *writepos,
XLogRecPtr *flushpos, XLogRecPtr *max_recvpos);

Expand Down Expand Up @@ -1082,7 +1105,7 @@ handle_commit(StringInfo s)
CommitTransactionCommand();

if (WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED)
append_feedback_position(XactLastCommitEnd);
append_feedback_position(XactLastCommitEnd, end_lsn);

remoteTransactionStopTimestamp = 0;

Expand Down Expand Up @@ -2768,55 +2791,71 @@ get_flush_position(XLogRecPtr *write, XLogRecPtr *flush)
}

/*
* If a synchronous replica is attached, in code we set synchronous commit off.
* This can be dangerous because feedback might be sent before receiving
* acknowledgment from the remote synchronous replica. To handle this,
* a list of locally committed LSNs is maintained. Feedback is delayed
* until acknowledgment is received from the remote synchronous replica,
* thus avoiding blocking the transaction while ensuring data consistency.
* If a synchronous replica is attached, we cannot send feedback to the
* publisher the instant the apply transaction commits locally: the local
* commit is not yet durable until the sync standby has flushed it, and
* if we tell the publisher we have flushed the change it may advance its
* slot past changes that a fail-over of THIS node would lose.
*
* To bridge that gap without blocking apply (we run with synchronous_commit
* off internally), we record a hold entry per transaction with two
* independent positions:
*
* local_commit_lsn -- subscriber's XactLastCommitEnd; compared against
* WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH] in
* get_feedback_position() to know when the sync
* standby has the commit.
* remote_* -- the publisher-side positions we will eventually
* report (recv = write position in the publisher's
* WAL stream we have processed, flush/write are the
* equivalent positions returned by get_flush_position
* once the local commit is durable).
*/
static void
append_feedback_position(XLogRecPtr recvpos)
append_feedback_position(XLogRecPtr local_commit_lsn, XLogRecPtr remote_commit_lsn)
{
XLogRecPtr writepos;
XLogRecPtr flushpos;
XLogRecPtr remote_writepos;
XLogRecPtr remote_flushpos;
RemoteSyncPosition *syncpos;
MemoryContext oldctx;

Assert(WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED);

if (get_flush_position(&writepos, &flushpos))
if (get_flush_position(&remote_writepos, &remote_flushpos))
{
/*
* No outstanding transactions to flush, we can report the latest
* received position. This is important for synchronous replication.
* Nothing else outstanding -- the position we just committed is the
* tip of what we have applied from the publisher. This must be the
* REMOTE LSN, not the local one.
*/
flushpos = writepos = recvpos;
remote_flushpos = remote_writepos = remote_commit_lsn;
}

/* Ensure that we are allocating in the top memory context */
oldctx = MemoryContextSwitchTo(TopMemoryContext);
syncpos = (RemoteSyncPosition *) palloc0(sizeof(RemoteSyncPosition));
MemoryContextSwitchTo(oldctx);

syncpos->recvpos = recvpos;
syncpos->writepos = writepos;
syncpos->flushpos = flushpos;
syncpos->local_commit_lsn = local_commit_lsn;
syncpos->remote_recvpos = remote_commit_lsn;
syncpos->remote_writepos = remote_writepos;
syncpos->remote_flushpos = remote_flushpos;
dlist_push_tail(&sync_replica_lsn, &syncpos->node);
elog(DEBUG2, "SPOCK %s: appended feedback to list %X/%X, write %X/%X, flush %X/%X",
elog(DEBUG2,
"SPOCK %s: queued sync-hold local=%X/%X remote_recv=%X/%X remote_write=%X/%X remote_flush=%X/%X",
MySubscription->name,
(uint32) (recvpos >> 32), (uint32) recvpos,
(uint32) (writepos >> 32), (uint32) writepos,
(uint32) (flushpos >> 32), (uint32) flushpos
);
LSN_FORMAT_ARGS(local_commit_lsn),
LSN_FORMAT_ARGS(remote_commit_lsn),
LSN_FORMAT_ARGS(remote_writepos),
LSN_FORMAT_ARGS(remote_flushpos));
}

/*
* As we have maintained a list of LSNs that are waiting for
* acknowledgment from the synchronous replica, we need to get the
* feedback position from the list and send it to the Spock node attached to it.
* This ensures that we only send feedback that is committed and acknowledged
* by the synchronous replica.
* Walk the queue of sync-replica holds populated by append_feedback_position()
* and release any whose LOCAL commit has been flushed on the local sync
* standby. For each released entry we overwrite the caller's *recvpos /
* *writepos / *flushpos with the entry's REMOTE positions -- those are
* what go into the standby-status reply to the publisher.
*/
static void
get_feedback_position(XLogRecPtr *recvpos, XLogRecPtr *writepos, XLogRecPtr *flushpos, XLogRecPtr *max_recvpos)
Expand All @@ -2838,24 +2877,30 @@ get_feedback_position(XLogRecPtr *recvpos, XLogRecPtr *writepos, XLogRecPtr *flu
if (syncpos == NULL)
break;

if (syncpos->recvpos <= WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH])
/*
* Gate on the LOCAL commit reaching the sync standby's flush LSN
* (WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH] is a LOCAL LSN). Once that
* holds, it is safe to tell the publisher about the corresponding
* REMOTE position.
*/
if (syncpos->local_commit_lsn <= WalSndCtl->lsn[SYNC_REP_WAIT_FLUSH])
{
*recvpos = syncpos->recvpos;
*writepos = syncpos->writepos;
*flushpos = syncpos->flushpos;
elog(DEBUG2, "SPOCK %s: received feedback %X/%X, "
"write %X/%X, flush %X/%X",
*recvpos = syncpos->remote_recvpos;
*writepos = syncpos->remote_writepos;
*flushpos = syncpos->remote_flushpos;
elog(DEBUG2,
"SPOCK %s: releasing sync-hold local=%X/%X -> remote recv=%X/%X write=%X/%X flush=%X/%X",
MySubscription->name,
(uint32) (*recvpos >> 32), (uint32) *recvpos,
(uint32) (*writepos >> 32), (uint32) *writepos,
(uint32) (*flushpos >> 32), (uint32) *flushpos
);
LSN_FORMAT_ARGS(syncpos->local_commit_lsn),
LSN_FORMAT_ARGS(*recvpos),
LSN_FORMAT_ARGS(*writepos),
LSN_FORMAT_ARGS(*flushpos));
dlist_delete(iter1.cur);
pfree(syncpos);
syncpos = NULL;
}
if (syncpos != NULL)
*max_recvpos = syncpos->recvpos;
*max_recvpos = syncpos->remote_recvpos;
}
/* Release the lock */
LWLockRelease(SyncRepLock);
Expand Down
Loading
Loading