Files
pmsco-public/tests/database/test_ingest.py

212 lines
8.6 KiB
Python

"""
@package tests.database.test_ingest
unit tests for pmsco.database
the purpose of these tests is to help with debugging the code.
to run the tests, change to the directory that contains the tests directory and execute =nosetests=.
@pre nose must be installed (python-nose package on Debian).
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
@copyright (c) 2016 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
import unittest
import pmsco.database.access as db
import pmsco.database.ingest as db_ingest
import pmsco.database.orm as orm
import pmsco.dispatch as dispatch
from tests.database.test_common import setup_sample_database
class TestDatabase(unittest.TestCase):
    """
    unit tests for pmsco.database.ingest.insert_result.

    each test opens a session on a database connected to ":memory:",
    fills it with the objects returned by setup_sample_database,
    ingests one result and then verifies the Model, Result and ParamValue objects
    that are found by querying the session.

    the verification steps are shared by all tests and live in the private _check_* helpers.
    """

    # result record ingested by all tests.
    # keys with a leading underscore are special fields (R-factor, generation, particle, time),
    # all other keys are model parameters.
    RESULT_DATA = {'parA': 4.123, 'parB': 8.567, '_rfac': 0.654, '_gen': 3, '_particle': 21, '_secs': 27.8}

    # parameter deltas belonging to RESULT_DATA (model parameters only).
    RESULT_DELTA = {'parA': 0.4123, 'parB': 0.8567}

    def setUp(self):
        """
        connect a new database access object to ":memory:" before each test.

        @return: None
        """
        self.db = db.DatabaseAccess()
        self.db.connect(":memory:")

    def tearDown(self):
        """
        nothing to clean up after a test.

        @return: None
        """
        pass

    @classmethod
    def setup_class(cls):
        """
        nose-style hook, called before any methods in this class. nothing to do.

        @return: None
        """
        pass

    @classmethod
    def teardown_class(cls):
        """
        nose-style hook, called after any methods in this class. nothing to do.

        @return: None
        """
        pass

    def _ingest(self, session, index, result_data, result_delta):
        """
        set up the sample database, ingest one result and commit the session.

        @param session: open database session.
        @param index: calculation index passed to insert_result (dispatch.CalcID or dictionary).
        @param result_data: dictionary of result values passed to insert_result.
        @param result_delta: dictionary of parameter deltas passed to insert_result.
        @return: tuple (objs, model_obj, result_obj) where objs is the dictionary returned by
            setup_sample_database and the other two items are the return values of insert_result.
        """
        objs = setup_sample_database(session)
        model_obj, result_obj = db_ingest.insert_result(session, objs['j1'], index, result_data, result_delta)
        session.commit()
        return objs, model_obj, result_obj

    def _check_model(self, session, job, model_obj, model_number, result_data):
        """
        query the model of a job by its model number and compare it to the ingested data.

        the query must yield exactly one object, and that object must be model_obj.

        @param session: open database session.
        @param job: job object that owns the model.
        @param model_obj: model object returned by insert_result.
        @param model_number: expected model number.
        @param result_data: dictionary holding the expected '_gen' and '_particle' values.
        @return: the queried model object.
        """
        q = session.query(orm.Model)
        q = q.filter(orm.Model.job_id == job.id)
        q = q.filter(orm.Model.model == model_number)
        m = q.one()

        self.assertIs(m, model_obj)
        self.assertEqual(m.id, model_obj.id)
        self.assertEqual(m.job_id, job.id)
        self.assertEqual(m.model, model_number)
        self.assertEqual(m.gen, result_data['_gen'])
        self.assertEqual(m.particle, result_data['_particle'])
        return m

    def _check_result(self, session, model_obj, result_obj, result_data, scan, domain, emit, region):
        """
        query the result of a model and compare it to the ingested data.

        the query must yield exactly one object, and that object must be result_obj.

        @param session: open database session.
        @param model_obj: model object returned by insert_result.
        @param result_obj: result object returned by insert_result.
        @param result_data: dictionary holding the expected '_rfac' and '_secs' values.
        @param scan: expected scan index.
        @param domain: expected domain index.
        @param emit: expected emitter index.
        @param region: expected region index.
        @return: the queried result object.
        """
        q = session.query(orm.Result)
        q = q.filter(orm.Result.model_id == model_obj.id)
        r = q.one()

        self.assertIs(r, result_obj)
        self.assertEqual(r.id, result_obj.id)
        self.assertIs(r.model, model_obj)
        self.assertEqual(r.scan, scan)
        self.assertEqual(r.domain, domain)
        self.assertEqual(r.emit, emit)
        self.assertEqual(r.region, region)
        self.assertEqual(r.rfac, result_data['_rfac'])
        self.assertEqual(r.secs, result_data['_secs'])
        return r

    def _check_param_values(self, session, model_obj, result_data, result_delta):
        """
        compare the stored parameter values and deltas of a model to the ingested data.

        the values are checked twice: via a query of ParamValue objects
        and via the values and deltas attributes of the model object.

        @param session: open database session.
        @param model_obj: model object whose parameters are checked.
        @param result_data: ingested result dictionary. keys with a leading underscore are skipped.
        @param result_delta: ingested dictionary of parameter deltas.
        @return: None
        """
        q = session.query(orm.ParamValue)
        q = q.filter(orm.ParamValue.model_id == model_obj.id)
        pvs = q.all()
        values = {pv.param_key: pv.value for pv in pvs}
        deltas = {pv.param_key: pv.delta for pv in pvs}

        for key, expected in result_data.items():
            if key.startswith('_'):
                # special field, not a model parameter
                continue
            self.assertAlmostEqual(values[key], expected)
            self.assertAlmostEqual(deltas[key], result_delta[key])
            self.assertAlmostEqual(model_obj.values[key], expected)
            self.assertAlmostEqual(model_obj.deltas[key], result_delta[key])

    def test_insert_result(self):
        """
        test insert a new model and result

        ingest a result at calculation index (15, 16, 17, 18, -1)
        and check that the returned model and result are not the sample objects m1 and r1.

        @return: None
        """
        with self.db.session() as session:
            index = dispatch.CalcID(15, 16, 17, 18, -1)
            result_data = dict(self.RESULT_DATA)
            result_delta = dict(self.RESULT_DELTA)
            objs, model_obj, result_obj = self._ingest(session, index, result_data, result_delta)

            m = self._check_model(session, objs['j1'], model_obj, index.model, result_data)
            self.assertIsNot(m, objs['m1'])

            r = self._check_result(session, model_obj, result_obj, result_data,
                                   scan=index.scan, domain=index.domain,
                                   emit=index.emit, region=index.region)
            self.assertIsNot(r, objs['r1'])

            self._check_param_values(session, m, result_data, result_delta)

    def test_update_result(self):
        """
        test update an existing model and result

        update parameters parA and parB and rfac of result (91, -1, -1, -1, -1)

        @return: None
        """
        with self.db.session() as session:
            index = dispatch.CalcID(91, -1, -1, -1, -1)
            result_data = dict(self.RESULT_DATA)
            result_delta = dict(self.RESULT_DELTA)
            objs, model_obj, result_obj = self._ingest(session, index, result_data, result_delta)

            # the existing sample objects must be reused, not duplicated
            m = self._check_model(session, objs['j1'], model_obj, index.model, result_data)
            self.assertIs(m, objs['m1'])

            r = self._check_result(session, model_obj, result_obj, result_data,
                                   scan=index.scan, domain=index.domain,
                                   emit=index.emit, region=index.region)
            self.assertIs(r, objs['r1'])

            self._check_param_values(session, m, result_data, result_delta)

    def test_update_result_dict(self):
        """
        test update an existing model and result with dictionary arguments

        update parameters parA and parB and rfac of result (91, -1, -1, -1, -1)
        the calculation index is passed as part of the result dictionary rather than as a CalcID.

        @return: None
        """
        with self.db.session() as session:
            # index fields first, then the common result fields
            result_data = {'_model': 91, '_scan': -1, '_domain': -1, '_emit': -1, '_region': -1}
            result_data.update(self.RESULT_DATA)
            result_delta = dict(self.RESULT_DELTA)
            # the same dictionary serves as index and as result
            objs, model_obj, result_obj = self._ingest(session, result_data, result_data, result_delta)

            # the existing sample objects must be reused, not duplicated
            m = self._check_model(session, objs['j1'], model_obj, result_data['_model'], result_data)
            self.assertIs(m, objs['m1'])

            r = self._check_result(session, model_obj, result_obj, result_data,
                                   scan=result_data['_scan'], domain=result_data['_domain'],
                                   emit=result_data['_emit'], region=result_data['_region'])
            self.assertIs(r, objs['r1'])

            self._check_param_values(session, m, result_data, result_delta)
# run the tests of this module with the unittest runner when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()