"""Unit tests for the result-report helpers in ``pmsco.reports.results``."""

import datetime
from pathlib import Path
import unittest

import numpy as np

import pmsco.database.access as db_access
import pmsco.database.orm as db_orm
import pmsco.dispatch as dispatch
import pmsco.reports.results as rp_results


def setup_sample_database(session):
    """
    Fill a database session with a small, fixed set of sample records.

    Creates two projects, one job per project, three parameter keys,
    one model per job, a single result attached to the first model and
    three parameter values. All objects are added to the session and
    committed.

    @param session: database session that receives the objects.
        It must provide ``add_all`` and ``commit``.

    @return: dictionary of the created ORM objects, keyed by the short
        names 'p1', 'p2', 'j1', 'j2', 'm1', 'm2', 'r1', 'pv1', 'pv2',
        'pv3', 'pk1', 'pk2', 'pk3'.
    """
    old_project = db_orm.Project(name="oldproject", code="oldcode")
    test_project = db_orm.Project(name="unittest", code="testcode")

    old_job = db_orm.Job(
        project=old_project,
        name="oldjob",
        mode="oldmode",
        machine="oldhost",
        datetime=datetime.datetime.now(),
    )
    test_job = db_orm.Job(
        project=test_project,
        name="testjob",
        mode="testmode",
        machine="testhost",
        datetime=datetime.datetime.now(),
    )

    par_a = db_orm.Param(key='parA')
    par_b = db_orm.Param(key='parB')
    par_c = db_orm.Param(key='parC')

    old_model = db_orm.Model(job=old_job, model=91)
    test_model = db_orm.Model(job=test_job, model=92)

    # The result is linked to its model after construction, as in the
    # original fixture.
    result = db_orm.Result(
        calc_id=dispatch.CalcID(91, -1, -1, -1, -1),
        rfac=0.534,
        secs=37.9,
    )
    result.model = old_model

    value_a = db_orm.ParamValue(model=old_model, param=par_a, value=1.234, delta=0.1234)
    value_b = db_orm.ParamValue(model=old_model, param=par_b, value=5.678, delta=-0.5678)
    value_c = db_orm.ParamValue(model=test_model, param=par_c, value=6.785, delta=0.6785)

    # Insertion order is kept unchanged because it determines the order
    # in which the objects are handed to session.add_all().
    objects = {
        'p1': old_project,
        'p2': test_project,
        'j1': old_job,
        'j2': test_job,
        'm1': old_model,
        'm2': test_model,
        'r1': result,
        'pv1': value_a,
        'pv2': value_b,
        'pv3': value_c,
        'pk1': par_a,
        'pk2': par_b,
        'pk3': par_c,
    }
    session.add_all(objects.values())
    session.commit()
    return objects


class TestResultsMethods(unittest.TestCase):
    """Tests of module-level functions in ``pmsco.reports.results``."""

    def test_array_range(self):
        """array_range is expected to return per-field minima and maxima as dicts."""
        field_types = [('A', 'f8'), ('B', 'f8'), ('C', 'f8')]
        rows = [
            (1.5, 3.5, 3.5),
            (1.6, 2.6, 2.6),
            (1.7, 2.7, 3.7),
            (1.8, 2.8, 3.8),
        ]
        data = np.array(rows, dtype=field_types)
        expected_min = {'A': 1.5, 'B': 2.6, 'C': 2.6}
        expected_max = {'A': 1.8, 'B': 3.5, 'C': 3.8}

        actual_min, actual_max = rp_results.array_range(data)

        self.assertEqual(expected_min, actual_min)
        self.assertEqual(expected_max, actual_max)


class TestResultData(unittest.TestCase):
    """Tests of the ``ResultData`` class in ``pmsco.reports.results``."""

    def setUp(self):
        """Create a database access object connected to ``":memory:"``."""
        self.db = db_access.DatabaseAccess()
        self.db.connect(":memory:")

    def test_update_collections(self):
        """
        update_collections should derive the generations from the values.

        The values are read from a data file that is located two
        directory levels above this test module. Generations 1 to 4 are
        expected.
        """
        base_dir = Path(__file__).parent.parent
        results_path = base_dir / "test_swarm.setup_with_results.1.dat"
        # atleast_1d guards against genfromtxt returning a 0-d array.
        loaded = np.genfromtxt(results_path, names=True)
        raw_values = np.atleast_1d(loaded)

        result_data = rp_results.ResultData()
        result_data.values = raw_values
        result_data.update_collections()

        expected_generations = np.array((1, 2, 3, 4))
        np.testing.assert_array_equal(result_data.generations, expected_generations)

    def test_load_from_db(self):
        """
        Smoke test of load_from_db on the sample database.

        The test only checks that loading runs without raising an
        exception; it makes no assertion about the loaded content.
        """
        with self.db.session() as session:
            setup_sample_database(session)
            result_data = rp_results.ResultData()
            result_data.levels = {'scan': -1}
            result_data.load_from_db(session)


if __name__ == '__main__':
    unittest.main()