"""
@package tests.database.test_ingest
unit tests for pmsco.database

the purpose of these tests is to help debugging the code.

to run the tests, change to the directory which contains the tests directory, and execute =nosetests=.

@pre nose must be installed (python-nose package on Debian).

@author Matthias Muntwiler, matthias.muntwiler@psi.ch

@copyright (c) 2016 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
"""

import unittest
import pmsco.database.access as db
import pmsco.database.ingest as db_ingest
import pmsco.database.orm as orm
import pmsco.dispatch as dispatch
from tests.database.test_common import setup_sample_database


# names of the calculation index levels, as used by the tests below:
# attributes of the index object, and (with a leading underscore) keys of an index dictionary.
_INDEX_FIELDS = ('model', 'scan', 'domain', 'emit', 'region')


def _expected_index(index):
    """
    normalize a calculation index to a plain dictionary.

    @param index: either an object with attributes model, scan, domain, emit, region
        (such as the dispatch.CalcID instances used in these tests),
        or a dictionary with keys '_model', '_scan', '_domain', '_emit', '_region'.

    @return: dictionary with keys 'model', 'scan', 'domain', 'emit', 'region'.
    """
    if isinstance(index, dict):
        return {field: index['_' + field] for field in _INDEX_FIELDS}
    return {field: getattr(index, field) for field in _INDEX_FIELDS}


