Skip to content

Commit b3df1e5

Browse files
authored
fix(postgres): make advisory lock cancel safe (#4199)
* fix(postgres): make advisory lock cancel safe * fix(postgres): prepare advisory lock acquire query first * document cancel safety
1 parent 4ba3043 commit b3df1e5

File tree

1 file changed

+31
-14
lines changed

1 file changed

+31
-14
lines changed

sqlx-postgres/src/advisory_lock.rs

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use crate::Either;
33
use crate::PgConnection;
44
use hkdf::Hkdf;
55
use sha2::Sha256;
6+
use sqlx_core::executor::Executor;
7+
use sqlx_core::sql_str::SqlSafeStr;
68
use std::ops::{Deref, DerefMut};
79
use std::sync::Arc;
810
use std::sync::OnceLock;
@@ -199,27 +201,36 @@ impl PgAdvisoryLock {
199201
/// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
200202
///
201203
/// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
204+
///
205+
/// # Cancel Safety
206+
///
207+
/// This method is cancel safe. If the future is dropped before the query completes, a
208+
/// `pg_advisory_unlock()` call is queued and run the next time the connection is used.
202209
pub async fn acquire<C: AsMut<PgConnection>>(
203210
&self,
204211
mut conn: C,
205212
) -> Result<PgAdvisoryLockGuard<C>> {
213+
let query = match &self.key {
214+
PgAdvisoryLockKey::BigInt(_) => "SELECT pg_advisory_lock($1)",
215+
PgAdvisoryLockKey::IntPair(_, _) => "SELECT pg_advisory_lock($1, $2)",
216+
};
217+
218+
let stmt = conn.as_mut().prepare(query.into_sql_str()).await?;
219+
let query = crate::query::query_statement(&stmt);
220+
221+
// We're wrapping the connection in a `PgAdvisoryLockGuard` early here on purpose. If this
222+
// future is dropped, the lock will be released in the drop impl.
223+
let mut guard = PgAdvisoryLockGuard::new(self.clone(), conn);
224+
let conn = guard.conn.as_mut().unwrap();
225+
206226
match &self.key {
207-
PgAdvisoryLockKey::BigInt(key) => {
208-
crate::query::query("SELECT pg_advisory_lock($1)")
209-
.bind(key)
210-
.execute(conn.as_mut())
211-
.await?;
212-
}
213-
PgAdvisoryLockKey::IntPair(key1, key2) => {
214-
crate::query::query("SELECT pg_advisory_lock($1, $2)")
215-
.bind(key1)
216-
.bind(key2)
217-
.execute(conn.as_mut())
218-
.await?;
219-
}
227+
PgAdvisoryLockKey::BigInt(key) => query.bind(key),
228+
PgAdvisoryLockKey::IntPair(key1, key2) => query.bind(key1).bind(key2),
220229
}
230+
.execute(conn.as_mut())
231+
.await?;
221232

222-
Ok(PgAdvisoryLockGuard::new(self.clone(), conn))
233+
Ok(guard)
223234
}
224235

225236
/// Acquires an exclusive lock using `pg_try_advisory_lock()`, returning immediately
@@ -242,6 +253,12 @@ impl PgAdvisoryLock {
242253
/// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
243254
///
244255
/// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
256+
///
257+
/// # Cancel Safety
258+
///
259+
/// This method is **not** cancel safe. If the future is dropped while the query is in-flight,
260+
/// it is not possible to know whether the lock was acquired, so it cannot be safely released.
261+
/// The lock may remain held until the connection is closed.
245262
pub async fn try_acquire<C: AsMut<PgConnection>>(
246263
&self,
247264
mut conn: C,

0 commit comments

Comments
 (0)