212 lines
8.6 KiB
Python
212 lines
8.6 KiB
Python
"""
|
|
@package tests.database.test_ingest
|
|
unit tests for pmsco.database
|
|
|
|
the purpose of these tests is to help debugging the code.
|
|
|
|
to run the tests, change to the directory which contains the tests directory, and execute =nosetests=.
|
|
|
|
@pre nose must be installed (python-nose package on Debian).
|
|
|
|
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
|
|
|
|
@copyright (c) 2016 by Paul Scherrer Institut @n
|
|
Licensed under the Apache License, Version 2.0 (the "License"); @n
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
"""
|
|
|
|
import unittest
|
|
import pmsco.database.access as db
|
|
import pmsco.database.ingest as db_ingest
|
|
import pmsco.database.orm as orm
|
|
import pmsco.dispatch as dispatch
|
|
from tests.database.test_common import setup_sample_database
|
|
|
|
|
|
class TestDatabase(unittest.TestCase):
    """
    unit tests for db_ingest.insert_result.

    each test opens a session on a freshly connected database,
    fills it with the objects returned by setup_sample_database,
    calls insert_result and then verifies the Model, Result and ParamValue
    objects that can be queried from the same session.
    """

    def setUp(self):
        """
        create a new DatabaseAccess object and connect it to ":memory:".

        NOTE(review): ":memory:" is presumably an in-memory SQLite database,
        so that each test starts empty - confirm against DatabaseAccess.connect.
        """
        self.db = db.DatabaseAccess()
        self.db.connect(":memory:")

    def tearDown(self):
        # nothing to clean up explicitly
        pass

    @classmethod
    def setup_class(cls):
        # nose-style class fixture: before any methods in this class
        pass

    @classmethod
    def teardown_class(cls):
        # nose-style class fixture: after any methods in this class
        pass

    # ------------------------------------------------------------------
    # shared verification helpers
    # (names start with an underscore so that test runners do not collect them)
    # ------------------------------------------------------------------

    def _query_model(self, session, job, model_number):
        """
        return the single orm.Model of the given job and model number.

        @param session: open database session.
        @param job: job object whose id the model must reference.
        @param model_number: value of the Model.model column to look for.
        @return: the matching orm.Model object.
            Query.one() raises an exception unless exactly one row matches.
        """
        q = session.query(orm.Model)
        q = q.filter(orm.Model.job_id == job.id)
        q = q.filter(orm.Model.model == model_number)
        return q.one()

    def _query_result(self, session, model_obj):
        """
        return the single orm.Result that references the given model.

        @param session: open database session.
        @param model_obj: model object whose id the result must reference.
        @return: the matching orm.Result object.
            Query.one() raises an exception unless exactly one row matches.
        """
        q = session.query(orm.Result)
        q = q.filter(orm.Result.model_id == model_obj.id)
        return q.one()

    def _check_model_fields(self, m, model_obj, job, model_number, result_data):
        """
        assert that the queried model carries the ingested values.

        @param m: model object returned by the query.
        @param model_obj: model object returned by insert_result.
        @param job: job object the model must belong to.
        @param model_number: expected model number.
        @param result_data: dictionary passed to insert_result
            ('_gen' and '_particle' are compared).
        @return: None
        """
        self.assertEqual(m.id, model_obj.id)
        self.assertEqual(m.job_id, job.id)
        self.assertEqual(m.model, model_number)
        self.assertEqual(m.gen, result_data['_gen'])
        self.assertEqual(m.particle, result_data['_particle'])

    def _check_result_fields(self, r, result_obj, model_obj, result_data, scan, domain, emit, region):
        """
        assert that the queried result carries the ingested values.

        @param r: result object returned by the query.
        @param result_obj: result object returned by insert_result.
        @param model_obj: model object the result must be linked to.
        @param result_data: dictionary passed to insert_result
            ('_rfac' and '_secs' are compared).
        @param scan: expected scan index.
        @param domain: expected domain index.
        @param emit: expected emitter index.
        @param region: expected region index.
        @return: None
        """
        self.assertEqual(r.id, result_obj.id)
        self.assertIs(r.model, model_obj)
        self.assertEqual(r.scan, scan)
        self.assertEqual(r.domain, domain)
        self.assertEqual(r.emit, emit)
        self.assertEqual(r.region, region)
        self.assertEqual(r.rfac, result_data['_rfac'])
        self.assertEqual(r.secs, result_data['_secs'])

    def _check_param_values(self, session, m, model_obj, result_data, result_delta):
        """
        assert that the stored parameter values and deltas match the input.

        the values are checked twice: via a direct orm.ParamValue query
        and via the values/deltas mappings of the model object.
        keys of result_data that start with an underscore are skipped
        because they are not model parameters in these tests.

        @param session: open database session.
        @param m: model object returned by the query.
        @param model_obj: model object returned by insert_result.
        @param result_data: dictionary passed to insert_result.
        @param result_delta: dictionary of parameter deltas passed to insert_result.
        @return: None
        """
        q = session.query(orm.ParamValue)
        q = q.filter(orm.ParamValue.model_id == model_obj.id)
        pvs = q.all()
        values = {pv.param_key: pv.value for pv in pvs}
        deltas = {pv.param_key: pv.delta for pv in pvs}
        for k in result_data:
            if k.startswith('_'):
                continue
            self.assertAlmostEqual(values[k], result_data[k])
            self.assertAlmostEqual(deltas[k], result_delta[k])
            self.assertAlmostEqual(m.values[k], result_data[k])
            self.assertAlmostEqual(m.deltas[k], result_delta[k])

    # ------------------------------------------------------------------
    # tests
    # ------------------------------------------------------------------

    def test_insert_result(self):
        """
        test insert a new model and result

        ingest parameters parA and parB and rfac under the index (15, 16, 17, 18, -1).
        the returned model and result must not be the sample objects 'm1' and 'r1',
        and they must be the same objects that a query returns.

        @return: None
        """
        with self.db.session() as session:
            objs = setup_sample_database(session)
            index = dispatch.CalcID(15, 16, 17, 18, -1)
            result_data = {'parA': 4.123, 'parB': 8.567, '_rfac': 0.654, '_gen': 3, '_particle': 21, '_secs': 27.8}
            result_delta = {'parA': 0.4123, 'parB': 0.8567}
            model_obj, result_obj = db_ingest.insert_result(session, objs['j1'], index, result_data, result_delta)
            session.commit()

            # model
            m = self._query_model(session, objs['j1'], index.model)
            self.assertIsNot(model_obj, objs['m1'])
            self.assertIs(m, model_obj)
            self._check_model_fields(m, model_obj, objs['j1'], index.model, result_data)

            # result
            r = self._query_result(session, model_obj)
            self.assertIsNot(r, objs['r1'])
            self.assertIs(r, result_obj)
            self._check_result_fields(r, result_obj, model_obj, result_data,
                                      scan=index.scan, domain=index.domain,
                                      emit=index.emit, region=index.region)

            # param values
            self._check_param_values(session, m, model_obj, result_data, result_delta)

    def test_update_result(self):
        """
        test update an existing model and result

        update parameters parA and parB and rfac of result (91, -1, -1, -1, -1)

        the returned and queried objects must be the sample objects 'm1' and 'r1'.

        @return: None
        """
        with self.db.session() as session:
            objs = setup_sample_database(session)
            index = dispatch.CalcID(91, -1, -1, -1, -1)
            result_data = {'parA': 4.123, 'parB': 8.567, '_rfac': 0.654, '_gen': 3, '_particle': 21, '_secs': 27.8}
            result_delta = {'parA': 0.4123, 'parB': 0.8567}
            model_obj, result_obj = db_ingest.insert_result(session, objs['j1'], index, result_data, result_delta)
            session.commit()

            # model
            m = self._query_model(session, objs['j1'], index.model)
            self.assertIs(model_obj, objs['m1'])
            self.assertIs(m, objs['m1'])
            self._check_model_fields(m, model_obj, objs['j1'], index.model, result_data)

            # result
            r = self._query_result(session, model_obj)
            self.assertIs(result_obj, objs['r1'])
            self.assertIs(r, objs['r1'])
            self._check_result_fields(r, result_obj, model_obj, result_data,
                                      scan=index.scan, domain=index.domain,
                                      emit=index.emit, region=index.region)

            # param values
            self._check_param_values(session, m, model_obj, result_data, result_delta)

    def test_update_result_dict(self):
        """
        test update an existing model and result with dictionary arguments

        update parameters parA and parB and rfac of result (91, -1, -1, -1, -1)

        the same dictionary is passed as index and as result data.
        the returned and queried objects must be the sample objects 'm1' and 'r1'.

        @return: None
        """
        with self.db.session() as session:
            objs = setup_sample_database(session)
            result_data = {'_model': 91, '_scan': -1, '_domain': -1, '_emit': -1, '_region': -1,
                           'parA': 4.123, 'parB': 8.567, '_rfac': 0.654, '_gen': 3, '_particle': 21, '_secs': 27.8}
            result_delta = {'parA': 0.4123, 'parB': 0.8567}
            model_obj, result_obj = db_ingest.insert_result(session, objs['j1'], result_data, result_data, result_delta)
            session.commit()

            # model
            m = self._query_model(session, objs['j1'], result_data['_model'])
            self.assertIs(model_obj, objs['m1'])
            self.assertIs(m, objs['m1'])
            self._check_model_fields(m, model_obj, objs['j1'], result_data['_model'], result_data)

            # result
            # the shared check also compares secs, like the CalcID-based tests,
            # since '_secs' is part of the ingested dictionary here as well.
            r = self._query_result(session, model_obj)
            self.assertIs(result_obj, objs['r1'])
            self.assertIs(r, objs['r1'])
            self._check_result_fields(r, result_obj, model_obj, result_data,
                                      scan=result_data['_scan'], domain=result_data['_domain'],
                                      emit=result_data['_emit'], region=result_data['_region'])

            # param values
            self._check_param_values(session, m, model_obj, result_data, result_delta)
|
|
|
|
|
|
# run all tests of this module when the file is executed as a script
if __name__ == '__main__':
    unittest.main()
|