@@ -190,12 +190,19 @@ def executor(state, timeout_):
190190 async def execute_baked (self , baked_query , timeout , args , one ):
191191 conn , timeout = await self ._acquire (timeout )
192192 stmt = conn .baked_queries .get (baked_query )
193- if not stmt :
194- raise RuntimeError ("query is not baked yet" )
195-
196- # work around prepared statement limit per connection acquisition
197- # https://github.com/MagicStack/asyncpg/issues/190
198- stmt ._con_release_ctr = conn ._pool_release_ctr
193+ if stmt :
194+ # work around prepared statement limit per connection acquisition
195+ # https://github.com/MagicStack/asyncpg/issues/190
196+ stmt ._con_release_ctr = conn ._pool_release_ctr
197+ else :
198+ if timeout :
199+ before = time .monotonic ()
200+ stmt = await conn .prepare (baked_query .sql , timeout = timeout )
201+ after = time .monotonic ()
202+ timeout -= after - before
203+ else :
204+ stmt = await conn .prepare (baked_query .sql )
205+ conn .baked_queries [baked_query ] = stmt
199206
200207 if one :
201208 rv = await stmt .fetchrow (* args , timeout = timeout )
@@ -218,17 +225,17 @@ def get_statusmsg(self):
218225
219226
220227class Pool (base .Pool ):
221- def __init__ (self , url , loop , bakery = None , ** kwargs ):
228+ def __init__ (self , url , loop , bakery = None , prebake = True , ** kwargs ):
222229 self ._url = url
223230 self ._loop = loop
224231 self ._kwargs = kwargs
225232 self ._pool = None
226233 self ._bakery = bakery
227234 self ._init_hook = None
235+ self ._prebake = prebake
228236
229237 async def _init (self ):
230238 args = self ._kwargs .copy ()
231- self ._init_hook = args .pop ("init" , None )
232239
233240 class Connection (args .pop ("connection_class" , asyncpg .Connection )):
234241 __slots__ = ("baked_queries" ,)
@@ -244,9 +251,11 @@ def __init__(self, *pargs, **kwargs):
244251 user = self ._url .username ,
245252 database = self ._url .database ,
246253 password = self ._url .password ,
247- init = self ._bake ,
248254 connection_class = Connection ,
249255 )
256+ if self ._prebake and self ._bakery :
257+ self ._init_hook = args .pop ("init" , None )
258+ args ["init" ] = self ._bake
250259 self ._pool = await asyncpg .create_pool (** args )
251260 return self
252261
@@ -474,10 +483,11 @@ class AsyncpgDialect(PGDialect, base.AsyncDialectMixin):
474483 cursor_cls = DBAPICursor
475484 init_kwargs = set (
476485 itertools .chain (
486+ ("bakery" , "prebake" ),
477487 * [
478488 inspect .getfullargspec (f ).kwonlydefaults .keys ()
479489 for f in [asyncpg .create_pool , asyncpg .connect ]
480- ]
490+ ],
481491 )
482492 )
483493 colspecs = util .update_copy (
@@ -490,7 +500,7 @@ class AsyncpgDialect(PGDialect, base.AsyncDialectMixin):
490500 },
491501 )
492502
493- def __init__ (self , * args , ** kwargs ):
503+ def __init__ (self , * args , bakery = None , ** kwargs ):
494504 self ._pool_kwargs = {}
495505 self ._init_hook = None
496506 for k in self .init_kwargs :
@@ -500,7 +510,7 @@ def __init__(self, *args, **kwargs):
500510 else :
501511 self ._pool_kwargs [k ] = kwargs .pop (k )
502512 super ().__init__ (* args , ** kwargs )
503- self ._init_mixin ()
513+ self ._init_mixin (bakery )
504514
505515 async def init_pool (self , url , loop , pool_class = None ):
506516 if pool_class is None :
0 commit comments