class TestDatabase(unittest.TestCase):
    """
    tests of pmsco.database.ingest.insert_result against an in-memory database.

    each test fills the database with setup_sample_database,
    inserts or updates one result and reads the stored objects back through the ORM.
    """

    def setUp(self):
        # every test gets its own fresh in-memory database
        self.db = db.DatabaseAccess()
        self.db.connect(":memory:")

    def tearDown(self):
        pass

    @classmethod
    def setup_class(cls):
        # before any methods in this class (nose-style hook, see module docstring)
        pass

    @classmethod
    def teardown_class(cls):
        # teardown_class() after any methods in this class (nose-style hook)
        pass

    def _assert_model(self, session, job, model_obj, expected, result_data):
        """
        query the model of the given job and model number and compare it to the expected data.

        @param session: open database session.
        @param job: job object whose id selects the model.
        @param model_obj: model object returned by insert_result.
        @param expected: index dictionary as returned by _expected_index.
        @param result_data: dictionary passed to insert_result; '_gen' and '_particle' are checked.

        @return: the queried model object.
        """
        query = session.query(orm.Model)
        query = query.filter(orm.Model.job_id == job.id)
        query = query.filter(orm.Model.model == expected['model'])
        # one() raises unless exactly one row matches
        model = query.one()

        self.assertIs(model, model_obj)
        self.assertEqual(model.id, model_obj.id)
        self.assertEqual(model.job_id, job.id)
        self.assertEqual(model.model, expected['model'])
        self.assertEqual(model.gen, result_data['_gen'])
        self.assertEqual(model.particle, result_data['_particle'])
        return model

    def _assert_result(self, session, model_obj, result_obj, expected, result_data):
        """
        query the result that belongs to the given model and compare it to the expected data.

        @param session: open database session.
        @param model_obj: model object returned by insert_result.
        @param result_obj: result object returned by insert_result.
        @param expected: index dictionary as returned by _expected_index.
        @param result_data: dictionary passed to insert_result; '_rfac' and '_secs' are checked.

        @return: the queried result object.
        """
        query = session.query(orm.Result)
        query = query.filter(orm.Result.model_id == model_obj.id)
        result = query.one()

        self.assertIs(result, result_obj)
        self.assertEqual(result.id, result_obj.id)
        self.assertIs(result.model, model_obj)
        self.assertEqual(result.scan, expected['scan'])
        self.assertEqual(result.domain, expected['domain'])
        self.assertEqual(result.emit, expected['emit'])
        self.assertEqual(result.region, expected['region'])
        self.assertEqual(result.rfac, result_data['_rfac'])
        self.assertEqual(result.secs, result_data['_secs'])
        return result

    def _assert_param_values(self, session, model, result_data, result_delta):
        """
        compare the stored parameter values and deltas of a model to the expected data.

        the values are checked twice: via a query of ParamValue rows,
        and via the values and deltas mappings of the model object.

        @param session: open database session.
        @param model: model object whose parameters are checked.
        @param result_data: dictionary passed to insert_result.
            keys with a leading underscore are not model parameters and are skipped.
        @param result_delta: dictionary of parameter deltas passed to insert_result.

        @return: None
        """
        query = session.query(orm.ParamValue)
        query = query.filter(orm.ParamValue.model_id == model.id)
        rows = query.all()
        values = {row.param_key: row.value for row in rows}
        deltas = {row.param_key: row.delta for row in rows}

        for key, value in result_data.items():
            if key.startswith('_'):
                continue
            self.assertAlmostEqual(values[key], value)
            self.assertAlmostEqual(deltas[key], result_delta[key])
            self.assertAlmostEqual(model.values[key], value)
            self.assertAlmostEqual(model.deltas[key], result_delta[key])

    def _assert_stored(self, session, job, index, result_data, result_delta, model_obj, result_obj):
        """
        run all checks on the model, result and parameter values produced by insert_result.

        @param session: open database session.
        @param job: job object that was passed to insert_result.
        @param index: calculation index that was passed to insert_result (CalcID or dictionary).
        @param result_data: result dictionary that was passed to insert_result.
        @param result_delta: delta dictionary that was passed to insert_result.
        @param model_obj: model object returned by insert_result.
        @param result_obj: result object returned by insert_result.

        @return: tuple (model, result) of the objects read back from the database.
        """
        expected = _expected_index(index)
        model = self._assert_model(session, job, model_obj, expected, result_data)
        result = self._assert_result(session, model_obj, result_obj, expected, result_data)
        self._assert_param_values(session, model, result_data, result_delta)
        return model, result

    def test_insert_result(self):
        """
        test insert a new model and result

        the index (15, 16, 17, 18, -1) is expected to produce new objects,
        i.e. objects other than m1 and r1 of the sample database.

        @return: None
        """
        with self.db.session() as session:
            objs = setup_sample_database(session)
            index = dispatch.CalcID(15, 16, 17, 18, -1)
            result_data = {'parA': 4.123, 'parB': 8.567,
                           '_rfac': 0.654, '_gen': 3, '_particle': 21, '_secs': 27.8}
            result_delta = {'parA': 0.4123, 'parB': 0.8567}
            model_obj, result_obj = db_ingest.insert_result(session, objs['j1'], index,
                                                            result_data, result_delta)
            session.commit()

            self.assertIsNot(model_obj, objs['m1'])
            m, r = self._assert_stored(session, objs['j1'], index, result_data, result_delta,
                                       model_obj, result_obj)
            self.assertIsNot(r, objs['r1'])

    def test_update_result(self):
        """
        test update an existing model and result

        update parameters parA and parB and rfac of result (91, -1, -1, -1, -1)

        @return: None
        """
        with self.db.session() as session:
            objs = setup_sample_database(session)
            index = dispatch.CalcID(91, -1, -1, -1, -1)
            result_data = {'parA': 4.123, 'parB': 8.567,
                           '_rfac': 0.654, '_gen': 3, '_particle': 21, '_secs': 27.8}
            result_delta = {'parA': 0.4123, 'parB': 0.8567}
            model_obj, result_obj = db_ingest.insert_result(session, objs['j1'], index,
                                                            result_data, result_delta)
            session.commit()

            # the existing objects must be reused rather than duplicated
            self.assertIs(model_obj, objs['m1'])
            self.assertIs(result_obj, objs['r1'])
            m, r = self._assert_stored(session, objs['j1'], index, result_data, result_delta,
                                       model_obj, result_obj)
            self.assertIs(m, objs['m1'])
            self.assertIs(r, objs['r1'])

    def test_update_result_dict(self):
        """
        test update an existing model and result with dictionary arguments

        update parameters parA and parB and rfac of result (91, -1, -1, -1, -1)

        @return: None
        """
        with self.db.session() as session:
            objs = setup_sample_database(session)
            # the same dictionary serves as index and as result data
            result_data = {'_model': 91, '_scan': -1, '_domain': -1, '_emit': -1, '_region': -1,
                           'parA': 4.123, 'parB': 8.567,
                           '_rfac': 0.654, '_gen': 3, '_particle': 21, '_secs': 27.8}
            result_delta = {'parA': 0.4123, 'parB': 0.8567}
            model_obj, result_obj = db_ingest.insert_result(session, objs['j1'], result_data,
                                                            result_data, result_delta)
            session.commit()

            # the existing objects must be reused rather than duplicated
            self.assertIs(model_obj, objs['m1'])
            self.assertIs(result_obj, objs['r1'])
            m, r = self._assert_stored(session, objs['j1'], result_data, result_data, result_delta,
                                       model_obj, result_obj)
            self.assertIs(m, objs['m1'])
            self.assertIs(r, objs['r1'])


if __name__ == '__main__':
    unittest.main()