Skip to content

Commit 41d2d51

Browse files
committed
Add mechanisms to suspend connection replacement during transactions.
1 parent 7d7bd59 commit 41d2d51

12 files changed

Lines changed: 555 additions & 74 deletions

DBUtils/PersistentDB.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99
Robustness is provided by using "hardened" SteadyDB connections.
1010
Even if the underlying database is restarted and all connections
1111
are lost, they will be automatically and transparently reopened.
12+
However, since you don't want this to happen in the middle of a database
13+
transaction, you must explicitly start transactions with the begin()
14+
method so that SteadyDB knows that the underlying connection shall not
15+
be replaced and errors passed on until the transaction is completed.
1216
1317
Measures are taken to make the database connections thread-affine.
1418
This means the same thread always uses the same cached connection,
@@ -78,9 +82,14 @@
7882
the connection will be automatically closed when the thread dies.
7983
You can change this behavior be setting the closeable parameter.
8084
81-
Note that by setting the threadlocal parameter to threading.local,
82-
getting connections may become a bit faster, but this may not work in
83-
all environments (for instance, mod_wsgi is known to cause problems
85+
Note that you need to explicitly start transactions by calling the
86+
begin() method. This ensures that the transparent reopening will be
87+
suspended until the end of the transaction, and that the connection
88+
will be rolled back before being reused by the same thread.
89+
90+
By setting the threadlocal parameter to threading.local, getting
91+
connections may become a bit faster, but this may not work in all
92+
environments (for instance, mod_wsgi is known to cause problems
8493
since it clears the threading.local data between requests).
8594
8695

DBUtils/PersistentPg.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@
99
Robustness is provided by using "hardened" SteadyPg connections.
1010
Even if the underlying database is restarted and all connections
1111
are lost, they will be automatically and transparently reopened.
12+
However, since you don't want this to happen in the middle of a database
13+
transaction, you must explicitly start transactions with the begin()
14+
method so that SteadyPg knows that the underlying connection shall not
15+
be replaced and errors passed on until the transaction is completed.
1216
1317
Measures are taken to make the database connections thread-affine.
1418
This means the same thread always uses the same cached connection,
@@ -68,9 +72,15 @@
6872
the connection will be automatically closed when the thread dies.
6973
You can change this behavior be setting the closeable parameter.
7074
71-
Note that by setting the threadlocal parameter to threading.local,
72-
getting connections may become a bit faster, but this may not work in
73-
all environments (for instance, mod_wsgi is known to cause problems
75+
Note that you need to explicitly start transactions by calling the
76+
begin() method. This ensures that the transparent reopening will be
77+
suspended until the end of the transaction, and that the connection
78+
will be rolled back before being reused in the same thread. To end
79+
transactions, use on of the end(), commit() or rollback() methods.
80+
81+
By setting the threadlocal parameter to threading.local, getting
82+
connections may become a bit faster, but this may not work in all
83+
environments (for instance, mod_wsgi is known to cause problems
7484
since it clears the threading.local data between requests).
7585
7686

DBUtils/PooledDB.py

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
Robustness is provided by using "hardened" SteadyDB connections.
1111
Even if the underlying database is restarted and all connections
1212
are lost, they will be automatically and transparently reopened.
13+
However, since you don't want this to happen in the middle of a database
14+
transaction, you must explicitly start transactions with the begin()
15+
method so that SteadyDB knows that the underlying connection shall not
16+
be replaced and errors passed on until the transaction is completed.
1317
1418
Measures are taken to make the pool of connections thread-safe.
1519
If the underlying DB-API module is thread-safe at the connection level,
@@ -48,6 +52,9 @@
4852
the connection is automatically reset (closed and reopened).
4953
setsession: an optional list of SQL commands that may serve to
5054
prepare the session, e.g. ["set datestyle to german", ...]
55+
reset: how connections should be reset when returned to the pool
56+
(False or None to rollback transcations started with begin(),
57+
the default value True always issues a rollback for safety's sake)
5158
failures: an optional exception class or a tuple of exception classes
5259
for which the connection failover mechanism shall be applied,
5360
if the default (OperationalError, InternalError) is not adequate
@@ -107,6 +114,12 @@
107114
cur.close() # or del cur
108115
db.close() # or del db
109116
117+
Note that you need to explicitly start transactions by calling the
118+
begin() method. This ensures that the connection will not be shared
119+
with other threads, that the transparent reopening will be suspended
120+
until the end of the transaction, and that the connection will be rolled
121+
back before being given back to the connection pool.
122+
110123
111124
Ideas for improvement:
112125
@@ -162,7 +175,7 @@ class PooledDB:
162175
def __init__(self, creator,
163176
mincached=0, maxcached=0,
164177
maxshared=0, maxconnections=0, blocking=False,
165-
maxusage=None, setsession=None,
178+
maxusage=None, setsession=None, reset=True,
166179
failures=None, ping=1,
167180
*args, **kwargs):
168181
"""Set up the DB-API 2 connection pool.
@@ -188,6 +201,9 @@ def __init__(self, creator,
188201
the connection is automatically reset (closed and reopened).
189202
setsession: optional list of SQL commands that may serve to prepare
190203
the session, e.g. ["set datestyle to ...", "set time zone ..."]
204+
reset: how connections should be reset when returned to the pool
205+
(False or None to rollback transcations started with begin(),
206+
True to always issue a rollback for safety's sake)
191207
failures: an optional exception class or a tuple of exception classes
192208
for which the connection failover mechanism shall be applied,
193209
if the default (OperationalError, InternalError) is not adequate
@@ -215,6 +231,7 @@ def __init__(self, creator,
215231
self._args, self._kwargs = args, kwargs
216232
self._maxusage = maxusage
217233
self._setsession = setsession
234+
self._reset = reset
218235
self._failures = failures
219236
self._ping = ping
220237
if mincached is None:
@@ -287,6 +304,12 @@ def connection(self, shareable=True):
287304
else: # shared cache full or no more connections allowed
288305
self._shared_cache.sort() # least shared connection first
289306
con = self._shared_cache.pop(0) # get it
307+
while con.con._transaction:
308+
# do not share connections which are in a transaction
309+
self._shared_cache.insert(0, con)
310+
self._condition.wait()
311+
self._shared_cache.sort()
312+
con = self._shared_cache.pop(0)
290313
con.con._ping_check() # check the underlying connection
291314
con.share() # increase share of this connection
292315
# put the connection (back) into the shared cache
@@ -339,14 +362,8 @@ def cache(self, con):
339362
self._condition.acquire()
340363
try:
341364
if not self._maxcached or len(self._idle_cache) < self._maxcached:
342-
# the idle cache is not full, so put it there, but
343-
try: # before returning the connection back to the pool,
344-
con.rollback() # perform a rollback
345-
# in order to prevent uncommited actions from being
346-
# unintentionally commited by some other thread
347-
except Exception:
348-
# if an error occurs (no transaction, not supported)
349-
pass # then it will be silently ignored
365+
con._reset(force=self._reset) # rollback possible transaction
366+
# the idle cache is not full, so put it there
350367
self._idle_cache.append(con) # append it to the idle cache
351368
else: # if the idle cache is already full,
352369
con.close() # then close the connection
@@ -440,9 +457,30 @@ def __init__(self, con):
440457
self.con = con
441458
self.shared = 1
442459

443-
def __cmp__(self, other):
444-
"""Compare how often the connections are shared."""
445-
return self.shared - other.shared
460+
def __lt__(self, other):
461+
if self.con._transaction == other.con._transaction:
462+
return self.shared < other.shared
463+
else:
464+
return not self.con._transaction
465+
466+
def __le__(self, other):
467+
if self.con._transaction == other.con._transaction:
468+
return self.shared == other.shared
469+
else:
470+
return not self.con._transaction
471+
472+
def __eq__(self, other):
473+
return (self.con._transaction == other.con._transaction
474+
and self.shared == other.shared)
475+
476+
def __ne__(self, other):
477+
return not self.__eq__(other)
478+
479+
def __gt__(self, other):
480+
return other.__lt__(self)
481+
482+
def __ge__(self, other):
483+
return other.__le__(self)
446484

447485
def share(self):
448486
"""Increase the share of this connection."""

DBUtils/PooledPg.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010
Robustness is provided by using "hardened" SteadyPg connections.
1111
Even if the underlying database is restarted and all connections
1212
are lost, they will be automatically and transparently reopened.
13+
However, since you don't want this to happen in the middle of a database
14+
transaction, you must explicitly start transactions with the begin()
15+
method so that SteadyPg knows that the underlying connection shall not
16+
be replaced and errors passed on until the transaction is completed.
1317
1418
Measures are taken to make the pool of connections thread-safe
1519
regardless of the fact that the classic PyGreSQL pg module itself
@@ -79,6 +83,12 @@
7983
res = db.query(...).getresult()
8084
db.close() # or del db
8185
86+
Note that you need to explicitly start transactions by calling the
87+
begin() method. This ensures that the transparent reopening will be
88+
suspended until the end of the transaction, and that the connection will
89+
be rolled back before being given back to the connection pool. To end
90+
transactions, use on of the end(), commit() or rollback() methods.
91+
8292
8393
Ideas for improvement:
8494
@@ -131,7 +141,7 @@ class PooledPg:
131141
def __init__(self,
132142
mincached=0, maxcached=0,
133143
maxconnections=0, blocking=False,
134-
maxusage=None, setsession=None,
144+
maxusage=None, setsession=None, reset=None,
135145
*args, **kwargs):
136146
"""Set up the PostgreSQL connection pool.
137147
@@ -150,13 +160,17 @@ def __init__(self,
150160
the connection is automatically reset (closed and reopened).
151161
setsession: optional list of SQL commands that may serve to prepare
152162
the session, e.g. ["set datestyle to ...", "set time zone ..."]
163+
reset: how connections should be reset when returned to the pool
164+
(0 or None to rollback transcations started with begin(),
165+
1 to always issue a rollback, 2 for a complete reset)
153166
args, kwargs: the parameters that shall be used to establish
154167
the PostgreSQL connections using class PyGreSQL pg.DB()
155168
156169
"""
157170
self._args, self._kwargs = args, kwargs
158171
self._maxusage = maxusage
159172
self._setsession = setsession
173+
self._reset = reset or 0
160174
if mincached is None:
161175
mincached = 0
162176
if maxcached is None:
@@ -200,7 +214,15 @@ def connection(self):
200214
def cache(self, con):
201215
"""Put a connection back into the pool cache."""
202216
try:
203-
self._cache.put(con, 0)
217+
if self._reset == 2:
218+
con.reset() # reset the connection completely
219+
else:
220+
if self._reset or con._transaction:
221+
try:
222+
con.rollback() # rollback a possible transaction
223+
except Exception:
224+
pass
225+
self._cache.put(con, 0) # and then put it back into the cache
204226
except Full:
205227
con.close()
206228
if self._connections:
@@ -255,7 +277,7 @@ def reopen(self):
255277
"""Reopen the pooled connection."""
256278
# If the connection is already back in the pool,
257279
# get another connection from the pool,
258-
# otherwise reopen the unerlying connection.
280+
# otherwise reopen the underlying connection.
259281
if self._con:
260282
self._con.reopen()
261283
else:

0 commit comments

Comments
 (0)