Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 68 additions & 17 deletions pychron/database/core/database_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,35 @@ def __enter__(self):
return self._session

def __exit__(self, exc_type, exc_val, exc_tb):
if self._session:
self._session.close()
else:
self._parent.close_session()

if self._psession:
self._parent.session = self._psession
self._psession = None
# Rollback on exception so the connection is returned to the pool
# in a clean state. Otherwise the next checkout can inherit a
# half-committed transaction and surface as a phantom failure
# several call sites away from the original error.
sess = self._session or self._parent.session
try:
if sess is not None and not isinstance(sess, MockSession):
if exc_type is not None:
try:
sess.rollback()
except Exception as e:
self._parent.warning("SessionCTX rollback failed: {}".format(e))
finally:
try:
if self._session is not None:
# Owned session — close directly without flushing during
# an exception unwind (flush can re-raise and mask the
# original error).
try:
self._session.close()
except Exception as e:
self._parent.warning("SessionCTX close failed: {}".format(e))
else:
self._parent.close_session(skip_flush=exc_type is not None)
finally:
if self._psession is not None:
self._parent.session = self._psession
self._psession = None
self._session = None


class MockQuery:
Expand Down Expand Up @@ -237,15 +258,31 @@ def create_session(self, force=False):
self.critical("using Mock session")
self.session = MockSession()

def close_session(self):
def close_session(self, skip_flush=False):
if self.session and not isinstance(self.session, MockSession):
self.session.flush()

self._session_cnt -= 1
if not self._session_cnt:
self.debug("close session {}".format(id(self)))
self.session.close()
self.session = None
# Decrement first in a try/finally so an exception during
# flush/close cannot leave the counter stuck above 0 and
# leak the session forever.
try:
if not skip_flush:
try:
self.session.flush()
except Exception as e:
self.warning("flush failed during close_session: {}".format(e))
try:
self.session.rollback()
except Exception:
pass
finally:
self._session_cnt -= 1
if self._session_cnt <= 0:
self._session_cnt = 0
self.debug("close session {}".format(id(self)))
try:
self.session.close()
except Exception as e:
self.warning("session.close failed: {}".format(e))
self.session = None

@property
def enabled(self) -> bool:
Expand Down Expand Up @@ -567,10 +604,17 @@ def _create_engine(self, url: str, pool_recycle: int, connect_args: dict):
if self.connection_method == "cloudsql_iam":
return self._create_cloudsql_engine(url, pool_recycle)

# pool_pre_ping issues a cheap liveness check before each
# checkout and discards stale connections (idle drops from the
# server, network blips). Skip it for SQLite — there is no
# remote peer to lose contact with and the extra round trip
# is pure overhead.
pre_ping = self.kind != "sqlite"
return create_engine(
url,
echo=self.echo,
pool_recycle=pool_recycle,
pool_pre_ping=pre_ping,
connect_args=connect_args,
)

Expand Down Expand Up @@ -603,14 +647,21 @@ def get_connection():
# windows evict warm conns faster than IAM token TTL requires
# and force repeated cold-path connects.
iam_recycle = max(pool_recycle, 1800)
# pool_pre_ping catches Cloud SQL idle-disconnects (the server
# silently drops idle conns well before our recycle window).
# Without it the first query after a quiet period fails with
# "server has gone away" / "connection reset" and the user
# sees a transient error. The pre_ping is a single cheap
# round-trip on a connection that already has a warm IAM
# tunnel, so cost is negligible compared to a full reconnect.
return create_engine(
url,
echo=self.echo,
pool_recycle=iam_recycle,
creator=get_connection,
pool_size=10,
max_overflow=10,
pool_pre_ping=False,
pool_pre_ping=True,
)

def _get_cloudsql_url(self, kind: str) -> str:
Expand Down
32 changes: 28 additions & 4 deletions pychron/dvc/dvc_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1502,22 +1502,46 @@ def get_associated_repositories(self, idn, verbose_query=False):

return self._query_all(q, verbose_query=verbose_query)

def _analysis_eager_options(self):
# Eager-load the lazy='select' chains walked by AnalysisTbl.bind()
# so callers can safely use returned rows after session close.
return (
selectinload(AnalysisTbl.measured_positions).joinedload(MeasuredPositionTbl.load),
joinedload(AnalysisTbl.irradiation_position)
.joinedload(IrradiationPositionTbl.sample)
.joinedload(SampleTbl.material),
joinedload(AnalysisTbl.irradiation_position)
.joinedload(IrradiationPositionTbl.sample)
.joinedload(SampleTbl.project)
.joinedload(ProjectTbl.principal_investigator),
joinedload(AnalysisTbl.irradiation_position)
.joinedload(IrradiationPositionTbl.level)
.joinedload(LevelTbl.irradiation),
)

def get_analysis(self, value):
return self._retrieve_item(AnalysisTbl, value, key="id")
with self.session_ctx() as sess:
q = sess.query(AnalysisTbl).options(*self._analysis_eager_options())
q = q.filter(AnalysisTbl.id == value)
return self._query_one(q)

def get_analysis_uuid(self, value):
return self._retrieve_item(AnalysisTbl, value, key="uuid")
with self.session_ctx() as sess:
q = sess.query(AnalysisTbl).options(*self._analysis_eager_options())
q = q.filter(AnalysisTbl.uuid == value)
return self._query_one(q)

