Skip to content

Commit d18120d

Browse files
committed
WIP fix(postgres): use non-prepared statements for metadata queries (2)
1 parent 07580d5 commit d18120d

File tree

3 files changed

+99
-64
lines changed

3 files changed

+99
-64
lines changed

sqlx-postgres/src/connection/executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::io::{PortalId, StatementId};
44
use crate::logger::QueryLogger;
55
use crate::message::{
66
self, BackendMessageFormat, Bind, Close, CommandComplete, DataRow, ParameterDescription, Parse,
7-
ParseComplete, Query, RowDescription,
7+
ParseComplete, RowDescription,
88
};
99
use crate::statement::PgStatementMetadata;
1010
use crate::{

sqlx-postgres/src/connection/resolve.rs

Lines changed: 97 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@ use crate::types::Oid;
99
use crate::{HashMap, PgRow, PgValueRef, Postgres};
1010
use crate::{PgColumn, PgConnection, PgTypeInfo};
1111
use sqlx_core::column::{ColumnOrigin, TableColumn};
12-
use sqlx_core::database::Database;
1312
use sqlx_core::decode::Decode;
1413
use sqlx_core::error::BoxDynError;
1514
use sqlx_core::from_row::FromRow;
1615
use sqlx_core::raw_sql::raw_sql;
1716
use sqlx_core::row::Row;
18-
use sqlx_core::sql_str::{AssertSqlSafe, SqlSafeStr};
17+
use sqlx_core::sql_str::AssertSqlSafe;
1918
use sqlx_core::types::Type;
19+
use std::collections::VecDeque;
2020
use std::fmt::Display;
2121
use std::mem;
2222
use std::ops::ControlFlow;
@@ -358,33 +358,36 @@ WHERE rngtypid = $1
358358
) -> Result<Vec<Oid>, Error> {
359359
let mut oids = Vec::with_capacity(types.len());
360360

361-
let mut unresolved_types = types.iter();
361+
let mut unresolved_types = types.iter().peekable();
362362

363-
for ty in &mut unresolved_types {
363+
// Eagerly try to resolve types, stopping at the first unresolved type
364+
while let Some(ty) = unresolved_types.peek() {
364365
let Some(oid) = self.try_type_to_oid(ty) else {
365366
break;
366367
};
367368

368369
oids.push(oid);
370+
unresolved_types.next();
369371
}
370372

371-
// Fast-path
373+
// Fast-path: all types resolved
372374
if oids.len() == types.len() {
373375
return Ok(oids);
374376
}
375377

376378
let mut resolver = TypesResolver::default();
377379

378380
for ty in unresolved_types.clone() {
379-
if let Some(_) = self.try_type_to_oid(ty) {
381+
// Skip over subsequent types that are already resolved
382+
if self.try_type_to_oid(ty).is_some() {
380383
continue;
381384
}
382385

383386
resolver.push_type(
384387
// `escape_default()` should produce a valid SQL string literal
385388
// https://doc.rust-lang.org/stable/std/primitive.char.html#method.escape_default
386389
// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-STRINGS-ESCAPE
387-
//
390+
format_args!("'{}'", ty.name().escape_default()),
388391
// `to_regtype()` evaluates to `NULL` if the type does not exist,
389392
// instead of throwing an exception
390393
format_args!("to_regtype('{}')::oid", ty.name().escape_default()),
@@ -394,13 +397,12 @@ WHERE rngtypid = $1
394397
resolver.fill_cache(self).await?;
395398

396399
for ty in unresolved_types {
397-
let Some(oid) = self.try_type_to_oid(ty) else {
398-
return Err(Error::TypeNotFound {
399-
type_name: ty.name().to_string(),
400-
});
401-
};
402-
403-
oids.push(oid);
400+
oids.push(
401+
self.try_type_to_oid(ty)
402+
.ok_or_else(|| Error::TypeNotFound {
403+
type_name: ty.name().to_string(),
404+
})?,
405+
);
404406
}
405407

406408
Ok(oids)
@@ -414,26 +416,14 @@ WHERE rngtypid = $1
414416
match &ty.0 {
415417
PgType::DeclareWithName(name) => self.inner.cache_type_oid.get(name).copied(),
416418
PgType::DeclareArrayOf(array) => {
417-
self.inner.cache_type_oid.get(&array.elem_name).copied()
419+
let typelem = self.inner.cache_type_oid.get(&array.elem_name).copied()?;
420+
self.inner.cache_elem_type_to_array.get(&typelem).copied()
418421
}
419422
// `.try_oid()` should return `Some()` or it should be covered here
420423
_ => unreachable!("(bug) OID should be resolvable for type {ty:?}"),
421424
}
422425
}
423426

424-
pub(crate) async fn resolve_type_id(&mut self, ty: &PgType) -> Result<Oid, Error> {
425-
if let Some(oid) = ty.try_oid() {
426-
return Ok(oid);
427-
}
428-
429-
match ty {
430-
PgType::DeclareWithName(name) => self.fetch_type_id_by_name(name).await,
431-
PgType::DeclareArrayOf(array) => self.fetch_array_type_id(array).await,
432-
// `.try_oid()` should return `Some()` or it should be covered here
433-
_ => unreachable!("(bug) OID should be resolvable for type {ty:?}"),
434-
}
435-
}
436-
437427
pub(crate) async fn fetch_type_id_by_name(&mut self, name: &str) -> Result<Oid, Error> {
438428
if let Some(oid) = self.inner.cache_type_oid.get(name) {
439429
return Ok(*oid);
@@ -492,6 +482,25 @@ WHERE rngtypid = $1
492482

493483
fn try_cache_type(&mut self, ty: &TypeResolverRow) -> Result<ControlFlow<Oid>, Error> {
494484
if self.try_type_by_oid(ty.oid).is_some() {
485+
// We hit this code path because one of these names didn't resolve,
486+
// cache them both.
487+
self.inner
488+
.cache_type_oid
489+
.insert(UStr::new(&ty.catalog_name), ty.oid);
490+
self.inner
491+
.cache_type_oid
492+
.insert(UStr::new(&ty.pretty_name), ty.oid);
493+
494+
if let Some(original_name) = &ty.original_name {
495+
self.inner
496+
.cache_type_oid
497+
.insert(UStr::new(original_name), ty.oid);
498+
}
499+
500+
return Ok(ControlFlow::Continue(()));
501+
}
502+
503+
if self.inner.cache_type_info.contains_key(&ty.oid) {
495504
return Ok(ControlFlow::Continue(()));
496505
}
497506

@@ -521,6 +530,8 @@ WHERE rngtypid = $1
521530
return Ok(ControlFlow::Break(typelem));
522531
};
523532

533+
self.inner.cache_elem_type_to_array.insert(typelem, ty.oid);
534+
524535
PgTypeKind::Array(elem_type)
525536
}
526537

@@ -559,13 +570,27 @@ WHERE rngtypid = $1
559570
_ => PgTypeKind::Simple,
560571
};
561572

562-
let typname = UStr::new(&ty.typname);
573+
let typname = UStr::new(&ty.pretty_name);
563574

564575
self.inner
565576
.cache_type_oid
566577
.entry_ref(&typname)
567578
.or_insert(ty.oid);
568579

580+
if ty.pretty_name != ty.catalog_name {
581+
self.inner
582+
.cache_type_oid
583+
.entry(UStr::new(&ty.catalog_name))
584+
.or_insert(ty.oid);
585+
}
586+
587+
if let Some(original_name) = &ty.original_name {
588+
self.inner
589+
.cache_type_oid
590+
.entry(UStr::new(original_name))
591+
.or_insert(ty.oid);
592+
}
593+
569594
self.inner.cache_type_info.entry(ty.oid).or_insert_with(|| {
570595
PgTypeInfo(PgType::Custom(Arc::new(PgCustomType {
571596
kind: custom_type_kind,
@@ -584,7 +609,7 @@ struct TypesResolver {
584609
}
585610

586611
impl TypesResolver {
587-
fn push_type(&mut self, oid_expr: impl Display) {
612+
fn push_type(&mut self, original_name: impl Display, oid_expr: impl Display) {
588613
use std::fmt::Write;
589614

590615
// Lazily push the preamble to `self.query` so we don't allocate in the fast path
@@ -593,30 +618,33 @@ impl TypesResolver {
593618
write!(
594619
&mut self.query,
595620
"SELECT pg_type.oid,\n\
596-
pg_type.oid::regtype::text typname,\n\
621+
pg_type.oid::regtype::text pretty_name,\n\
622+
typname catalog_name,\n\
623+
original_name,\n\
597624
typtype,\n\
598625
typcategory,\n\
599-
typrelid,\n\
600626
typelem,\n\
601627
typbasetype,\n\
602628
rngsubtype,\n\
603-
(SELECT array_agg(enumlabel)\n\
604-
FROM pg_catalog.pg_enum\n\
605-
WHERE enumtypid = pg_type.oid\n\
606-
ORDER BY enumsortorder) enum_labels,\n\
607-
(SELECT array_agg((attname, atttypid))\n\
608-
FROM pg_catalog.pg_attribute\n\
609-
WHERE attrelid = pg_type.oid\n\
610-
AND NOT attisdropped\n\
611-
AND attnum > 0\n\
612-
ORDER BY attnum) record_attributes\n\
613-
FROM pg_catalog.pg_type\n\
614-
LEFT JOIN pg_catalog.pg_range ON pg_type.oid = pg_range.oid\n\
615-
WHERE oid IN ({oid_expr}"
629+
COALESCE(\
630+
(SELECT array_agg(enumlabel) OVER (ORDER BY enumsortorder)\n\
631+
FROM pg_catalog.pg_enum\n\
632+
WHERE enumtypid = pg_type.oid),\n\
633+
'{{}}') enum_labels,\n\
634+
COALESCE(\n\
635+
(SELECT array_agg((attname, atttypid)) FROM (SELECT *\n\
636+
FROM pg_catalog.pg_attribute\n\
637+
WHERE attrelid = pg_type.typrelid\n\
638+
AND NOT attisdropped\n\
639+
AND attnum > 0\n\
640+
ORDER BY attnum)),\n\
641+
'{{}}') record_attributes\n\
642+
FROM (SELECT DISTINCT ON(lookup_oid) original_name, lookup_oid\n\
643+
FROM (VALUES ({original_name}, {oid_expr})"
616644
)
617645
.expect("error writing type expression to query string")
618646
} else {
619-
write!(&mut self.query, ", {oid_expr}")
647+
write!(&mut self.query, ", ({original_name}, {oid_expr})")
620648
.expect("error writing type expression to query string")
621649
}
622650
}
@@ -630,7 +658,12 @@ impl TypesResolver {
630658
// * Makes this type reusable if we want to for whatever reason
631659
// * Avoids an allocation when converting to `SqlStr`
632660
let mut query = mem::take(&mut self.query);
633-
query.push(')');
661+
query.push_str(
662+
") lookup_inner(original_name, lookup_oid)\n\
663+
ORDER BY lookup_oid) type_lookup\n\
664+
INNER JOIN pg_catalog.pg_type ON type_lookup.lookup_oid = pg_type.oid\n\
665+
LEFT JOIN pg_catalog.pg_range ON pg_type.oid = pg_range.rngtypid",
666+
);
634667

635668
let types = raw_sql(AssertSqlSafe(query)).fetch_all(&mut *conn).await?;
636669

@@ -639,30 +672,28 @@ impl TypesResolver {
639672
'outer: for row in types {
640673
let mut type_row = TypeResolverRow::from_row(&row)?;
641674

642-
let mut dependent_types = existing_dependencies
643-
.remove(&type_row.oid)
644-
.unwrap_or_default();
675+
let mut resolved_dependencies = VecDeque::new();
645676

646677
loop {
647678
if let ControlFlow::Break(missing_oid) = conn.try_cache_type(&type_row)? {
648-
if !dependent_types.is_empty() {
649-
new_dependencies
650-
.entry(type_row.oid)
651-
.or_default()
652-
.extend(dependent_types);
653-
}
654-
655679
new_dependencies
656680
.entry(missing_oid)
657681
.or_default()
658682
.push(type_row);
659683

660-
self.push_type(missing_oid.0);
684+
self.push_type("NULL", missing_oid.0);
661685

662686
continue 'outer;
663687
}
664688

665-
if let Some(next_row) = dependent_types.pop() {
689+
resolved_dependencies.extend(
690+
existing_dependencies
691+
.remove(&type_row.oid)
692+
.unwrap_or_default(),
693+
);
694+
695+
// Iteratively mark existing dependencies as resolved
696+
if let Some(next_row) = resolved_dependencies.pop_back() {
666697
type_row = next_row
667698
} else {
668699
break;
@@ -687,10 +718,12 @@ impl TypesResolver {
687718
#[derive(Debug)]
688719
struct TypeResolverRow {
689720
oid: Oid,
690-
typname: String,
721+
// Most of the time, these are the same but not necessarily for arrays
722+
pretty_name: String,
723+
catalog_name: String,
724+
original_name: Option<String>,
691725
typtype: TypType,
692726
typcategory: TypCategory,
693-
typrelid: Option<Oid>,
694727
typelem: Option<Oid>,
695728
typbasetype: Option<Oid>,
696729
rngsubtype: Option<Oid>,
@@ -703,10 +736,11 @@ impl<'r> FromRow<'r, PgRow> for TypeResolverRow {
703736
fn from_row(row: &'r PgRow) -> Result<Self, Error> {
704737
Ok(Self {
705738
oid: row.try_get("oid")?,
706-
typname: row.try_get("typname")?,
739+
pretty_name: row.try_get("pretty_name")?,
740+
catalog_name: row.try_get("catalog_name")?,
741+
original_name: row.try_get("original_name")?,
707742
typtype: row.try_get("typtype")?,
708743
typcategory: row.try_get("typcategory")?,
709-
typrelid: row.try_get("typrelid")?,
710744
typelem: row.try_get("typelem")?,
711745
typbasetype: row.try_get("typbasetype")?,
712746
rngsubtype: row.try_get("rngsubtype")?,

tests/postgres/postgres.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2086,6 +2086,7 @@ async fn test_issue_3052() {
20862086
}
20872087

20882088
#[sqlx_macros::test]
2089+
#[cfg(feature = "chrono")]
20892090
async fn test_bind_iter() -> anyhow::Result<()> {
20902091
use sqlx::postgres::PgBindIterExt;
20912092
use sqlx::types::chrono::{DateTime, Utc};

0 commit comments

Comments
 (0)