Files
pmsco-public/pmsco/database/common.py

330 lines
13 KiB
Python

"""
@package pmsco.database.common
common database operations
this module gathers a number of common database operations.
all functions require an open session object from pmsco.database.access.DatabaseAccess.
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
@copyright (c) 2016-21 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
import logging
import sqlalchemy
import pmsco.database.orm as orm
logger = logging.getLogger(__name__)
def filter_project(query, project_or_name_or_id):
"""
filter a query by project
@param query: sqlalchemy query object
@param project_or_name_or_id: orm.Project object or project name or project id.
@return: modified query
"""
if isinstance(project_or_name_or_id, orm.Project):
query = query.filter(orm.Project == project_or_name_or_id)
elif isinstance(project_or_name_or_id, int):
query = query.filter(orm.Project.id == project_or_name_or_id)
else:
query = query.filter(orm.Project.name == project_or_name_or_id)
return query
def filter_job(query, job_or_name_or_id):
"""
filter a query by job
@param query: sqlalchemy query object
@param job_or_name_or_id: orm.Job object or job name or job id.
@return: modified query
"""
if isinstance(job_or_name_or_id, orm.Job):
query = query.filter(orm.Job == job_or_name_or_id)
elif isinstance(job_or_name_or_id, int):
query = query.filter(orm.Job.id == job_or_name_or_id)
else:
query = query.filter(orm.Job.name == job_or_name_or_id)
return query
def query_params(session, project=None, job=None):
"""
query parameter names and their associated objects from the database
the result is a dictionary of orm.Param objects mapped to their respective keys.
the parameters can be filtered by project and/or job.
if no arguments are given, parameters from all projects are returned.
@note make sure previous changes have been committed. else the query may not find all records.
@param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session()
@param project: orm.Project object or project name or project id.
default: don't filter projects.
@param job: orm.Job object or job name or job id.
default: don't filter jobs
@return: dictionary of parameters
"""
query = session.query(orm.Param).join(orm.ParamValue).join(orm.Model).join(orm.Job).join(orm.Project)
if project is not None:
query = filter_project(query, project)
if job is not None:
query = filter_job(query, job)
params = query.all()
params = {param.key: param for param in params}
return params
def query_tags(session, project=None, job=None):
"""
query tag names and their associated objects from the database
the result is a dictionary of orm.Tag objects mapped to their respective keys.
the tags can be filtered by project and/or job.
if no arguments are given, tags from all projects are returned.
@note the orm.Job.tags mapping is an alternative way to access job tags.
@note make sure previous changes have been committed. else the query may not find all records.
@param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session()
@param project: orm.Project object or project name or project id.
default: don't filter projects.
@param job: orm.Job object or job name or job id.
default: don't filter jobs
@return: dictionary of tags
"""
query = session.query(orm.Tag).join(orm.JobTag).join(orm.Job).join(orm.Project)
if project is not None:
query = filter_project(query, project)
if job is not None:
query = filter_job(query, job)
tags = query.all()
tags = {tag.key: tag for tag in tags}
return tags
def query_job_tags(session, project=None, job=None):
"""
query tags (keys and values) from the database
the result is a dictionary of tag values (str) mapped to their respective keys (str).
the tags can be filtered by project and/or job.
if no arguments are given, tags from all projects are returned.
@note for one specific job, this is equivalent to the orm.Job.tags mapping.
@note make sure previous changes have been committed. else the query may not find all records.
@param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session()
@param project: orm.Project object or project name or project id.
default: don't filter projects.
@param job: orm.Job object or job name or job id.
default: don't filter jobs
@return: tags dictionary {key: value}
"""
query = session.query(orm.JobTag).join(orm.Job).join(orm.Project)
if project is not None:
query = filter_project(query, project)
if job is not None:
query = filter_job(query, job)
job_tags = query.all()
job_tags = {jt.tag.key: jt.value for jt in job_tags}
return job_tags
def register_project(session, name, code, allow_existing=False):
"""
register (insert or query) a project with the database.
a new project record with the given parameters is inserted into the database.
if a project of the same name already exists, the existing record is returned.
@attention the orm.Project.id field is undefined until the session is committed!
it's better to identify a project by name or orm.Project object.
@note make sure previous changes have been committed. else the query may not find an existing project.
@param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session()
the session is committed if a new project entry has been added.
@param name: project name. must be unique within the database.
@param code: name of the project module.
@param allow_existing: selects the behaviour if a project record exists in the database:
return the corresponding orm.Project (True) or raise an exception (False, default).
the exception is ValueError.
@return: orm.Project object.
the object can be used and modified as long as the session is active.
note that the id attribute is invalid until the session is committed!
@raise ValueError if the job exists and allow_existing is False.
"""
query = session.query(orm.Project)
query = query.filter(orm.Project.name == name)
project = query.one_or_none()
if project is None:
project = orm.Project(name=name, code=code)
session.add(project)
session.commit()
elif not allow_existing:
raise ValueError(f"project {project.name} exists")
return project
def get_project(session, project_or_name_or_id):
"""
resolve a project by name or id.
this function resolves a project specification to an orm.Project object.
if `project_or_name_or_id` is an orm.Project object, it just returns that object without any checks.
else, the project is looked up in the database.
@attention if `project_or_name_or_id` is an orm.Project object the function returns it without checks!
that means if the object is detached, you cannot use it to query results from the database.
if you need an object that is valid and in sync with the database,
resolve it by name or id!
@param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session()
@param project_or_name_or_id: orm.Project object or project name or project id.
@return: orm.Project object
"""
if isinstance(project_or_name_or_id, orm.Project):
project = project_or_name_or_id
elif isinstance(project_or_name_or_id, int):
project = session.query(orm.Project).get(project_or_name_or_id)
else:
query = session.query(orm.Project)
query = query.filter(orm.Project.name == project_or_name_or_id)
project = query.one()
return project
def register_job(session, project, job_name, allow_existing=False, **job_attr):
"""
register (insert or query) a new job with the database.
a new job record with the given parameters is inserted into the database.
if a job of the same name exists within the given project, the existing record is returned
(without modifications!).
@note make sure previous changes have been committed. else the query may not find an existing project.
@param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session()
the session is committed if a new job entry has been added.
@param project: orm.Project object or project name or project id.
@param job_name: name of job. unique in the project
@param job_attr: optional attributes of the job.
the keywords correspond to attribute names of the pmsco.database.Job object.
@param allow_existing: selects the behaviour if a job record exists in the database:
return the corresponding orm.Job (True) or raise an exception (False, default).
the exception is ValueError.
@return: orm.Job object.
the object can be used and modified as long as the session is active.
note that the id attribute is invalid until the session is committed!
@raise ValueError if the job exists and allow_existing is False.
"""
project = get_project(session, project)
query = session.query(orm.Job).join(orm.Project)
query = query.filter(orm.Project.name == project.name)
query = query.filter(orm.Job.name == job_name)
job = query.one_or_none()
if job is None:
job = orm.Job()
job.name = job_name
job.project = project
optional_args = {'mode', 'machine', 'git_hash', 'datetime', 'processes', 'hours', 'description'}
for name, value in job_attr.items():
if name in optional_args:
setattr(job, name, value)
session.add(job)
session.commit()
elif not allow_existing:
raise ValueError(f"a job {job_name} exists in project {project.name}")
return job
def get_job(session, project_or_name_or_id, job_or_name_or_id):
"""
resolve a job by name or id.
this function resolves any combination of project and job specification to an orm.Job object.
if `job_or_name_or_id` is an orm.Job object, it just returns that object without any checks.
else, the job is looked up in the database.
@attention if `job_or_name_or_id` is an orm.Job object the function returns it without checks!
that means if the object is detached, you cannot query results from the database.
if you need an object that is valid and in sync with the database,
query the job by name or id!
@param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session()
@param project_or_name_or_id: orm.Project object or project name or project id.
@param job_or_name_or_id: orm.Job object or job name or job id.
@return: orm.Job object
"""
if isinstance(job_or_name_or_id, orm.Job):
job = job_or_name_or_id
elif isinstance(job_or_name_or_id, int):
job = session.query(orm.Job).get(job_or_name_or_id)
else:
project = get_project(session, project_or_name_or_id)
query = session.query(orm.Job).join(orm.Project)
query = query.filter(orm.Project.name == project.name)
query = query.filter(sqlalchemy.or_(orm.Job.id == job_or_name_or_id,
orm.Job.name == job_or_name_or_id))
job = query.one()
return job
def register_job_tags(session, job, tags):
"""
insert or update key-value tags of a job
this is one of many options to populate the Tag and JobTag tables.
it is not required to use this function.
@param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session()
@param job: orm.Job object
@param tags: dictionary of tags
@return: None
"""
for k, v in tags.items():
job.tags[k] = v
if tags:
session.commit()
def register_params(session, params):
"""
register (insert missing) parameter names
add new parameter names to the global list of parameter names.
this is one of many options to populate the Param table.
it is not required to use this function.
this function implies a session flush.
@param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session()
the session is committed if new parameters have been added
@param params: sequence of parameter names
param names with leading underscore are ignored.
@return: None
"""
existing_params = query_params(session).keys()
params = [param for param in params if param[0] != '_']
new_params = set(params) - set(existing_params)
for k in new_params:
session.add(orm.Param(key=k))
if new_params:
session.commit()