""" @package pmsco.database.ingest ingest existing data such as flat results files (.dat or .tasks.dat) into a database. the results file is a space-delimited, general text file such as produced by pmsco.optimizers.population.Population.save_array(). each line contains one result dataset, the columns correspond to the regular and special parameters. the first row contains the parameter names. the main function is ingest_job_results(). the other functions require an open database session from pmsco.database.access.DatabaseAccess.session(), and ingest the metadata and the actual results, respectively. @author Matthias Muntwiler, matthias.muntwiler@psi.ch @copyright (c) 2016-21 by Paul Scherrer Institut @n Licensed under the Apache License, Version 2.0 (the "License"); @n you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 """ import datetime import logging import numpy as np from pathlib import Path from pmsco.database.access import DatabaseAccess import pmsco.database.common as common import pmsco.database.orm as orm import pmsco.database.util as util logger = logging.getLogger(__name__) def insert_result(session, job, index, result, delta=None): """ add or update a calculation result including index and model to the database. @param session: (sqlalchemy.Session) database session. when updating an existing model, previous changes must have been committed, else the model may not be found. this function does not commit the transaction. @param job: (orm.Job) job object. use pmsco.database.common.get_object to retrieve by id or name. @param index: (pmsco.dispatch.CalcID or dict) calculation index. in case of dict, the keys must be the attribute names of CalcID prefixed with an underscore, i.e., '_model', '_scan', '_domain', '_emit', '_region'. extra values in the dictionary are ignored. undefined indices must be -1. @param result: (dict) dictionary containing the parameter values and the '_rfac' result. may also contain the special values '_gen', '_particle', '_timestamp'. '_gen' and '_particle' are integers and default to None. '_timestamp' can be numeric (seconds since jan 1, 1970) or an object that implements a timestamp function like datetime.datetime. it defaults to the current (local) time. @param delta: (dict) dictionary containing the delta values. the keys must correspond to model keys in the result dictionary. this argument is optional. @return: (orm.Model, orm.Result) model and result objects """ model_obj = store_model(session, job, index, result) result_obj = store_result_data(session, model_obj, index, result) store_param_values(session, model_obj, result, delta) return model_obj, result_obj def store_model(session, job, index, result): """ add or update the model entry for a calculation result in the database. the method updates the Models table. the model is identified by job and index.model. the result is identified by job and index. if the model exists in the database, it is updated. @param session: (sqlalchemy.Session) database session. when updating an existing model, previous changes must have been committed, else the model may not be found. this function does not commit the transaction. @param job: (orm.Job) job object. use pmsco.database.common.get_object to retrieve by id or name. @param index: (pmsco.dispatch.CalcID or dict) calculation index. in case of dict, the keys must be the attribute names of CalcID prefixed with an underscore, i.e., '_model', '_scan', '_domain', '_emit', '_region'. extra values in the dictionary are ignored. undefined indices must be -1. @param result: (dict) dictionary containing the parameter values and the '_rfac' result. may also contain the special values '_gen' and '_particle'. '_gen' and '_particle' default to None if not present. @return: (orm.Model) updated model object """ assert isinstance(job, orm.Job) model_dict = {'gen': None, 'particle': None} model_dict.update(util.special_params(result)) try: model_dict['model'] = index.model except AttributeError: model_dict['model'] = index['_model'] q = session.query(orm.Model) q = q.filter(orm.Model.job == job) q = q.filter(orm.Model.model == model_dict['model']) model_obj = q.one_or_none() if model_obj is None: model_obj = orm.Model() model_obj.job = job model_obj.model = model_dict['model'] session.add(model_obj) model_obj.gen = model_dict['gen'] model_obj.particle = model_dict['particle'] return model_obj def store_result_data(session, model_obj, index, result): """ add or update a result in the database. the method updates the Results table. the model is identified by model_id. the result is identified by model_id and index. if the result exists in the database, it is updated. @param session: (sqlalchemy.Session) database session. when updating an existing model, previous changes must have been committed, else the result entry may not be found. this function does not commit the transaction. @param model_obj: (orm.Model) model object that is already part of the session. @param index: (pmsco.dispatch.CalcID or dict) calculation index. in case of dict, the keys must be the attribute names of CalcID prefixed with an underscore, i.e., '_model', '_scan', '_domain', '_emit', '_region'. extra values in the dictionary are ignored. undefined indices must be -1. @param result: (dict) dictionary containing the parameter values and the '_rfac' result. may also contain the special values '_gen', '_particle', '_timestamp'. '_gen' and '_particle' are integers and default to None. '_timestamp' can be numeric (seconds since jan 1, 1970) or an object that implements a timestamp function like datetime.datetime. it defaults to the current (local) time. @return: (orm.Result) updated Results object. """ assert isinstance(model_obj, orm.Model) result_dict = util.special_params(result) result_dict.update(util.special_params(index)) q = session.query(orm.Result) q = q.filter(orm.Result.model == model_obj) q = q.filter(orm.Result.scan == result_dict['scan']) q = q.filter(orm.Result.domain == result_dict['domain']) q = q.filter(orm.Result.emit == result_dict['emit']) q = q.filter(orm.Result.region == result_dict['region']) result_obj = q.one_or_none() if result_obj is None: result_obj = orm.Result() result_obj.model = model_obj result_obj.scan = result_dict['scan'] result_obj.domain = result_dict['domain'] result_obj.emit = result_dict['emit'] result_obj.region = result_dict['region'] session.add(result_obj) result_obj.rfac = result_dict['rfac'] try: result_obj.timestamp = result_dict['timestamp'].timestamp() except KeyError: result_obj.timestamp = datetime.datetime.now().timestamp() except AttributeError: result_obj.timestamp = result_dict['timestamp'] try: result_obj.secs = result_dict['secs'] except KeyError: pass return result_obj def store_param_values(session, model_obj, result, delta=None): """ add or update parameter values of a model in the database. the method updates the ParamValues table. @param session: (sqlalchemy.Session) database session. when updating an existing model, previous changes must have been committed, else the result entry may not be found. this function flushes the session at the end. it does not commit the transaction. @param model_obj: (orm.Model) model object that is already part of the session. @param result: (dict) dictionary containing the parameter values. the parameter names must exist in the Params table and in the self._model_params dictionary. special values (with a leading underscore) are ignored. extra parameters may raise a KeyError. @param delta: (dict) dictionary containing the delta values. the keys must correspond to model keys in the result dictionary. this argument is optional. @return: None @raise: KeyError if a parameter key is not registered. """ assert isinstance(model_obj, orm.Model) for key in util.regular_params(result).keys(): pv = orm.ParamValue() pv.model = model_obj pv.param_key = key pv.value = result[key] try: pv.delta = delta[key] except (TypeError, KeyError): pass session.add(pv) session.flush() def ingest_results_file(session, project, job, filename): """ import a results file into the database. this is a sub-method used by ingest(). a job entry with the given id must exist, but there must be no model entries referencing the job. it is not possible to update existing models, results or parameter values using this method. instead, you have to delete the job (which also deletes all dependent entries) and re-import the results. @param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session() the session is flushed but not committed at the end of this function. @param project: orm.Project object or project name or project id. @param job: orm.Job object or job name or job id. @param filename: path and name of the results file. @return: None. @raise ValueError if the job already has model entries. """ job = common.get_job(session, project, job) assert isinstance(job, orm.Job) data = np.atleast_1d(np.genfromtxt(filename, names=True)) try: unique_models, unique_index = np.unique(data['_model'], True) except ValueError: unique_models = np.array([0]) unique_index = np.array([0]) unique_data = data[unique_index] special_params = util.special_params(data.dtype.names) model_objs = {} # iterate on models for _data in unique_data: try: _model = _data['_model'] except ValueError: _model = unique_models[0] model = orm.Model(job=job, model=_model) if 'gen' in special_params: model.gen = _data['_gen'] if 'particle' in special_params: model.particle = _data['_particle'] session.add(model) model_objs[_model] = model for key, value in util.regular_params(_data).items(): model.values[key] = value session.flush() # iterate on results for _data in data: try: _model = _data['_model'] except ValueError: _model = unique_models[0] result_entry = {'model': None, 'scan': -1, 'domain': -1, 'emit': -1, 'region': -1, 'rfac': None} result_entry.update(util.special_params(_data)) result_entry['model'] = model_objs[_model] result = orm.Result() for key, value in result_entry.items(): setattr(result, key, value) session.add(result) session.flush() def ingest_job_metadata(session, **kwargs): """ ingest job metadata @param session: (sqlalchemy.Session) database session created by pmsco.database.access.DatabaseAccess.session() the session is flushed but not committed at the end of this function. @param kwargs: dictionary of function arguments. the dictionary contains the following values. all arguments are required unless noted. @arg 'resultsfile' (required) name of the .tasks.dat results file. @arg 'project' (required) unique name of the project. @arg 'code' (optional) name of the project code. @arg 'job' (required) name of the calculation job. job name must not exist for the project yet. @arg 'mode' (required) pmsco optimization mode. @arg 'machine' (optional) name of the machine where the job ran. @arg 'processes' (optional) number of processes. @arg 'hours' (optional) run time in hours (wall time). @arg 'git_hash' (optional) git hash of the code revision. @arg 'datetime' (datetime.datetime) time stamp (optional). if not specified, the argument defaults to the time stamp of the results file. hint: the constructor of a datetime object is `datetime.datetime(year, month, day, hour, minute, second)`. @arg 'description' (optional) meaningful description of the calculation job, up to the user. @arg 'jobtags' (dict, optional) key=value tags to be associated with the job @return (orm.Project, orm.Job) orm objects of the inserted records. @raise sqlalchemy.exc.IntegrityError if the job already exists in the database. """ if 'datetime' not in kwargs: rf = Path(kwargs['resultsfile']) kwargs['datetime'] = datetime.datetime.fromtimestamp(rf.stat().st_mtime) project = common.register_project(session, kwargs['project'], kwargs['code']) job = common.register_job(session, project, kwargs['job'], **kwargs) try: common.register_job_tags(session, job, kwargs['jobtags']) except KeyError: pass session.flush() return project, job def ingest_job_results(**kwargs): """ import results from a calculation job. this function contains all steps necessary to import the results (tasks.dat) from a calculation job into a database. it registers the project and job, and imports the results data. the project may exist in the database, the job must not exist (raises an exception). arguments can be specified as dict (**d) or in keyword=value form. @param kwargs: dictionary of function arguments. the dictionary contains the following values. all arguments are required unless noted. @arg 'workdir' (optional) path to the working directory. the working directory of the operating system is changed. this is the root for relative paths of the database and results files. if not specified, the working directory is unchanged. @arg 'dbfile' (required) name of the database file. @arg 'project' (required) unique name of the project. @arg 'code' (optional) name of the project code. @arg 'job' (required) name of the calculation job. job name must not exist for the project yet. @arg 'mode' (required) pmsco optimization mode. @arg 'machine' (optional) name of the machine where the job ran. @arg 'processes' (optional) number of processes. @arg 'hours' (optional) run time in hours (wall time). @arg 'git_hash' (optional) git hash of the code revision. @arg 'datetime' (datetime.datetime) time stamp (optional). if not specified, the argument defaults to the time stamp of the results file. hint: the constructor of a datetime object is `datetime.datetime(year, month, day, hour, minute, second)`. @arg 'description' (optional) meaningful description of the calculation job, up to the user. @arg 'jobtags' (dict, optional) key=value tags to be associated with the job @arg 'resultsfile' (required) name of the .tasks.dat results file. @return dict with 'project_id' and 'job_id'. these are the database ids of the project and job records. @raise sqlalchemy.exc.IntegrityError if the job already exists in the database. """ try: wd = Path(kwargs['workdir']) except KeyError: pass else: wd.cwd() dba = DatabaseAccess() dba.connect(kwargs['dbfile']) with dba.session() as session: project, job = ingest_job_metadata(session, **kwargs) ingest_results_file(session, project, job, kwargs['resultsfile']) session.commit() ref = {'project_id': project.id, 'job_id': job.id} return ref