Files

747 lines
24 KiB
Python

"""
@package pmsco.database.orm
pmsco results database object-relational mapper
this module declares the database schema and object mapping.
the object-relational mapping uses
the [sqlalchemy framework](https://docs.sqlalchemy.org/en/13/orm/tutorial.html).
the database backend is sqlite3.
for examples of how to use the database, see the ingest module and the unit tests.
@author Matthias Muntwiler
@copyright (c) 2021 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
import datetime
from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import Column, Sequence, ForeignKey
from sqlalchemy import Boolean, Integer, Float, String, DateTime
from sqlalchemy.engine import Engine
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import object_session
from sqlalchemy.orm import relationship
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import validates
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.exc import NoResultFound
import numpy as np
import sqlite3
from pmsco.dispatch import CalcID
import pmsco.database.util as db_util
# teach the sqlite3 driver to store numpy scalars as plain python numbers
for _numpy_type in (np.float64, np.float32):
    sqlite3.register_adapter(_numpy_type, float)
for _numpy_type in (np.int64, np.int32):
    sqlite3.register_adapter(_numpy_type, int)
del _numpy_type
# declarative base class from which all mapped classes of this module derive
Base = declarative_base()
# global database engine. None until connect() creates it.
engine = None
# global session factory. connect() binds it to the engine.
Session = sessionmaker()
class Project(Base):
    """
    database object representing a project

    @note there is an implicit constructor with keyword arguments that correspond to the attributes.
    """

    ## @var id
    #  (int, primary key) database id of the project

    ## @var name
    #  project name, should be short, must be unique in the database.
    #  the comparison is case-insensitive (NOCASE collation).

    ## @var code
    #  (str, optional) case-insensitive string of up to 50 characters.
    #  NOTE(review): presumably identifies the project code or module - confirm.

    ## @var jobs
    #  collection of related jobs
    #
    #  defines the relationship between Project and Job objects.
    #  the instance attribute maps job names (str) to Job objects.
    #  the relationship also adds a `project` back-reference to Job.

    __tablename__ = "Projects"
    id = Column(Integer, Sequence('project_id_seq'), primary_key=True)
    name = Column(String(50, collation='NOCASE'), nullable=False, unique=True)
    code = Column(String(50, collation='NOCASE'))
    jobs = relationship('Job', backref='project',
                        collection_class=attribute_mapped_collection('name'),
                        cascade="all, delete, delete-orphan", lazy='joined')

    def __repr__(self):
        """
        constructor-like string representation: Project('name', 'code').
        """
        # each value needs its own replacement field.
        # a single field holding both would render as a nested tuple.
        return f'Project({self.name!r}, {self.code!r})'
class Job(Base):
    """
    database object representing a calculation job

    a job stores descriptive values of one calculation job and refers to the project it belongs to.

    tags are key-value pairs that classify the job in standardized terms,
    consistently across jobs and projects.
    for instance, they can hold special project arguments that distinguish
    calculations in different stages or contexts.

    mapping and proxy attributes simplify the use of tags and models
    so that Tag and JobTag objects need not be created explicitly.

    @attention after modifying the mapped collections job_tags, tags or models
        call flush() or commit() on the session
        before accessing those mappings in other objects,
        otherwise integrity errors may occur!
    """

    ## @var id
    #  (int, primary key) database id of the job

    ## @var project_id
    #  (int, foreign key) database id of the related project

    ## @var name
    #  job name, should be short, must be unique within a project

    ## @var mode
    #  pmsco calculation mode

    ## @var machine
    #  name of the computing facility

    ## @var git_hash
    #  git hash of the used code if under version control

    ## @var datetime
    #  start date and time of the job, stored as a string in ISO format (yyyy-mm-dd hh:mm:ss)

    ## @var processes
    #  number of processes

    ## @var hours
    #  job run time (wall time) in hours

    ## @var description
    #  free text, up to the user

    ## @var job_tags
    #  collection of related job tags
    #
    #  defines the relationship between Job and JobTag objects.
    #  the instance attribute maps tag keys (str) to JobTag objects.

    ## @var tags
    #  collection of tags
    #
    #  maps tag keys (str) to tag values (str).
    #  this is an association proxy of job_tags.

    ## @var models
    #  collection of related models
    #
    #  defines the relationship between Job and Model objects.
    #  the instance attribute maps model numbers to Model objects.

    __tablename__ = "Jobs"
    id = Column(Integer, Sequence('job_id_seq'), primary_key=True)
    project_id = Column(Integer, ForeignKey('Projects.id'), index=True)
    name = Column(String(50, collation='NOCASE'), nullable=False)
    mode = Column(String(20, collation='NOCASE'))
    machine = Column(String(50, collation='NOCASE'))
    git_hash = Column(String(50, collation='NOCASE'))
    datetime = Column(String(50))
    processes = Column(Integer)
    hours = Column(Float)
    description = Column(String(200, collation='NOCASE'))

    job_tags = relationship(
        'JobTag',
        back_populates='job',
        collection_class=attribute_mapped_collection('tag_key'),
        cascade="all, delete, delete-orphan",
    )
    # dictionary view tag_key -> tag_value on top of job_tags
    tags = association_proxy(
        'job_tags',
        'value',
        creator=lambda tag_key, tag_value: JobTag(key=tag_key, value=tag_value),
    )
    models = relationship(
        'Model',
        back_populates='job',
        collection_class=attribute_mapped_collection('model'),
        cascade="all, delete, delete-orphan",
    )

    def __repr__(self):
        """
        string representation: Job('project name', 'job name', 'mode').

        a part that cannot be resolved (e.g. the job has no project yet) is shown as None.
        """
        def repr_or_none(getter):
            try:
                return repr(getter())
            except AttributeError:
                return None

        project_part = repr_or_none(lambda: self.project.name)
        name_part = repr_or_none(lambda: self.name)
        return f'Job({project_part}, {name_part}, {self.mode!r})'
class Tag(Base):
    """
    database object representing a tag name

    a tag name is stored once and shared by all job tags that use it.
    """

    ## @var id
    #  (int, primary key) database id of the tag name

    ## @var key
    #  tag name/key, should be short, must be unique

    ## @var tag_jobs
    #  collection of related JobTag objects
    #
    #  defines the relationship between Tag and JobTag objects.

    __tablename__ = "Tags"
    id = Column(Integer, Sequence('tag_id_seq'), primary_key=True)
    key = Column(String(20, collation='NOCASE'), nullable=False, unique=True)
    tag_jobs = relationship(
        'JobTag',
        back_populates='tag',
        cascade="all, delete, delete-orphan",
    )

    def __init__(self, key):
        """
        @param key: (str) tag name
        """
        self.key = key

    def __repr__(self):
        return f'Tag({self.key!r})'
class JobTag(Base):
    """
    association object class for job tags

    Job - Tag is a many-to-many relationship built using this association class.
    by using the dictionary-like Job.tags proxy, explicit creation of association objects can be avoided.

    the class applies the
    [UniqueObjectValidateOnPending pattern](https://github.com/sqlalchemy/sqlalchemy/wiki/UniqueObjectValidatedOnPending)
    to look up existing tags in the database when a Tag object is needed and only the key is given.
    """

    ## @var id
    #  (int, primary key) database id of the job tag

    ## @var tag_id
    #  (int, foreign key) database id of the related tag name

    ## @var job_id
    #  (int, foreign key) database id of the related job

    ## @var value
    #  value (str) of the job tag

    ## @var tag
    #  associated Tag object
    #
    #  defines the relationship between JobTag and Tag objects

    ## @var job
    #  associated Job object
    #
    #  defines the relationship between JobTag and Job objects

    ## @var tag_key
    #  key (name) of the associated Tag object
    #
    #  this is an association proxy that provides direct access to tag.key
    #  or links to or creates a Tag object behind the scenes.

    __tablename__ = "JobTags"
    id = Column(Integer, Sequence('jobtag_id_seq'), primary_key=True)
    tag_id = Column(Integer, ForeignKey('Tags.id'), index=True)
    job_id = Column(Integer, ForeignKey('Jobs.id'), index=True)
    value = Column(String(200, collation='NOCASE'))
    tag = relationship("Tag", back_populates="tag_jobs")
    job = relationship("Job", back_populates="job_tags")
    tag_key = association_proxy("tag", "key")

    def __init__(self, key=None, value=None):
        """
        @param key: (str) tag name. if given, it is assigned to the tag_key proxy.
        @param value: (str) tag value.
        """
        if key is not None:
            self.tag_key = key
        self.value = value

    @validates("tag")
    def _validate_tag(self, key, value):
        """
        receive the event that occurs when `jobtag.tag` is set.

        if the object is present in a Session, then make sure it's the Tag
        object that we looked up from the database.

        otherwise, do nothing and we'll fix it later when the object is
        put into a Session.

        @param key: attribute name, i.e., 'tag'
        @param value: the Tag object being assigned, or None if the link is cleared.
        @return the Tag object to store in the attribute (or None).
        """
        # clearing the relationship assigns None.
        # there is nothing to look up in that case, and _setup_tag would fail on None.key.
        if value is None:
            return value
        sess = object_session(self)
        if sess is None:
            return value
        return _setup_tag(sess, value)
@event.listens_for(Session, "transient_to_pending")
def _validate_tag(session, object_):
    """
    session hook that enforces a unique Tag for a JobTag entering the session.

    the hook acts only on JobTag objects that refer to a Tag without a database id.
    if _setup_tag returns a different Tag object,
    the JobTag is re-linked to it and the provisional Tag is expunged from the session.

    @param session: session to which the object is being attached
    @param object_: object that changes from transient to pending state
    """
    if not isinstance(object_, JobTag):
        return
    provisional = object_.tag
    if provisional is None or provisional.id is not None:
        return
    resolved = _setup_tag(session, provisional)
    if resolved is provisional:
        return
    if provisional in session:
        session.expunge(provisional)
    object_.tag = resolved
def _setup_tag(session, tag_object):
    """
    resolve a Tag object against the database.

    queries the session for a Tag with the same key as the given object.
    autoflush is switched off during the query.

    @param session: session used for the lookup
    @param tag_object: Tag object whose key is looked up
    @return the Tag found by the query, or tag_object itself if there is no match.
    """
    with session.no_autoflush:
        stored = session.query(Tag).filter_by(key=tag_object.key).one_or_none()
    if stored is None:
        return tag_object
    return stored
class Model(Base):
    """
    database object representing a model

    a model stores the model number (unique only within a single job),
    the diagnostic generation and particle numbers, and a reference to the job that uses the model.

    relationship properties simplify access to referenced objects.
    parameter values, for example, are available through the values['param_key'] mapping proxy.

    examples:
    ~~~~~~{.py}
    model = Model(model=10, gen=5, particle=2)
    model.job = job1_object

    model.values['dA'] = 25.6
    model.deltas['dA'] = 0.1

    pv = ParamValue(value=39.0, delta=-0.3)
    model.param_values['dB'] = pv

    result = Result(calc_id=calc_id, rfac=0.77)
    model.results.append(result)
    ~~~~~~

    @attention after modifying the mapped collections param_values, values or deltas,
        call flush() or commit() on the session
        before accessing those mappings in another model,
        otherwise integrity errors may occur!
    """

    ## @var id
    #  (int, primary key) database id of the model

    ## @var job_id
    #  (int, foreign key) database id of the related job

    ## @var model
    #  (int) model number as used in the task index of pmsco
    #
    #  @note the model number is not unique in the database as multiple jobs can produce same task indices.
    #  the unique number, self.id is not used in pmsco code.

    ## @var gen
    #  (int) generation number assigned by some optimizers. defaults to None.

    ## @var particle
    #  (int) particle number assigned by some optimizers. defaults to None.

    ## @var job
    #  associated Job
    #
    #  defines the relationship between Model and Job objects.

    ## @var results
    #  collection of Result objects
    #
    #  defines the relationship between Model and Result objects.

    ## @var param_values
    #  collection of ParamValue objects
    #
    #  defines the relationship between Model and ParamValue objects.
    #  the instance attribute maps parameter keys to ParamValue objects.

    ## @var values
    #  collection of parameter values
    #
    #  this is an association proxy that maps parameter keys to parameter values (ParamValue.value).
    #  ParamValue objects are accessed and created behind the scenes.

    ## @var deltas
    #  collection of delta values
    #
    #  this is an association proxy that maps parameter keys to parameter deltas (ParamValue.delta).
    #  ParamValue objects are accessed and created behind the scenes.

    __tablename__ = "Models"
    id = Column(Integer, Sequence('model_id_seq'), primary_key=True)
    job_id = Column(Integer, ForeignKey('Jobs.id'), index=True)
    model = Column(Integer, index=True)
    gen = Column(Integer)
    particle = Column(Integer)

    job = relationship("Job", back_populates="models")
    results = relationship(
        'Result',
        back_populates='model',
        cascade="all, delete, delete-orphan",
    )
    # dictionary param_key -> ParamValue object
    param_values = relationship(
        'ParamValue',
        back_populates='model',
        collection_class=attribute_mapped_collection('param_key'),
        cascade="all, delete, delete-orphan",
    )
    # dictionary views param_key -> value and param_key -> delta on top of param_values
    values = association_proxy(
        'param_values',
        'value',
        creator=lambda param_key, param_value: ParamValue(key=param_key, value=param_value),
    )
    deltas = association_proxy(
        'param_values',
        'delta',
        creator=lambda param_key, param_delta: ParamValue(key=param_key, delta=param_delta),
    )

    def __repr__(self):
        return f'Model(id={self.id!r}, job_id={self.job_id!r}, model={self.model!r})'

    def as_dict(self):
        """
        export the object properties as a dictionary.

        the dictionary keys correspond to the column names of numpy arrays.
        the mapping db_field -> column name is declared in pmsco.database.util.DB_SPECIAL_PARAMS.
        fields that this object does not have are skipped.
        the database id is stored under '_db_model_id'.

        @return: (dict)
        """
        missing = object()
        exported = {'_db_model_id': self.id}
        for field_name, column_name in db_util.DB_SPECIAL_PARAMS.items():
            field_value = getattr(self, field_name, missing)
            if field_value is not missing:
                exported[column_name] = field_value
        return exported
class Result(Base):
    """
    database object representing a calculation result

    the result object holds the calculated R-factor per job and calculation index.

    the calculation index (CalcID) is not unique in the database because it may contain results from multiple jobs.
    thus, the object links to a Model object which is unique.
    the calc_id property can be used to reconstruct a CalcID.
    """

    ## @var id
    #  (int, primary key) database id of the result

    ## @var model_id
    #  (int, foreign key) database id of the related model

    ## @var model
    #  associated Model object
    #
    #  defines the relationship between Result and Model objects.
    #
    #  @attention do not confuse the Result.model and Model.model attributes of same name!
    #  to obtain the model number to which a result belongs, use Result.model.model.

    ## @var scan
    #  (int) scan index as used in the calculations

    ## @var domain
    #  (int) domain index as used in the calculations

    ## @var emit
    #  (int) emitter index as used in the calculations

    ## @var region
    #  (int) region index as used in the calculations

    ## @var rfac
    #  (float) calculated R-factor

    ## @var timestamp
    #  (float) end date and time of this calculation task
    #
    #  the float value represents seconds since jan 1, 1970 (datetime.datetime.timestamp).
    #  the datetime proxy converts to and from python datetime.datetime.

    ## @var datetime
    #  (datetime.datetime) end date and time of this calculation task
    #
    #  this is a conversion proxy for timestamp.
    #  it is None if no timestamp is set.

    ## @var secs
    #  (float) total duration of the calculation task in seconds
    #
    #  total cpu time necessary to get this result (including child tasks) in seconds.

    ## @var calc_id
    #  (CalcID) calculation task index
    #
    #  conversion proxy for the task index components.
    #
    #  on assignment, the scan, domain, emit and region attributes are updated.
    #  it does not update the model index as it is not stored by this object!
    #  the model index must be set separately in the linked Model object.
    #
    #  reading the property requires that the result is linked to a Model.

    __tablename__ = "Results"
    id = Column(Integer, Sequence('result_id_seq'), primary_key=True)
    model_id = Column(Integer, ForeignKey('Models.id'), index=True)
    scan = Column(Integer, index=True)
    domain = Column(Integer, index=True)
    emit = Column(Integer, index=True)
    region = Column(Integer, index=True)
    rfac = Column(Float)
    timestamp = Column(Float)
    secs = Column(Float)
    model = relationship("Model", back_populates="results")

    def __init__(self, calc_id=None, scan=None, domain=None, emit=None, region=None,
                 rfac=None, timestamp=None, secs=None):
        """
        @param calc_id: (CalcID) task index. if given, it takes precedence over
            the scan, domain, emit and region arguments.
        @param scan: (int) scan index, used if calc_id is None.
        @param domain: (int) domain index, used if calc_id is None.
        @param emit: (int) emitter index, used if calc_id is None.
        @param region: (int) region index, used if calc_id is None.
        @param rfac: (float) R-factor.
        @param timestamp: (float) end time of the task in seconds since the epoch.
        @param secs: (float) duration of the task in seconds.
        """
        if calc_id is not None:
            self.calc_id = calc_id
        else:
            self.scan = scan
            self.domain = domain
            self.emit = emit
            self.region = region
        self.rfac = rfac
        self.timestamp = timestamp
        self.secs = secs

    def __repr__(self):
        """
        string representation: Result(model_id=..., calc_id=..., rfac=...).

        calc_id is shown as None while the result is not linked to a Model.
        """
        # the calc_id property dereferences self.model, which is None for an unlinked result.
        # repr must not fail in that state (same approach as Job.__repr__).
        try:
            calc_id = repr(self.calc_id)
        except AttributeError:
            calc_id = None
        return f'Result(model_id={self.model_id!r}, calc_id={calc_id}, rfac={self.rfac!r})'

    @property
    def calc_id(self):
        return CalcID(self.model.model, self.scan, self.domain, self.emit, self.region)

    @calc_id.setter
    def calc_id(self, calc_id):
        # the model index of calc_id is deliberately ignored, it belongs to the linked Model.
        self.scan = calc_id.scan
        self.domain = calc_id.domain
        self.emit = calc_id.emit
        self.region = calc_id.region

    @property
    def datetime(self):
        # timestamp defaults to None; fromtimestamp(None) would raise a TypeError.
        if self.timestamp is None:
            return None
        return datetime.datetime.fromtimestamp(self.timestamp)

    @datetime.setter
    def datetime(self, value):
        # assigning None clears the timestamp.
        self.timestamp = None if value is None else value.timestamp()

    def as_dict(self):
        """
        object properties in a dictionary.

        the dictionary keys correspond to the column names of numpy arrays.
        the mapping db_field -> column name is declared in pmsco.database.util.DB_SPECIAL_PARAMS.
        fields that cannot be read from this object (AttributeError) are skipped.
        the database id is stored under '_db_result_id'.

        @return: (dict)
        """
        d = {'_db_result_id': self.id}
        for attr, key in db_util.DB_SPECIAL_PARAMS.items():
            try:
                d[key] = getattr(self, attr)
            except AttributeError:
                pass
        return d
class Param(Base):
    """
    database object representing a parameter

    a parameter object stores the name (key) of a calculation parameter.
    the mappings of the Model class create parameter objects implicitly,
    so explicit creation is usually not necessary.
    """

    ## @var id
    #  (int, primary key) database id of the parameter name

    ## @var key
    #  parameter name/key as used in calculations, should be very short, must be unique

    ## @var param_values
    #  collection of related ParamValue objects
    #
    #  defines the relationship between Param and ParamValue objects.

    __tablename__ = "Params"
    id = Column(Integer, Sequence('param_id_seq'), primary_key=True)
    key = Column(String(20, collation='NOCASE'), nullable=False, unique=True)
    param_values = relationship(
        'ParamValue',
        back_populates='param',
        cascade="all, delete, delete-orphan",
    )

    def __init__(self, key):
        """
        @param key: (str) parameter name
        """
        self.key = key

    def __repr__(self):
        return f'Param({self.key!r})'
class ParamValue(Base):
    """
    association object class for parameter values

    Model - Param is a many-to-many relationship built using this association class.
    by using the dictionary-like Model.values and Model.deltas proxies,
    explicit creation of association objects can be avoided.

    the class applies the
    [UniqueObjectValidateOnPending pattern](https://github.com/sqlalchemy/sqlalchemy/wiki/UniqueObjectValidatedOnPending)
    to look up existing params in the database when a Param object is needed and only the key is given.
    """

    ## @var id
    #  (int, primary key) database id of the parameter value

    ## @var param_id
    #  (int, foreign key) database id of the related parameter name

    ## @var model_id
    #  (int, foreign key) database id of the related model

    ## @var value
    #  (float) numeric value of the parameter

    ## @var delta
    #  (float) numeric delta value of the parameter (reported by some optimizers)

    ## @var param
    #  associated Param object
    #
    #  defines the relationship between ParamValue and Param objects

    ## @var model
    #  associated Model object
    #
    #  defines the relationship between ParamValue and Model objects

    ## @var param_key
    #  key (name) of the associated Param object
    #
    #  this is an association proxy that provides direct access to param.key.
    #  it accesses or creates Param objects behind the scenes.

    __tablename__ = "ParamValues"
    id = Column(Integer, Sequence('paramvalue_id_seq'), primary_key=True)
    param_id = Column(Integer, ForeignKey('Params.id'), index=True)
    model_id = Column(Integer, ForeignKey('Models.id'), index=True)
    value = Column(Float)
    delta = Column(Float)
    param = relationship("Param", back_populates="param_values")
    model = relationship("Model", back_populates="param_values")
    param_key = association_proxy('param', 'key')

    def __init__(self, model=None, param=None, key=None, value=None, delta=None):
        """
        @param model: (Model) model to link to (optional).
        @param param: (Param) parameter to link to. takes precedence over key.
        @param key: (str) parameter name, used if param is None.
        @param value: (float) parameter value.
        @param delta: (float) parameter delta.
        """
        if model is not None:
            self.model = model
        if param is not None:
            self.param = param
        elif key is not None:
            self.param_key = key
        self.value = value
        self.delta = delta

    @validates("param")
    def _validate_param(self, key, value):
        """
        receive the event that occurs when `paramvalue.param` is set.

        if the object is present in a Session, then make sure it's the Param
        object that we looked up from the database.

        otherwise, do nothing and we'll fix it later when the object is put into a Session.

        @param key: attribute name, i.e., 'param'
        @param value: the Param object being assigned, or None if the link is cleared.
        @return the Param object to store in the attribute (or None).
        """
        # clearing the relationship assigns None.
        # there is nothing to look up in that case, and _setup_param would fail on None.key.
        if value is None:
            return value
        sess = object_session(self)
        if sess is None:
            return value
        return _setup_param(sess, value)
@event.listens_for(Session, "transient_to_pending")
def _validate_param(session, object_):
    """
    session hook that enforces a unique Param for a ParamValue entering the session.

    the hook acts only on ParamValue objects that refer to a Param without a database id.
    if _setup_param returns a different Param object,
    the ParamValue is re-linked to it and the provisional Param is expunged from the session.

    @param session: session to which the object is being attached
    @param object_: object that changes from transient to pending state
    """
    if not isinstance(object_, ParamValue):
        return
    provisional = object_.param
    if provisional is None or provisional.id is not None:
        return
    resolved = _setup_param(session, provisional)
    if resolved is provisional:
        return
    if provisional in session:
        session.expunge(provisional)
    object_.param = resolved
def _setup_param(session, param_object):
    """
    resolve a Param object against the database.

    queries the session for a Param with the same key as the given object.
    autoflush is switched off during the query.

    @param session: session used for the lookup
    @param param_object: Param object whose key is looked up
    @return the Param found by the query, or param_object itself if there is no match.
    """
    with session.no_autoflush:
        stored = session.query(Param).filter_by(key=param_object.key).one_or_none()
    if stored is None:
        return param_object
    return stored
@event.listens_for(Engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
    """
    set sqlite pragmas.

    make sure sqlite enforces relational integrity (foreign key constraints).
    the function is registered as a listener of the "connect" event of the Engine class.

    @param dbapi_connection: DBAPI connection on which the pragma is executed.
    @param connection_record: not used.
    @return: None
    """
    cursor = dbapi_connection.cursor()
    try:
        cursor.execute("PRAGMA foreign_keys=ON")
    finally:
        # release the cursor even if the statement fails
        cursor.close()
def sqlite_link(path=None):
    """
    build the sqlalchemy database link of an sqlite3 database.

    @param path: file path of the database.
        an empty or missing path selects an in-memory database.
    @return: (str) database link for the sqlalchemy engine.
    """
    return f'sqlite:///{path or ":memory:"}'
def connect(db_link):
    """
    open the database connection.

    creates the sqlalchemy engine, creates the tables declared in this module if necessary,
    and binds the session maker to the engine.

    the engine and the session maker are module globals.
    call this function only once in a process.

    @param db_link: (str) database link expected by the sqlalchemy engine
    @return: None
    """
    global engine
    engine = create_engine(db_link, echo=False)
    schema = Base.metadata
    schema.create_all(engine)
    Session.configure(bind=engine)