""" @package pmsco.database.orm pmsco results database object-relational mapper this module declares the database schema and object mapping. the object-relational mapping uses the [sqlalchemy framework](https://docs.sqlalchemy.org/en/13/orm/tutorial.html). the database backend is sqlite3. for examples how to use the database, see the ingest module and the unit tests. @author Matthias Muntwiler @copyright (c) 2021 by Paul Scherrer Institut @n Licensed under the Apache License, Version 2.0 (the "License"); @n you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 """ import datetime from sqlalchemy import create_engine from sqlalchemy import event from sqlalchemy import Column, Sequence, ForeignKey from sqlalchemy import Boolean, Integer, Float, String, DateTime from sqlalchemy.engine import Engine from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import object_session from sqlalchemy.orm import relationship from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import validates from sqlalchemy.orm.collections import attribute_mapped_collection from sqlalchemy.orm.exc import NoResultFound import numpy as np import sqlite3 from pmsco.dispatch import CalcID import pmsco.database.util as db_util # make sure sqlite understands numpy data types sqlite3.register_adapter(np.float64, float) sqlite3.register_adapter(np.float32, float) sqlite3.register_adapter(np.int64, int) sqlite3.register_adapter(np.int32, int) Base = declarative_base() engine = None Session = sessionmaker() class Project(Base): """ database object representing a project @note there is an implicit constructor with keyword arguments that correspond to the attributes. """ ## @var id # (int, primary key) database id of the project ## @var name # project name, should be short, must be unique within a project ## @var jobs # collection of related jobs # # defines the relationship between Project and Job objects. # the instance attribute maps job names (str) to Job objects. __tablename__ = "Projects" id = Column(Integer, Sequence('project_id_seq'), primary_key=True) name = Column(String(50, collation='NOCASE'), nullable=False, unique=True) code = Column(String(50, collation='NOCASE')) jobs = relationship('Job', backref='project', collection_class=attribute_mapped_collection('name'), cascade="all, delete, delete-orphan", lazy='joined') def __repr__(self): return f'Project({repr(self.name), repr(self.code)})' class Job(Base): """ database object representing a calculation job a job object holds several descriptive values of a calculation job. it also refers to a project. tags are key-value pairs that describe the job in standardized terms. they can provide a consistent classification scheme across jobs and projects. for example, they can store special project arguments that may be important to distinguish calculations in different stages or contexts. the class also defines mapping and proxy objects that simplify the use of tags and models. explicit creation of Tag and JobTag objects is then not necessary. @attention after modifying the mapped collections job_tags, tags or models make sure to call flush() or commit() on the session before accessing those mappings in other objects else integrity errors may occur! """ ## @var id # (int, primary key) database id of the job ## @var project_id # (int, foreign key) database id of the related project ## @var name # job name, should be short, must be unique within a project ## @var mode # pmsco calculation mode ## @var machine # name of the computing facility ## @var git_hash # git hash of the used code if under version control ## @var datetime # start date and time of the job, ISO format (yyyy-mm-dd hh:mm:ss) ## @var processes # number of processes ## @var hours # job run time (wall time) in hours ## @var description # up to the user ## @var job_tags # collection of related job tags # # defines the relationship between Job and JobTag objects. # the instance attribute maps tag keys (str) to JobTag objects. ## @var tags # collection of tags # # maps tag keys (str) to tag values (str). # this is an association proxy of job_tags. ## @var models # collection of related models # # defines the relationship between Job and Model objects. # the instance attribute maps model numbers to Model objects __tablename__ = "Jobs" id = Column(Integer, Sequence('job_id_seq'), primary_key=True) project_id = Column(Integer, ForeignKey('Projects.id'), index=True) name = Column(String(50, collation='NOCASE'), nullable=False) mode = Column(String(20, collation='NOCASE')) machine = Column(String(50, collation='NOCASE')) git_hash = Column(String(50, collation='NOCASE')) datetime = Column(String(50)) processes = Column(Integer) hours = Column(Float) description = Column(String(200, collation='NOCASE')) job_tags = relationship('JobTag', back_populates='job', collection_class=attribute_mapped_collection('tag_key'), cascade="all, delete, delete-orphan") # mapping tag_key -> tag_value tags = association_proxy('job_tags', 'value', creator=lambda k, v: JobTag(key=k, value=v)) models = relationship('Model', back_populates='job', collection_class=attribute_mapped_collection('model'), cascade="all, delete, delete-orphan") def __repr__(self): try: project_name = repr(self.project.name) except AttributeError: project_name = None try: job_name = repr(self.name) except AttributeError: job_name = None return f'Job({project_name}, {job_name}, {repr(self.mode)})' class Tag(Base): """ database object representing a tag name """ ## @var id # (int, primary key) database id of the tag name ## @var key # tag name/key, should be short, must be unique ## @var tag_jobs # collection of related JobTag objects # # defines the relationship between Tag and JobTag objects. __tablename__ = "Tags" id = Column(Integer, Sequence('tag_id_seq'), primary_key=True) key = Column(String(20, collation='NOCASE'), nullable=False, unique=True) tag_jobs = relationship('JobTag', back_populates='tag', cascade="all, delete, delete-orphan") def __init__(self, key): self.key = key def __repr__(self): return f'Tag({repr(self.key)})' class JobTag(Base): """ association object class for job tags Job - Tag is a many-to-many relationship built using this association class. by using the dictionary-like Job.tags proxy, explicit creation of association objects can be avoided. the class applies the [UniqueObjectValidateOnPending pattern](https://github.com/sqlalchemy/sqlalchemy/wiki/UniqueObjectValidatedOnPending) to look up existing tags in the database when a Tag object is needed and only the key is given. """ ## @var id # (int, primary key) database id of the job tag ## @var tag_id # (int, foreign key) database id of the related tag name ## @var job_id # (int, foreign key) database id of the related job ## @var value # value (str) of the job tag ## @var tag # associated Tag object # # defines the relationship between JobTag and Tag objects ## @var job # associated Job object # # defines the relationship between JobTag and Job objects ## @var tag_key # key (name) of the asscoiated Tag object # # this is an association proxy that provides direct access to tag.key # or links to or creates a Tag object behind the scenes. __tablename__ = "JobTags" id = Column(Integer, Sequence('jobtag_id_seq'), primary_key=True) tag_id = Column(Integer, ForeignKey('Tags.id'), index=True) job_id = Column(Integer, ForeignKey('Jobs.id'), index=True) value = Column(String(200, collation='NOCASE')) tag = relationship("Tag", back_populates="tag_jobs") job = relationship("Job", back_populates="job_tags") tag_key = association_proxy("tag", "key") def __init__(self, key=None, value=None): if key is not None: self.tag_key = key self.value = value @validates("tag") def _validate_tag(self, key, value): """ receive the event that occurs when `jobtag.tag` is set. if the object is present in a Session, then make sure it's the Tag object that we looked up from the database. otherwise, do nothing and we'll fix it later when the object is put into a Session. @param key: attribute name, i.e., 'tag' @param value: a JobTag object """ sess = object_session(self) if sess is not None: return _setup_tag(sess, value) else: return value @event.listens_for(Session, "transient_to_pending") def _validate_tag(session, object_): """ receive a JobTag object when it gets attached to a Session to correct its unique Tag relationship. """ if isinstance(object_, JobTag): if object_.tag is not None and object_.tag.id is None: old_tag = object_.tag new_tag = _setup_tag(session, object_.tag) if new_tag is not old_tag: if old_tag in session: session.expunge(old_tag) object_.tag = new_tag def _setup_tag(session, tag_object): """ given a Session and a Tag object, return the correct Tag object from the database. """ with session.no_autoflush: try: return session.query(Tag).filter_by(key=tag_object.key).one() except NoResultFound: return tag_object class Model(Base): """ database object representing a model the object holds the model number (which is unique within the context of a single job only), the diagnostic generation and particle values, and refers to the job where the model is used. the class also defines relationship properties that simplify access to referenced objects. for instance, parameter values can be accessed via the values['param_key'] mapping proxy. examples: ~~~~~~{.py} model = Model(model=10, gen=5, particle=2) model.job = job1_object model.values['dA'] = 25.6 model.deltas['dA'] = 0.1 pv = ParamValue(value=39.0, delta=-0.3) model.param_values['dB'] = pv result = Result(calc_id=calc_id, rfac=0.77) model.results.append(result) ~~~~~~ @attention after modifying the mapped collections param_values, values or deltas, make sure to call flush() or commit() on the session before accessing those mappings in another model else integrity errors may occur! """ ## @var id # (int, primary key) database id of the model ## @var job_id # (int, foreign key) database id of the related job ## @var model # (int) model number as used in the task index of pmsco # # @note the model number is not unique in the database as multiple jobs can produce same task indices. # the unique number, self.id is not used in pmsco code. ## @var gen # (int) generation number assigned by some optimizers. defaults to None. ## @var particle # (int) particle number assigned by some optimizers. defaults to None. ## @var job # associated Job # # defines the relationship between Model and Job objects. ## @var results # collection of Result objects # # defines the relationship between Model and Result objects. ## @var param_values # collection of ParamValue objects # # defines the relationship between Model and ParamValue objects. # the instance attribute maps parameter keys to ParamValue objects. ## @var values # collection of parameter values # # this is an association proxy that maps parameter keys to parameter values (ParamValue.value). # ParamValue objects are accessed and created behind the scene. ## @var deltas # collection of delta values # # this is an association proxy that maps parameter keys to parameter deltas (ParamValue.delta. # ParamValue objects are accessed and created behind the scene. __tablename__ = "Models" id = Column(Integer, Sequence('model_id_seq'), primary_key=True) job_id = Column(Integer, ForeignKey('Jobs.id'), index=True) model = Column(Integer, index=True) gen = Column(Integer) particle = Column(Integer) job = relationship("Job", back_populates="models") results = relationship('Result', back_populates='model', cascade="all, delete, delete-orphan") # mapping param_key -> ParamValue object param_values = relationship('ParamValue', back_populates='model', collection_class=attribute_mapped_collection('param_key'), cascade="all, delete, delete-orphan") # mapping param_key -> param_value values = association_proxy('param_values', 'value', creator=lambda k, v: ParamValue(key=k, value=v)) deltas = association_proxy('param_values', 'delta', creator=lambda k, v: ParamValue(key=k, delta=v)) def __repr__(self): return f'Model(id={repr(self.id)}, job_id={repr(self.job_id)}, model={repr(self.model)})' def as_dict(self): """ object properties in a dictionary. the dictionary keys correspond to the column names of numpy arrays. the mapping db_field -> column name is declared in pmsco.database.util.DB_SPECIAL_PARAMS @return: (dict) """ d = {'_db_model_id': self.id} for attr, key in db_util.DB_SPECIAL_PARAMS.items(): try: d[key] = getattr(self, attr) except AttributeError: pass return d class Result(Base): """ database object representing a calculation result the result object holds the calculated R-factor per job and calculation index. the calculation index (CalcID) is not unique in the database because it may contain results from multiple jobs. thus, the object links to a Model object which is unique. the calc_id property can be used to reconstruct a CalcID. """ ## @var id # (int, primary key) database id of the result ## @var model_id # (int, foreign key) database id of the related model ## @var model # associated Model object # # defines the relationship between Result and Model objects. # # @attention do not confuse the Result.model and Model.model attributes of same name! # to obtain the model number to which a result belongs, use Result.model.model. ## @var scan # (int) scan index as used in the calculations ## @var domain # (int) domain index as used in the calculations ## @var emit # (int) emitter index as used in the calculations ## @var region # (int) region index as used in the calculations ## @var rfac # (float) calculated R-factor ## @var timestamp # (float) end date and time of this calculation task # # the float value represents seconds since jan 1, 1970 (datetime.datetime.timestamp). # the datetime proxy converts to and from python datetime.datetime. ## @var datetime # (datetime.datetime) end date and time of this calculation task # # this is a conversion proxy for timestamp. ## @var secs # (float) total duration of the calculation task in seconds # # total cpu time necessary to get this result (including child tasks) in seconds. ## @var calc_id # (CalcID) calculation task index # # conversion proxy for the task index components. # # on assignment, the scan, domain, emit and region attributes are updated. # it does not update the model index as it is not stored by this object! # the model index must be set separately in the linked Model object. __tablename__ = "Results" id = Column(Integer, Sequence('result_id_seq'), primary_key=True) model_id = Column(Integer, ForeignKey('Models.id'), index=True) scan = Column(Integer, index=True) domain = Column(Integer, index=True) emit = Column(Integer, index=True) region = Column(Integer, index=True) rfac = Column(Float) timestamp = Column(Float) secs = Column(Float) model = relationship("Model", back_populates="results") def __init__(self, calc_id=None, scan=None, domain=None, emit=None, region=None, rfac=None, timestamp=None, secs=None): if calc_id is not None: self.calc_id = calc_id else: self.scan = scan self.domain = domain self.emit = emit self.region = region self.rfac = rfac self.timestamp = timestamp self.secs = secs def __repr__(self): return f'Result(model_id={repr(self.model_id)}, calc_id={repr(self.calc_id)}, rfac={repr(self.rfac)})' @property def calc_id(self): return CalcID(self.model.model, self.scan, self.domain, self.emit, self.region) @calc_id.setter def calc_id(self, calc_id): self.scan = calc_id.scan self.domain = calc_id.domain self.emit = calc_id.emit self.region = calc_id.region @property def datetime(self): return datetime.datetime.fromtimestamp(self.timestamp) @datetime.setter def datetime(self, value): self.timestamp = value.timestamp() def as_dict(self): """ object properties in a dictionary. the dictionary keys correspond to the column names of numpy arrays. the mapping db_field -> column name is declared in pmsco.database.util.D.B_SPECIAL_PARAMS @return: (dict) """ d = {'_db_result_id': self.id} for attr, key in db_util.DB_SPECIAL_PARAMS.items(): try: d[key] = getattr(self, attr) except AttributeError: pass return d class Param(Base): """ database object representing a parameter the parameter object holds the name (or key) of a calculation parameter. explicit creation of parameter objects can be avoided by using the mappings of the Model class. """ ## @var id # (int, primary key) database id of the parameter name ## @var key # parameter name/key as used in calculations, should be very short, must be unique ## @var param_values # collection of related ParamValue objects # # defines the relationship between Param and ParamValue objects. __tablename__ = "Params" id = Column(Integer, Sequence('param_id_seq'), primary_key=True) key = Column(String(20, collation='NOCASE'), nullable=False, unique=True) param_values = relationship('ParamValue', back_populates='param', cascade="all, delete, delete-orphan") def __init__(self, key): self.key = key def __repr__(self): return f'Param({repr(self.key)})' class ParamValue(Base): """ association object class for parameter values Model - Param is a many-to-many relationship built using this association class. by using the dictionary-like Model.values and Model.deltas proxies, explicit creation of association objects can be avoided. the class applies the [UniqueObjectValidateOnPending pattern](https://github.com/sqlalchemy/sqlalchemy/wiki/UniqueObjectValidatedOnPending) to look up existing params in the database when a Param object is needed and only the key is given. """ ## @var id # (int, primary key) database id of the parameter value ## @var param_id # (int, foreign key) database id of the related parameter name ## @var model_id # (int, foreign key) database id of the related model ## @var value # (float) numeric value of the parameter ## @var delta # (float) numeric delta value of the parameter (reported by some optimizers) ## @var param # associated Param object # # defines the relationship between ParamValue and Param objects ## @var model # associated Model object # # defines the relationship between ParamValue and Model objects ## @var param_key # key (name) of the asscoiated Param object # # this is an association proxy that provides direct access to param.key. # it accesses or creates Param objects behind the scenes. __tablename__ = "ParamValues" id = Column(Integer, Sequence('paramvalue_id_seq'), primary_key=True) param_id = Column(Integer, ForeignKey('Params.id'), index=True) model_id = Column(Integer, ForeignKey('Models.id'), index=True) value = Column(Float) delta = Column(Float) param = relationship("Param", back_populates="param_values") model = relationship("Model", back_populates="param_values") param_key = association_proxy('param', 'key') def __init__(self, model=None, param=None, key=None, value=None, delta=None): if model is not None: self.model = model if param is not None: self.param = param elif key is not None: self.param_key = key self.value = value self.delta = delta @validates("param") def _validate_param(self, key, value): """ receive the event that occurs when `paramvalue.param` is set. if the object is present in a Session, then make sure it's the Param object that we looked up from the database. otherwise, do nothing and we'll fix it later when the object is put into a Session. """ sess = object_session(self) if sess is not None: return _setup_param(sess, value) else: return value @event.listens_for(Session, "transient_to_pending") def _validate_param(session, object_): """ receive a ParamValue object when it gets attached to a Session to correct its unique Param relationship. """ if isinstance(object_, ParamValue): if object_.param is not None and object_.param.id is None: old_param = object_.param new_param = _setup_param(session, object_.param) if new_param is not old_param: if old_param in session: session.expunge(old_param) object_.param = new_param def _setup_param(session, param_object): """ given a Session and a Tag object, return the correct Tag object from the database. """ with session.no_autoflush: try: return session.query(Param).filter_by(key=param_object.key).one() except NoResultFound: return param_object @event.listens_for(Engine, "connect") def set_sqlite_pragma(dbapi_connection, connection_record): """ set sqlite pragmas. make sure sqlite enforces relational integrity. @param dbapi_connection: @param connection_record: @return: """ cursor = dbapi_connection.cursor() cursor.execute("PRAGMA foreign_keys=ON") cursor.close() def sqlite_link(path=None): """ format the sqlalchemy link to an sqlite3 database. @param path: file path. if empty, an in-memory database is created. @return: (str) database link for the sqlalchemy engine. """ if not path: path = ':memory:' return f'sqlite:///{path}' def connect(db_link): """ connect to the database. create the sqlalchemy engine and bind the session maker. the database engine and session maker are global. this function should be called only once in a process. @param db_link: (str) database link expected by the sqlalchemy engine @return: None """ global engine engine = create_engine(db_link, echo=False) Base.metadata.create_all(engine) Session.configure(bind=engine)