"""
|
|
@package pmsco.database.ingest
|
|
|
|
ingest existing data such as flat results files (.dat or .tasks.dat) into a database.
|
|
|
|
the results file is a space-delimited, general text file
|
|
such as produced by pmsco.optimizers.population.Population.save_array().
|
|
each line contains one result dataset, the columns correspond to the regular and special parameters.
|
|
the first row contains the parameter names.
|
|
|
|
the main function is ingest_job_results().
|
|
the other functions require an open database session from pmsco.database.access.DatabaseAccess.session(),
|
|
and ingest the metadata and the actual results, respectively.
|
|
|
|
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
|
|
|
|
@copyright (c) 2016-21 by Paul Scherrer Institut @n
|
|
Licensed under the Apache License, Version 2.0 (the "License"); @n
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
"""
|
|
|
|

import datetime
import logging
import os
import numpy as np
from pathlib import Path
from pmsco.database.access import DatabaseAccess
import pmsco.database.common as common
import pmsco.database.orm as orm
import pmsco.database.util as util

logger = logging.getLogger(__name__)
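
# illustrative sketch of the expected results file layout (the parameter columns
# 'dAB' and 'theta' are made-up examples): the first row holds the column names,
# each further row is one space-delimited result dataset.
#
#     _gen _particle _model _rfac dAB theta
#     0 0 0 0.532145 2.05 12.0
#     0 1 1 0.488012 2.10 11.5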


def insert_result(session, job, index, result, delta=None):
    """
    add or update a calculation result including index and model to the database.

    @param session: (sqlalchemy.Session) database session.
    when updating an existing model, previous changes must have been committed,
    else the model may not be found.
    this function does not commit the transaction.
    @param job: (orm.Job) job object.
    use pmsco.database.common.get_object to retrieve by id or name.
    @param index: (pmsco.dispatch.CalcID or dict)
    calculation index.
    in case of dict, the keys must be the attribute names of CalcID prefixed with an underscore, i.e.,
    '_model', '_scan', '_domain', '_emit', '_region'.
    extra values in the dictionary are ignored.
    undefined indices must be -1.
    @param result: (dict) dictionary containing the parameter values and the '_rfac' result.
    may also contain the special values '_gen', '_particle', '_timestamp'.
    '_gen' and '_particle' are integers and default to None.
    '_timestamp' can be numeric (seconds since jan 1, 1970)
    or an object that implements a timestamp function like datetime.datetime.
    it defaults to the current (local) time.
    @param delta: (dict) dictionary containing the delta values.
    the keys must correspond to model keys in the result dictionary.
    this argument is optional.

    @return: (orm.Model, orm.Result) model and result objects
    """
    model_obj = store_model(session, job, index, result)
    result_obj = store_result_data(session, model_obj, index, result)
    store_param_values(session, model_obj, result, delta)
    return model_obj, result_obj
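
# usage sketch for insert_result() under an open session (the database file, project,
# job and parameter names below are made-up examples):
#
#     dba = DatabaseAccess()
#     dba.connect('results.db')
#     with dba.session() as session:
#         job = common.get_job(session, 'my_project', 'my_job')
#         index = {'_model': 0, '_scan': -1, '_domain': -1, '_emit': -1, '_region': -1}
#         result = {'_rfac': 0.45, '_gen': 0, '_particle': 2, 'dAB': 2.05, 'theta': 12.0}
#         insert_result(session, job, index, result, delta={'dAB': 0.01, 'theta': 0.5})
#         session.commit()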


def store_model(session, job, index, result):
    """
    add or update the model entry for a calculation result in the database.

    the method updates the Models table.
    the model is identified by job and index.model.
    the result is identified by job and index.
    if the model exists in the database, it is updated.

    @param session: (sqlalchemy.Session) database session.
    when updating an existing model, previous changes must have been committed,
    else the model may not be found.
    this function does not commit the transaction.
    @param job: (orm.Job) job object.
    use pmsco.database.common.get_object to retrieve by id or name.
    @param index: (pmsco.dispatch.CalcID or dict)
    calculation index.
    in case of dict, the keys must be the attribute names of CalcID prefixed with an underscore, i.e.,
    '_model', '_scan', '_domain', '_emit', '_region'.
    extra values in the dictionary are ignored.
    undefined indices must be -1.
    @param result: (dict) dictionary containing the parameter values and the '_rfac' result.
    may also contain the special values '_gen' and '_particle'.
    '_gen' and '_particle' default to None if not present.

    @return: (orm.Model) updated model object
    """
    assert isinstance(job, orm.Job)

    model_dict = {'gen': None, 'particle': None}
    model_dict.update(util.special_params(result))
    try:
        model_dict['model'] = index.model
    except AttributeError:
        model_dict['model'] = index['_model']

    q = session.query(orm.Model)
    q = q.filter(orm.Model.job == job)
    q = q.filter(orm.Model.model == model_dict['model'])
    model_obj = q.one_or_none()

    if model_obj is None:
        model_obj = orm.Model()
        model_obj.job = job
        model_obj.model = model_dict['model']
        session.add(model_obj)

    model_obj.gen = model_dict['gen']
    model_obj.particle = model_dict['particle']

    return model_obj


def store_result_data(session, model_obj, index, result):
    """
    add or update a result in the database.

    the method updates the Results table.
    the model is identified by model_id.
    the result is identified by model_id and index.
    if the result exists in the database, it is updated.

    @param session: (sqlalchemy.Session) database session.
    when updating an existing model, previous changes must have been committed,
    else the result entry may not be found.
    this function does not commit the transaction.
    @param model_obj: (orm.Model) model object that is already part of the session.
    @param index: (pmsco.dispatch.CalcID or dict)
    calculation index.
    in case of dict, the keys must be the attribute names of CalcID prefixed with an underscore, i.e.,
    '_model', '_scan', '_domain', '_emit', '_region'.
    extra values in the dictionary are ignored.
    undefined indices must be -1.
    @param result: (dict) dictionary containing the parameter values and the '_rfac' result.
    may also contain the special values '_gen', '_particle', '_timestamp'.
    '_gen' and '_particle' are integers and default to None.
    '_timestamp' can be numeric (seconds since jan 1, 1970)
    or an object that implements a timestamp function like datetime.datetime.
    it defaults to the current (local) time.

    @return: (orm.Result) updated Results object.
    """
    assert isinstance(model_obj, orm.Model)

    result_dict = util.special_params(result)
    result_dict.update(util.special_params(index))

    q = session.query(orm.Result)
    q = q.filter(orm.Result.model == model_obj)
    q = q.filter(orm.Result.scan == result_dict['scan'])
    q = q.filter(orm.Result.domain == result_dict['domain'])
    q = q.filter(orm.Result.emit == result_dict['emit'])
    q = q.filter(orm.Result.region == result_dict['region'])

    result_obj = q.one_or_none()
    if result_obj is None:
        result_obj = orm.Result()
        result_obj.model = model_obj
        result_obj.scan = result_dict['scan']
        result_obj.domain = result_dict['domain']
        result_obj.emit = result_dict['emit']
        result_obj.region = result_dict['region']
        session.add(result_obj)

    result_obj.rfac = result_dict['rfac']
    try:
        result_obj.timestamp = result_dict['timestamp'].timestamp()
    except KeyError:
        result_obj.timestamp = datetime.datetime.now().timestamp()
    except AttributeError:
        result_obj.timestamp = result_dict['timestamp']
    try:
        result_obj.secs = result_dict['secs']
    except KeyError:
        pass

    return result_obj
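
# the '_timestamp' special value accepted by store_result_data() may be numeric or an
# object with a timestamp() method (sketch, values are made up):
#
#     result = {'_rfac': 0.5, '_timestamp': 1622548800.0}                     # seconds since jan 1, 1970
#     result = {'_rfac': 0.5, '_timestamp': datetime.datetime(2021, 6, 1)}    # converted via .timestamp()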


def store_param_values(session, model_obj, result, delta=None):
    """
    add or update parameter values of a model in the database.

    the method updates the ParamValues table.

    @param session: (sqlalchemy.Session) database session.
    when updating an existing model, previous changes must have been committed,
    else the result entry may not be found.
    this function flushes the session at the end.
    it does not commit the transaction.
    @param model_obj: (orm.Model) model object that is already part of the session.
    @param result: (dict) dictionary containing the parameter values.
    the parameter names must exist in the Params table and in the self._model_params dictionary.
    special values (with a leading underscore) are ignored.
    extra parameters may raise a KeyError.
    @param delta: (dict) dictionary containing the delta values.
    the keys must correspond to model keys in the result dictionary.
    this argument is optional.

    @return: None

    @raise: KeyError if a parameter key is not registered.
    """
    assert isinstance(model_obj, orm.Model)

    for key in util.regular_params(result).keys():
        pv = orm.ParamValue()
        pv.model = model_obj
        pv.param_key = key
        pv.value = result[key]
        try:
            pv.delta = delta[key]
        except (TypeError, KeyError):
            pass
        session.add(pv)
    session.flush()
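
# sketch of matching result and delta dictionaries for store_param_values()
# (parameter names are made-up examples; special values with a leading underscore are skipped):
#
#     result = {'dAB': 2.05, 'theta': 12.0, '_rfac': 0.45}
#     delta = {'dAB': 0.01, 'theta': 0.5}
#     store_param_values(session, model_obj, result, delta)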


def ingest_results_file(session, project, job, filename):
    """
    import a results file into the database.

    this is a sub-function used by ingest_job_results().

    a job entry with the given id must exist,
    but there must be no model entries referencing the job.
    it is not possible to update existing models, results or parameter values using this method.
    instead, you have to delete the job (which also deletes all dependent entries)
    and re-import the results.

    @param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session()
    the session is flushed but not committed at the end of this function.
    @param project: orm.Project object or project name or project id.
    @param job: orm.Job object or job name or job id.
    @param filename: path and name of the results file.

    @return: None.

    @raise ValueError if the job already has model entries.
    """
    job = common.get_job(session, project, job)
    assert isinstance(job, orm.Job)

    data = np.atleast_1d(np.genfromtxt(filename, names=True))

    try:
        unique_models, unique_index = np.unique(data['_model'], True)
    except ValueError:
        unique_models = np.array([0])
        unique_index = np.array([0])
    unique_data = data[unique_index]

    special_params = util.special_params(data.dtype.names)

    model_objs = {}
    # iterate on models
    for _data in unique_data:
        try:
            _model = _data['_model']
        except ValueError:
            _model = unique_models[0]
        model = orm.Model(job=job, model=_model)
        if 'gen' in special_params:
            model.gen = _data['_gen']
        if 'particle' in special_params:
            model.particle = _data['_particle']
        session.add(model)
        model_objs[_model] = model
        for key, value in util.regular_params(_data).items():
            model.values[key] = value
    session.flush()

    # iterate on results
    for _data in data:
        try:
            _model = _data['_model']
        except ValueError:
            _model = unique_models[0]
        result_entry = {'model': None,
                        'scan': -1,
                        'domain': -1,
                        'emit': -1,
                        'region': -1,
                        'rfac': None}
        result_entry.update(util.special_params(_data))
        result_entry['model'] = model_objs[_model]
        result = orm.Result()
        for key, value in result_entry.items():
            setattr(result, key, value)
        session.add(result)

    session.flush()
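
# usage sketch for ingest_results_file() with an open session (names are made-up examples);
# the job must have been registered before and must not have model entries yet:
#
#     with dba.session() as session:
#         ingest_results_file(session, 'my_project', 'my_job', 'my_job.tasks.dat')
#         session.commit()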


def ingest_job_metadata(session, **kwargs):
    """
    ingest job metadata

    @param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session()
    the session is flushed but not committed at the end of this function.

    @param kwargs: dictionary of function arguments.
    the dictionary contains the following values.
    all arguments are required unless noted.
    @arg 'resultsfile' (required) name of the .tasks.dat results file.
    @arg 'project' (required) unique name of the project.
    @arg 'code' (optional) name of the project code.
    @arg 'job' (required) name of the calculation job. job name must not exist for the project yet.
    @arg 'mode' (required) pmsco optimization mode.
    @arg 'machine' (optional) name of the machine where the job ran.
    @arg 'processes' (optional) number of processes.
    @arg 'hours' (optional) run time in hours (wall time).
    @arg 'git_hash' (optional) git hash of the code revision.
    @arg 'datetime' (datetime.datetime) time stamp (optional).
    if not specified, the argument defaults to the time stamp of the results file.
    hint: the constructor of a datetime object is
    `datetime.datetime(year, month, day, hour, minute, second)`.
    @arg 'description' (optional) meaningful description of the calculation job, up to the user.
    @arg 'jobtags' (dict, optional) key=value tags to be associated with the job

    @return (orm.Project, orm.Job) orm objects of the inserted records.

    @raise sqlalchemy.exc.IntegrityError if the job already exists in the database.

    """

    if 'datetime' not in kwargs:
        rf = Path(kwargs['resultsfile'])
        kwargs['datetime'] = datetime.datetime.fromtimestamp(rf.stat().st_mtime)

    project = common.register_project(session, kwargs['project'], kwargs['code'])
    job = common.register_job(session, project, kwargs['job'], **kwargs)
    try:
        common.register_job_tags(session, job, kwargs['jobtags'])
    except KeyError:
        pass

    session.flush()
    return project, job


def ingest_job_results(**kwargs):
    """
    import results from a calculation job.

    this function contains all steps necessary to import the results (tasks.dat)
    from a calculation job into a database.
    it registers the project and job, and imports the results data.
    the project may exist in the database, the job must not exist (raises an exception).

    arguments can be specified as dict (**d) or in keyword=value form.

    @param kwargs: dictionary of function arguments.
    the dictionary contains the following values.
    all arguments are required unless noted.
    @arg 'workdir' (optional) path to the working directory.
    the working directory of the operating system is changed.
    this is the root for relative paths of the database and results files.
    if not specified, the working directory is unchanged.
    @arg 'dbfile' (required) name of the database file.
    @arg 'project' (required) unique name of the project.
    @arg 'code' (optional) name of the project code.
    @arg 'job' (required) name of the calculation job. job name must not exist for the project yet.
    @arg 'mode' (required) pmsco optimization mode.
    @arg 'machine' (optional) name of the machine where the job ran.
    @arg 'processes' (optional) number of processes.
    @arg 'hours' (optional) run time in hours (wall time).
    @arg 'git_hash' (optional) git hash of the code revision.
    @arg 'datetime' (datetime.datetime) time stamp (optional).
    if not specified, the argument defaults to the time stamp of the results file.
    hint: the constructor of a datetime object is
    `datetime.datetime(year, month, day, hour, minute, second)`.
    @arg 'description' (optional) meaningful description of the calculation job, up to the user.
    @arg 'jobtags' (dict, optional) key=value tags to be associated with the job
    @arg 'resultsfile' (required) name of the .tasks.dat results file.

    @return dict with 'project_id' and 'job_id'.
    these are the database ids of the project and job records.

    @raise sqlalchemy.exc.IntegrityError if the job already exists in the database.
    """
    try:
        wd = Path(kwargs['workdir'])
    except KeyError:
        pass
    else:
        # change the working directory (Path.cwd() only reports it, it does not change it)
        os.chdir(wd)

    dba = DatabaseAccess()
    dba.connect(kwargs['dbfile'])
    with dba.session() as session:
        project, job = ingest_job_metadata(session, **kwargs)
        ingest_results_file(session, project, job, kwargs['resultsfile'])
        session.commit()
        ref = {'project_id': project.id, 'job_id': job.id}

    return ref
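

# usage sketch of the main entry point (all values below are made-up examples;
# see the docstring of ingest_job_results() for the full list of accepted keywords):
#
#     ref = ingest_job_results(dbfile='results.db',
#                              project='my_project',
#                              code='my_project.py',
#                              job='job0001',
#                              mode='swarm',
#                              machine='my_cluster',
#                              description='imported from flat file',
#                              resultsfile='job0001.tasks.dat')
#     print(ref['project_id'], ref['job_id'])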