def get_analyses_uuid(self, uuids, verbose_query=False):
with self.session_ctx() as sess:
q = sess.query(AnalysisTbl)
q = q.filter(AnalysisTbl.uuid.in_(uuids))
q = q.order_by(AnalysisTbl.uuid.asc())
q = q.options(*self._analysis_eager_options())
return self._query_all(q, verbose_query=verbose_query)

def get_analysis_runid(self, idn, aliquot, step=None):
with self.session_ctx() as sess:
q = sess.query(AnalysisTbl)
q = sess.query(AnalysisTbl).options(*self._analysis_eager_options())
q = q.join(IrradiationPositionTbl)
if step:
if isinstance(step, (str,)):
Expand All @@ -1532,7 +1556,7 @@ def get_analysis_runid(self, idn, aliquot, step=None):

def get_analysis_by_attr(self, **kw):
with self.session_ctx() as sess:
q = sess.query(AnalysisTbl)
q = sess.query(AnalysisTbl).options(*self._analysis_eager_options())
use_ident = False
if "identifier" in kw:
q = q.join(IrradiationPositionTbl)
Expand Down
57 changes: 16 additions & 41 deletions pychron/envisage/browser/sample_browser_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
from datetime import datetime, timedelta
from operator import attrgetter

import sqlalchemy.orm.exc

# ============= enthought library imports =======================
from apptools.preferences.preference_binding import bind_preference
from traits.api import Str
Expand Down Expand Up @@ -50,22 +48,15 @@ def __init__(self, *args, **kw):
def reattach(self):
self.debug("reattach")

try:
ans = sorted(self.table.oanalyses, key=attrgetter("uuid"))
uuids = [ai.uuid for ai in ans]
nans = self.db.get_analyses_uuid(uuids)

for ni, ai in zip(nans, ans):
ai.dbrecord = ni

if self.selected_projects:
self._load_associated_groups(self.selected_projects)
except sqlalchemy.orm.exc.DetachedInstanceError:
self.warning_dialog(
"There is an issue with pychron's connection to the database. "
"Please restart pychron and try again"
)
self.application.stop()
ans = sorted(self.table.oanalyses, key=attrgetter("uuid"))
uuids = [ai.uuid for ai in ans]
nans = self.db.get_analyses_uuid(uuids)

for ni, ai in zip(nans, ans):
ai.dbrecord = ni

if self.selected_projects:
self._load_associated_groups(self.selected_projects)

def dump_browser(self):
super(SampleBrowserModel, self).dump_browser()
Expand Down Expand Up @@ -96,9 +87,7 @@ def get_analysis_records(self):
# return self.table.get_analysis_records()
return self.table.get_analysis_records()

def get_selection(
self, low_post, high_post, unks=None, selection=None, make_records=True
):
def get_selection(self, low_post, high_post, unks=None, selection=None, make_records=True):
ret = None
if selection is None:
if self.table.selected:
Expand Down Expand Up @@ -131,9 +120,7 @@ def load_chrono_view(self):
ss = [si.labnumber for si in self.selected_samples]
bt = self.search_criteria.reference_hours_padding
if not bt:
self.information_dialog(
'Set "References Window" in Preferences.\n\nDefaulting to 2hrs'
)
self.information_dialog('Set "References Window" in Preferences.\n\nDefaulting to 2hrs')
bt = 2

# ss = ['bu-FD-O']
Expand Down Expand Up @@ -194,11 +181,7 @@ def add_analysis_group(self, ans):
project, pp = tuple({(a.project, a.principal_investigator) for a in ans})[0]
try:
project = next(
(
p
for p in projects
if p.name == project and p.principal_investigator == pp
)
(p for p in projects if p.name == project and p.principal_investigator == pp)
)
agv.project = project
except StopIteration:
Expand Down Expand Up @@ -236,9 +219,7 @@ def _selected_samples_changed_hook(self, new):

uuids = [ai.uuid for ai in self.table.analyses]

kw = dict(
limit=lim, include_invalid=not at.omit_invalid, exclude_uuids=uuids
)
kw = dict(limit=lim, include_invalid=not at.omit_invalid, exclude_uuids=uuids)

lp = self.low_post # if self.use_low_post else None
hp = self.high_post # if self.use_high_post else None
Expand All @@ -247,9 +228,7 @@ def _selected_samples_changed_hook(self, new):
if self.load_enabled and self.selected_loads:
ls = [l.name for l in self.selected_loads]

ans = self._retrieve_analyses(
samples=new, loads=ls, low_post=lp, high_post=hp, **kw
)
ans = self._retrieve_analyses(samples=new, loads=ls, low_post=lp, high_post=hp, **kw)

self.debug(
"selected samples changed. loading analyses. "
Expand All @@ -276,9 +255,7 @@ def _load_recent(self):
now = datetime.now()
lp = now - timedelta(hours=v.nhours)
ls = self.db.get_labnumbers(
mass_spectrometers=(
v.mass_spectrometers if v.use_mass_spectrometers else None
),
mass_spectrometers=(v.mass_spectrometers if v.use_mass_spectrometers else None),
analysis_types=v.analysis_types,
high_post=now,
low_post=lp,
Expand All @@ -303,9 +280,7 @@ def _find_references_hook(self):
ans = self.table.analyses
ms = list({a.mass_spectrometer.lower() for a in ans if a.mass_spectrometer})
es = list({a.extract_device.lower() for a in ans if a.extract_device})
irs = list(
{"{},{}".format(a.irradiation, a.irradiation_level.upper()) for a in ans}
)
irs = list({"{},{}".format(a.irradiation, a.irradiation_level.upper()) for a in ans})

samples = []
for il in irs:
Expand Down
Loading