"""
|
|
@package pmsco.database.query
|
|
specialized query functions for the pmsco database
|
|
|
|
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
|
|
|
|
@copyright (c) 2016-21 by Paul Scherrer Institut @n
|
|
Licensed under the Apache License, Version 2.0 (the "License"); @n
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
"""
|
|
|
|
import logging

import numpy as np
from sqlalchemy import func

import pmsco.database.orm as orm
import pmsco.database.util as util
import pmsco.dispatch as dispatch

logger = logging.getLogger(__name__)


def query_newest_job(session):
    """
    retrieve the entry of the newest job

    the newest entry is determined by the datetime field.

    @param session: database session.

    @return: pmsco.database.orm.Job object, or None if the jobs table is empty.
    """
    q = session.query(orm.Job)
    q = q.order_by(orm.Job.datetime.desc(), orm.Job.id.desc())
    job = q.first()
    return job


def query_model(session, job_id=None, model_id=None, model=None):
    """
    retrieve model parameters and control variables from the database.

    @param session: database session.
    @param job_id: id of the job in the database (optional filter).
    @param model_id: id of the model in the database (optional filter).
    @param model: model number within the job (optional filter).

    @return: (dict, dict) value dictionary and delta dictionary.
    dictionary keys are the parameter names, values are the parameter values or deltas.
    the special keys '_model_id', '_model', '_gen' and '_particle' are included.
    """
    # join the Model and Job tables so that the filters below apply to the related rows
    # rather than producing a cartesian product.
    query = session.query(orm.ParamValue).join(orm.Model).join(orm.Job)
    if job_id is not None:
        query = query.filter(orm.Job.id == job_id)
    if model_id is not None:
        query = query.filter(orm.Model.id == model_id)
    if model is not None:
        query = query.filter(orm.Model.model == model)
    result = query.all()

    param_value = {}
    param_delta = {}
    model_obj = None
    for pv in result:
        if model_obj is None:
            model_obj = pv.model
        param_value[pv.param.key] = pv.value
        param_delta[pv.param.key] = pv.delta

    param_value['_model_id'] = model_obj.id
    param_value['_model'] = model_obj.model
    param_value['_gen'] = model_obj.gen
    param_value['_particle'] = model_obj.particle
    param_delta['_model_id'] = model_obj.id
    param_delta['_model'] = model_obj.model
    param_delta['_gen'] = model_obj.gen
    param_delta['_particle'] = model_obj.particle

    return param_value, param_delta


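# Illustrative usage sketch (not part of the original module): fetch the newest job and read the
# parameter dictionaries of one of its models. `session` is assumed to be an open sqlalchemy
# session bound to a pmsco results database; `model_number` is a hypothetical model index chosen
# by the caller.
def _example_newest_job_model(session, model_number=0):
    job = query_newest_job(session)
    if job is None:
        return None
    return query_model(session, job_id=job.id, model=model_number)

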
def query_results(session, job_id):
    """
    query the result entries of a calculation job.

    @param session: database session.
    @param job_id: (int) id of the associated Jobs entry.

    @return: list of pmsco.database.orm.Result objects.
    """
    query = session.query(orm.Result)
    query = query.join(orm.Model)
    query = query.filter(orm.Model.job_id == job_id)
    return query.all()


def query_tasks(session, job_id):
    """
    query the task index used in a calculation job.

    this query neglects the model index
    and returns the unique tuples (-1, scan, domain, emit, region).

    @param session: database session.
    @param job_id: (int) id of the associated Jobs entry.

    @return list of pmsco.dispatch.CalcID tuples of task indices.
    the model attribute is -1 in all elements.
    """
    query = session.query(orm.Result.scan, orm.Result.domain, orm.Result.emit, orm.Result.region)
    query = query.join(orm.Model)
    query = query.filter(orm.Model.job_id == job_id)
    query = query.distinct()
    query = query.order_by(orm.Result.scan, orm.Result.domain, orm.Result.emit, orm.Result.region)
    results = query.all()

    output = []
    for row in results:
        d = row._asdict()
        d['model'] = -1
        output.append(dispatch.CalcID(**d))

    return output


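# Illustrative usage sketch (not part of the original module): list the task tuples of a job and
# count how many belong to each scan. `session` and `job_id` are assumed to come from the caller;
# the helper name is hypothetical.
def _example_count_tasks_per_scan(session, job_id):
    counts = {}
    for task in query_tasks(session, job_id):
        counts[task.scan] = counts.get(task.scan, 0) + 1
    return counts

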
def query_best_task_models(session, job_id, level, count):
    """
    query the N best models per task.

    this query is used by the file tracker to determine the models to keep.

    @param session: database session.
    @param job_id: (int) id of the associated Jobs entry.
    @param level: level up to which to query.
    the level can be specified by level name (str) or numeric index (0..4).
    if it is scan (equivalent to 1), the method queries the model and scan levels.
    @param count: number of models to query per task.

    @return set of matching model numbers (Models.model field).
    """
    try:
        level = int(level)
    except ValueError:
        level = dispatch.CALC_LEVELS.index(level)
    assert 0 <= level < len(dispatch.CALC_LEVELS)

    def _query_models(t):
        query = session.query(orm.Model.model).join(orm.Job).join(orm.Result)
        query = query.filter(orm.Job.id == job_id)
        query = query.filter(orm.Result.scan == t.scan)
        query = query.filter(orm.Result.domain == t.domain)
        query = query.filter(orm.Result.emit == t.emit)
        query = query.filter(orm.Result.region == t.region)
        query = query.order_by(orm.Result.rfac)
        results = query[0:count]
        return {row.model for row in results}

    tasks = query_tasks(session, job_id)
    models = set()
    for task in tasks:
        if task.numeric_level <= level:
            models |= _query_models(task)

    return models


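# Illustrative usage sketch (not part of the original module): determine the model numbers whose
# files should be retained, keeping the `count` best models per task down to the scan level.
# `session` and `job_id` are assumed to come from the caller.
def _example_models_to_keep(session, job_id, count=10):
    return query_best_task_models(session, job_id, level='scan', count=count)

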
def query_model_params_array(session, jobs=None, models=None, order=None, limit=None):
    """
    query parameter values and return them in numpy arrays

    the models table can be filtered by job and/or model.
    otherwise, the whole database is queried (which might be huge!).

    @param session: database session.
    @param jobs: filter by job.
    the argument can be a singleton or sequence of orm.Job objects or numeric ids.
    @param models: filter by model.
    the argument can be a singleton or sequence of orm.Model objects or their ids.
    @param order: ordering of results. this can be a sequence of orm.Model attributes.
    the default order is by job_id and model.
    @param limit: maximum number of parameter value rows to return.

    @return: dict['values']: numpy values array, dict['deltas']: numpy deltas array.
    """
    count_query = session.query(orm.Model)
    pn_query = session.query(orm.Param.key).join(orm.ParamValue).join(orm.Model)
    pv_query = session.query(orm.ParamValue).join(orm.Model)

    if jobs:
        try:
            job_ids = [j if isinstance(j, int) else j.id for j in jobs]
        except TypeError:
            job_ids = [jobs if isinstance(jobs, int) else jobs.id]
        count_query = count_query.filter(orm.Model.job_id.in_(job_ids))
        pn_query = pn_query.filter(orm.Model.job_id.in_(job_ids))
        pv_query = pv_query.filter(orm.Model.job_id.in_(job_ids))

    if models:
        try:
            model_ids = [m if isinstance(m, int) else m.id for m in models]
        except TypeError:
            model_ids = [models if isinstance(models, int) else models.id]
        count_query = count_query.filter(orm.Model.id.in_(model_ids))
        pn_query = pn_query.filter(orm.ParamValue.model_id.in_(model_ids))
        pv_query = pv_query.filter(orm.ParamValue.model_id.in_(model_ids))

    if order is not None:
        pv_query = pv_query.order_by(*order)
    else:
        pv_query = pv_query.order_by(orm.Model.job_id, orm.Model.model)
    if limit:
        pv_query = pv_query.limit(limit)

    n_models = count_query.count()
    param_names = [row.key for row in pn_query.distinct()]
    param_values = pv_query.all()

    special_names = list(orm.Model().as_dict().keys())
    dt_names = special_names + param_names
    dt = np.dtype([(n, util.field_to_numpy_type(n)) for n in sorted(dt_names, key=str.lower)])
    values = np.zeros((n_models,), dtype=dt)
    deltas = np.zeros((n_models,), dtype=dt)

    # each ParamValue row holds one parameter of one model:
    # map the model ids to consecutive array rows in the order returned by the query.
    model_rows = {}
    for pv in param_values:
        try:
            i = model_rows[pv.model_id]
        except KeyError:
            i = model_rows[pv.model_id] = len(model_rows)
        for k, v in pv.model.as_dict().items():
            values[i][k] = deltas[i][k] = v
        values[i][pv.param.key] = pv.value
        deltas[i][pv.param.key] = pv.delta

    return {'values': values, 'deltas': deltas}


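# Illustrative usage sketch (not part of the original module): pull the parameter table of one job
# into a structured numpy array and compute the spread of one parameter. `session` and `job_id`
# are assumed to come from the caller; 'dAB' is a hypothetical parameter name that would have to
# exist in the queried job.
def _example_param_statistics(session, job_id):
    arrays = query_model_params_array(session, jobs=job_id)
    values = arrays['values']
    return {'mean_dAB': float(np.mean(values['dAB'])),
            'std_dAB': float(np.std(values['dAB']))}

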
calc_id_props = {'model': orm.Model.model,
                 'scan': orm.Result.scan,
                 'domain': orm.Result.domain,
                 'emit': orm.Result.emit,
                 'region': orm.Result.region}


def query_model_results_array(session, jobs=None, models=None, order=None, limit=None,
                              query_hook=None, hook_data=None, include_params=False, **index):
    """
    query a results table with flexible filtering options

    the function returns a structured numpy array of the results and, optionally, parameter values.
    the database is fully flattened, each row of the array represents one result.

    the jobs and models arguments filter for specific jobs and/or models.

    custom filters can be added in a query hook function.
    the hook function receives an sqlalchemy Query object of the Result table,
    joined with the Model and Job tables.
    other joins must be added explicitly.
    the hook function can add more filters and return the modified query.

    the hook function is called after the filters from the other function arguments
    (jobs, models, index) have been applied,
    and before the ordering and limit are applied.

    @param session: database session.
    @param jobs: filter by job.
    the argument can be a singleton or sequence of orm.Job objects or numeric ids.
    @param models: filter by model.
    the argument can be a singleton or sequence of orm.Model objects or their ids.
    @param order: ordering of results. this can be a sequence of orm.Result attributes.
    the default order is by `orm.Result.rfac`.
    to override the default ascending order, append a modifier, e.g., `orm.Result.rfac.desc()`.
    @param limit: maximum number of results to return.
    @param query_hook: hook function that modifies an sqlalchemy.orm.Query object.
    the function receives the query as first argument, and any data from hook_data as keyword arguments.
    it must return the modified query object.
    @param hook_data: (dict) keyword arguments to be passed to the query_hook function.
    @param include_params: include parameter values of each model in the result.
    by default, only data from the Model and Result records is included.
    @param index: filters the results list by scan, domain, emit, and/or region index.
    for example, to get only the final results per model, specify `scan=-1`.

    @return: (numpy values array, numpy deltas array) tuple.
    """
    results_query = session.query(orm.Result).join(orm.Model).join(orm.Job)

    if jobs:
        results_query = filter_objects(results_query, orm.Job, jobs)

    if models:
        results_query = filter_objects(results_query, orm.Model, models)

    for k, v in index.items():
        results_query = results_query.filter(calc_id_props[k] == v)

    if query_hook is not None:
        results_query = query_hook(results_query, **(hook_data or {}))

    if order is not None:
        results_query = results_query.order_by(*order)
    else:
        results_query = results_query.order_by(orm.Result.rfac)
    if limit:
        results = results_query[0:limit]
    else:
        results = results_query.all()
    n_results = len(results)
    logger.debug(f"query_model_results_array: {results_query.statement} ({n_results} rows)")

    dt_names = list(util.DB_SPECIAL_PARAMS.values())
    if include_params:
        model_ids = {r.model_id for r in results}
        pn_query = session.query(orm.Param.key).join(orm.ParamValue)
        pn_query = pn_query.filter(orm.ParamValue.model_id.in_(model_ids))
        pn_query = pn_query.distinct()
        pn_query = pn_query.order_by(orm.Param.key)
        p_names = [r.key for r in pn_query.all()]
        dt_names.extend(p_names)
        logger.debug(f"query_model_results_array: {pn_query.statement} ({len(p_names)} rows)")

    dt = []
    v0 = []
    for n in dt_names:
        ft = util.field_to_numpy_type(n)
        dt.append((n, ft))
        v0.append(np.nan if ft[0] == 'f' else 0)
    dt = np.dtype(dt)
    v0 = np.array([tuple(v0)], dtype=dt)
    values_array = np.full((n_results,), v0, dtype=dt)
    deltas_array = np.full((n_results,), v0, dtype=dt)

    for i, r in enumerate(results):
        d = {**r.as_dict(), **r.model.as_dict()}
        for k, v in d.items():
            try:
                values_array[i][k] = v
            except TypeError:
                values_array[i][k] = 0
        deltas_array[i] = values_array[i]
        if include_params:
            for k, v in r.model.values.items():
                values_array[i][k] = v
            for k, v in r.model.deltas.items():
                deltas_array[i][k] = v

    return values_array, deltas_array


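# Illustrative usage sketch (not part of the original module): fetch the final results (scan=-1)
# of one job as structured arrays, restricted to one optimizer generation via a query hook.
# `session` and `job_id` are assumed to come from the caller; the hook and its `gen` keyword are
# hypothetical examples of the query_hook/hook_data mechanism.
def _example_final_results_of_generation(session, job_id, gen):
    def _gen_filter(query, gen=0):
        # the hook receives the Result query joined with Model and Job and must return it.
        return query.filter(orm.Model.gen == gen)

    return query_model_results_array(session, jobs=job_id, scan=-1,
                                     query_hook=_gen_filter, hook_data={'gen': gen},
                                     include_params=True)

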
def query_best_models_per_job(session, projects=None, jobs=None, task_level='model', order=None, limit=None):
    """
    return the best model (by rfac) of each selected job

    the query gathers the R-factors of the selected jobs at the selected task levels
    and, for each job, returns the (database) model id where the lowest R-factor is reported
    among the gathered results.

    this can be useful if you want to compile a report of the best model per job.

    @param session: database session.
    @param projects: filter by project.
    the argument can be a singleton or sequence of orm.Project objects or numeric ids.
    @param jobs: filter by job.
    the argument can be a singleton or sequence of orm.Job objects or numeric ids.
    @param task_level: element of or index into @ref pmsco.dispatch.CALC_LEVELS.
    deepest task_level to include in the query.
    results on deeper levels are not considered.
    e.g. if you pass 'scan', R-factors of individual scans are included in the query.
    note that including deeper levels will not increase the number of results returned.
    the lowest level that can be specified is `emit`.
    @param order: ordering of results. this can be a sequence of orm.Result attributes.
    the default order is by `orm.Result.rfac`.
    @param limit: maximum number of models to return.

    @return sequence of (orm.Model, orm.Result) tuples.
    the number of results corresponds to the number of jobs in the filter scope.
    to find out details of the models, execute another query that filters on these model ids.

    the method produces an SQL query similar to:
    @code{.sql}
    select Models.id from Models
    join Results on Models.id = Results.model_id
    join Jobs on Models.job_id = Jobs.id
    where scan=-1
    and project_id=1
    and job_id in (1,2,3)
    group by Models.job_id
    having min(rfac)
    order by rfac
    @endcode
    """
    try:
        level = dispatch.CALC_LEVELS.index(task_level) + 1
    except ValueError:
        level = task_level + 1
    try:
        level_name = dispatch.CALC_LEVELS[level]
    except IndexError:
        level_name = dispatch.CALC_LEVELS[4]

    query = session.query(orm.Model, orm.Result).join(orm.Result)

    if projects:
        query = filter_objects(query, orm.Project, projects)

    if jobs:
        query = filter_objects(query, orm.Job, jobs)

    query = query.filter(getattr(orm.Result, level_name) == -1)
    query = query.group_by(orm.Model.job_id)
    query = query.having(func.min(orm.Result.rfac))

    if order is not None:
        query = query.order_by(*order)
    else:
        query = query.order_by(orm.Result.rfac)
    if limit:
        results = query[0:limit]
    else:
        results = query.all()

    return results


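# Illustrative usage sketch (not part of the original module): report the best model of each job
# of a project. `session` and `project_id` are assumed to come from the caller; the returned list
# holds (job id, model number, R-factor) tuples.
def _example_best_models_report(session, project_id):
    rows = query_best_models_per_job(session, projects=project_id)
    return [(m.job_id, m.model, r.rfac) for m, r in rows]

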
def filter_objects(query, entity, objects):
    """
    filter a query for the given objects

    apply a simple object filter to a database query.
    the criteria can be a single object or a sequence of objects.
    the objects can be specified either by their object representation or numeric id.
    the query is filtered by id.
    thus, in the first case, the objects must have a valid id.

    @param query: sqlalchemy.orm.Query object that queries a table that is linked to the entity table.
    the function joins the entity table.
    a table with a direct foreign key relationship to the entity table must already be in the query.
    @param entity: orm entity class, e.g. pmsco.database.orm.Project.
    @param objects: singleton or sequence of orm objects or their numeric ids.

    @return: modified query
    """
    # avoid duplicate joins
    if str(query.statement).find(entity.__tablename__) < 0:
        query = query.join(entity)
    try:
        object_ids = [p if isinstance(p, int) else p.id for p in objects]
        query = query.filter(entity.id.in_(object_ids))
    except TypeError:
        object_id = objects if isinstance(objects, int) else objects.id
        query = query.filter(entity.id == object_id)
    return query


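# Illustrative usage sketch (not part of the original module): use filter_objects to restrict a
# Result query to a set of jobs given by their ids. `session` and `job_ids` are assumed to come
# from the caller; the query already contains Model, which has a foreign key to Job.
def _example_filter_results_by_jobs(session, job_ids):
    query = session.query(orm.Result).join(orm.Model)
    query = filter_objects(query, orm.Job, job_ids)
    return query.all()

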
def filter_task_levels(query, level='model', include_parents=False):
    """
    refine a query by filtering by task level.

    @param query: sqlalchemy.orm.Query object that queries the Result table
    (possibly joined with others).
    @param level: element of or index into @ref pmsco.dispatch.CALC_LEVELS.
    deepest task level to include in the query.
    results on deeper levels are not considered.
    e.g. if you pass 'scan', R-factors of individual scans are included in the query.
    the lowest level that can be specified is `emit`.
    @param include_parents: by default, the query will return only results from the given level.
    if True, combined results (parents) will be returned as well.

    @return: modified query
    """
    try:
        level = dispatch.CALC_LEVELS.index(level)
    except ValueError:
        level = int(level)
    child_level = level + 1

    try:
        child_level_name = dispatch.CALC_LEVELS[child_level]
        level_name = dispatch.CALC_LEVELS[level]
    except IndexError:
        child_level_name = dispatch.CALC_LEVELS[4]
        level_name = dispatch.CALC_LEVELS[3]

    query = query.filter(getattr(orm.Result, child_level_name) == -1)
    if not include_parents:
        query = query.filter(getattr(orm.Result, level_name) >= 0)

    return query


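# Illustrative usage sketch (not part of the original module): build a Result query by hand and
# restrict it to scan-level results of one job using filter_task_levels. `session` and `job_id`
# are assumed to come from the caller.
def _example_scan_level_results(session, job_id):
    query = session.query(orm.Result).join(orm.Model)
    query = query.filter(orm.Model.job_id == job_id)
    query = filter_task_levels(query, level='scan')
    return query.all()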