"""
@package tests.database.test_query
unit tests for pmsco.database

the purpose of these tests is to help debugging the code.

to run the tests, change to the directory which contains the tests directory, and execute =nosetests=.

@pre nose must be installed (python-nose package on Debian).

@author Matthias Muntwiler, matthias.muntwiler@psi.ch

@copyright (c) 2016 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
|
|
|
|
import unittest
|
|
import numpy as np
|
|
import pmsco.database.access as db
|
|
import pmsco.database.common as db_common
|
|
import pmsco.database.ingest as db_ingest
|
|
import pmsco.database.orm as db_orm
|
|
import pmsco.database.query as db_query
|
|
import pmsco.database.util as db_util
|
|
from tests.database.test_common import setup_sample_database
|
|
|
|
|
|
def pop_query_hook(query, gen):
    """
    query hook that narrows a query down to the models of one generation.

    @param query: query object that provides a filter() method.
    @param gen: generation number which db_orm.Model.gen is compared to.
    @return: whatever query.filter() returns for the generation criterion.
    """
    same_generation = db_orm.Model.gen == gen
    return query.filter(same_generation)
|
|
|
|
|
|
class TestDatabase(unittest.TestCase):
    """
    tests of the query functions in pmsco.database.query.

    each test method opens a session on the database object created in setUp,
    fills it via setup_sample_database and db_ingest.insert_result,
    and compares the output of one db_query function with hand-written expectations.
    """

    def setUp(self):
        """
        create a DatabaseAccess object and connect it to ":memory:" before each test.
        """
        self.db = db.DatabaseAccess()
        self.db.connect(":memory:")

    def tearDown(self):
        """
        nothing is cleaned up after a test.
        """
        return None

    @classmethod
    def setup_class(cls):
        """
        class-level hook, intended to run before any methods in this class. does nothing.
        """
        return None

    @classmethod
    def teardown_class(cls):
        """
        class-level hook, intended to run after any methods in this class. does nothing.
        """
        return None

    @staticmethod
    def _assert_fields_almost_equal(actual, expected, fields):
        """
        compare the named fields of two structured arrays, one field after the other.

        @param actual: array returned by the code under test.
        @param expected: array holding the reference values.
        @param fields: sequence of field names. the fields are checked in this order.
        @raise AssertionError (from numpy.testing) at the first field that differs.
        """
        for name in fields:
            np.testing.assert_array_almost_equal(actual[name], expected[name])

    def test_query_model_results_array(self):
        """
        query values and deltas for an explicit list of model objects.

        five results are inserted, three of the returned models are passed to the query.
        the same dictionary serves as index, values and deltas of each inserted result,
        therefore the deltas are compared with the same expected numbers as the values.
        """
        with self.db.session() as session:
            job = setup_sample_database(session)['j1']

            task_index = {'_scan': -1, '_domain': -1, '_emit': -1, '_region': -1}
            rows = [
                {'parA': 4.123, 'parB': 8.567, '_model': 92, '_rfac': 0.654,
                 '_gen': 1, '_particle': 1, '_secs': 0.1},
                {'parA': 3.412, 'parB': 7.856, '_model': 93, '_rfac': 0.345,
                 '_gen': 2, '_particle': 2, '_secs': 0.2},
                {'parA': 4.123, 'parB': 8.567, '_model': 94, '_rfac': 0.354,
                 '_gen': 2, '_particle': 3, '_secs': 0.3},
                {'parA': 2.341, 'parC': 6.785, '_model': 95, '_rfac': 0.453},
                {'parA': 4.123, 'parB': 8.567, '_model': 96, '_rfac': 0.354,
                 '_gen': 3, '_particle': 5, '_secs': 0.5},
            ]
            for row in rows:
                row.update(task_index)

            stored = []
            for row in rows:
                model_obj, _ = db_ingest.insert_result(session, job, row, row, row)
                stored.append(model_obj)
            session.commit()

            # only the models 93, 94 and 95 take part in the query
            models = stored[1:4]
            result_values, result_deltas = db_query.query_model_results_array(session, models=models,
                                                                              include_params=True)

            # None marks a value that was not supplied for the respective model.
            # NOTE(review): relies on numpy storing None as NaN in a float field - confirm the field types.
            columns = (
                ('parA', [3.412, 4.123, 2.341]),
                ('parB', [7.856, 8.567, None]),
                ('parC', [None, None, 6.785]),
                ('_model', [93, 94, 95]),
                ('_rfac', [0.345, 0.354, 0.453]),
                ('_gen', [2, 2, 0]),
                ('_particle', [2, 3, 0]),
                ('_secs', [0.2, 0.3, None]),
            )
            dt = [(name, db_util.field_to_numpy_type(name)) for name, _ in columns]
            expected = np.zeros((len(models),), dtype=dt)
            for name, column in columns:
                expected[name] = np.array(column)

            self.assertEqual(result_values.shape, expected.shape)
            self._assert_fields_almost_equal(
                result_values, expected,
                ('parA', 'parB', 'parC', '_model', '_gen', '_particle', '_rfac', '_secs'))

            self.assertEqual(result_deltas.shape, expected.shape)
            self._assert_fields_almost_equal(
                result_deltas, expected,
                ('parA', 'parB', 'parC', '_model', '_gen', '_particle'))

    def test_query_model_results_array_index(self):
        """
        query the results of one model, restricted by a task index.

        five results with the same model number 99 but different task indices are inserted.
        all of them must refer to the same model record.
        the query for model=99, domain=1 is expected to return the three results that have _domain == 1.
        """
        with self.db.session() as session:
            job = setup_sample_database(session)['j1']

            params = {'parA': 4.123, 'parB': 8.567, 'parC': 6.785}
            # one row per inserted result: (_scan, _domain, _emit, _region, _rfac)
            tasks = (
                (-1, -1, -1, -1, 0.154),
                (1, -1, -1, -1, 0.254),
                (1, 1, -1, -1, 0.354),
                (1, 1, 1, -1, 0.454),
                (1, 1, 1, 1, 0.554),
            )

            stored = []
            for scan, domain, emit, region, rfac in tasks:
                task_index = {'_model': 99, '_scan': scan, '_domain': domain, '_emit': emit, '_region': region}
                task_result = {'_rfac': rfac, '_gen': 1, '_particle': 1}
                task_result.update(params)
                model_obj, _ = db_ingest.insert_result(session, job, task_index, task_result, task_result)
                stored.append(model_obj)
            session.commit()

            # every insert must have hit the same model record
            first = stored[0]
            for other in stored[1:]:
                self.assertEqual(first.id, other.id)

            result_values, _ = db_query.query_model_results_array(session, model=99, domain=1,
                                                                  include_params=True)

            pars = ['parA', 'parB', 'parC']
            controls = ['_model', '_scan', '_domain', '_emit', '_region', '_rfac']
            dt = [(k, 'f8') for k in pars] + [(k, db_util.field_to_numpy_type(k)) for k in controls]
            expected = np.zeros((3,), dtype=dt)
            columns = (
                ('parA', [4.123, 4.123, 4.123]),
                ('parB', [8.567, 8.567, 8.567]),
                ('parC', [6.785, 6.785, 6.785]),
                ('_model', [99, 99, 99]),
                ('_scan', [1, 1, 1]),
                ('_domain', [1, 1, 1]),
                ('_emit', [-1, 1, 1]),
                ('_region', [-1, -1, 1]),
                ('_rfac', [0.354, 0.454, 0.554]),
            )
            for name, column in columns:
                expected[name] = np.array(column)

            self.assertEqual(result_values.shape, expected.shape)
            self._assert_fields_almost_equal(result_values, expected, pars + controls)

    def test_query_model_results_hook(self):
        """
        query model results through a query hook.

        pop_query_hook is passed as query_hook together with hook_data {'gen': 2}.
        the two models of generation 2 are expected in the result,
        and the result must not contain a parC field.
        """
        with self.db.session() as session:
            job = setup_sample_database(session)['j1']

            task_index = {'_scan': -1, '_domain': -1, '_emit': -1, '_region': -1}
            rows = [
                {'parA': 4.123, 'parB': 8.567, '_model': 92, '_rfac': 0.654, '_gen': 1, '_particle': 1},
                {'parA': 3.412, 'parB': 7.856, '_model': 93, '_rfac': 0.345, '_gen': 2, '_particle': 2},
                {'parA': 4.123, 'parB': 8.567, '_model': 94, '_rfac': 0.354, '_gen': 2, '_particle': 3},
                {'parA': 2.341, 'parC': 6.785, '_model': 95, '_rfac': 0.453},
                {'parA': 4.123, 'parB': 8.567, '_model': 96, '_rfac': 0.354, '_gen': 3, '_particle': 5},
            ]
            for row in rows:
                row.update(task_index)

            stored = []
            for row in rows:
                model_obj, _ = db_ingest.insert_result(session, job, row, row, row)
                stored.append(model_obj)
            session.commit()

            # the models 93 and 94 are the ones that carry _gen == 2
            models = stored[1:3]
            hd = {'gen': 2}
            result_values, _ = db_query.query_model_results_array(session, include_params=True,
                                                                  query_hook=pop_query_hook, hook_data=hd)

            template = ['parA', 'parB', 'parC', '_model', '_rfac', '_gen', '_particle']
            dt = [(field, db_util.field_to_numpy_type(field)) for field in template]
            expected = np.zeros((len(models),), dtype=dt)
            # parC is left at zero. _rfac is prepared here but not compared below.
            columns = (
                ('parA', [3.412, 4.123]),
                ('parB', [7.856, 8.567]),
                ('_model', [93, 94]),
                ('_rfac', [0.345, 0.354]),
                ('_gen', [2, 2]),
                ('_particle', [2, 3]),
            )
            for name, column in columns:
                expected[name] = np.array(column)

            self.assertEqual(result_values.shape, expected.shape)
            self.assertNotIn('parC', result_values.dtype.names)
            self._assert_fields_almost_equal(result_values, expected,
                                             ('parA', 'parB', '_model', '_gen', '_particle'))

    def test_query_best_task_models(self):
        """
        check the set returned by query_best_task_models for level=1 and count=2.

        eleven results with model numbers 0 to 10 are inserted at different scan and domain indices.
        _emit and _region are -1 and parB is 8.567 in all of them.
        """
        with self.db.session() as session:
            job = setup_sample_database(session)['j1']

            # one row per inserted result: (_model, _scan, _domain, parA, _rfac)
            table = (
                (0, -1, -1, 4., 0.01),
                (1, 0, -1, 4., 0.02),
                (2, 0, 0, 4., 0.03),
                (3, 1, -1, 4., 0.04),
                (4, 1, 0, 4., 0.05),
                (5, -1, -1, 4.123, 0.09),
                (6, 0, -1, 4.123, 0.08),
                (7, 0, 0, 4.123, 0.07),
                (8, 1, -1, 4.123, 0.06),
                (9, 1, 0, 4.123, 0.05),
                (10, -1, -1, 4.123, 0.01),
            )
            for model_id, scan, domain, par_a, rfac in table:
                row = {'_model': model_id, '_scan': scan, '_domain': domain, '_emit': -1, '_region': -1,
                       'parA': par_a, 'parB': 8.567, '_rfac': rfac}
                db_ingest.insert_result(session, job, row, row)

            result = db_query.query_best_task_models(session, job.id, level=1, count=2)

            expected = {0, 1, 3, 6, 8, 10}
            self.assertEqual(result, expected)

    def test_query_best_models_per_job(self):
        """
        check query_best_models_per_job with and without a jobs argument.

        six results are inserted into job j2.
        the first query does not name a job, the second one is restricted to j2.
        both queries use task_level='domain' and limit=3.
        """
        with self.db.session() as session:
            job = setup_sample_database(session)['j2']

            # pairs of (values, task index) in insertion order
            table = (
                ({'parA': 4.123, 'parB': 8.567, '_model': 92, '_rfac': 0.654, '_gen': 1, '_particle': 2},
                 {'_scan': -1, '_domain': -1, '_emit': -1, '_region': -1}),
                ({'parA': 3.412, 'parB': 7.856, '_model': 93, '_rfac': 0.345, '_gen': 1, '_particle': 3},
                 {'_scan': 1, '_domain': -1, '_emit': -1, '_region': -1}),
                ({'parA': 4.123, 'parB': 8.567, '_model': 94, '_rfac': 0.354, '_gen': 1, '_particle': 4},
                 {'_scan': 2, '_domain': 11, '_emit': 23, '_region': 33}),
                ({'parA': 2.341, 'parC': 6.785, '_model': 95, '_rfac': 0.453, '_gen': 1, '_particle': 5},
                 {'_scan': 3, '_domain': 11, '_emit': -1, '_region': -1}),
                ({'parA': 4.123, 'parB': 8.567, '_model': 96, '_rfac': 0.354, '_gen': 1, '_particle': 6},
                 {'_scan': 4, '_domain': 11, '_emit': 25, '_region': -1}),
                ({'parA': 5.123, 'parB': 6.567, '_model': 97, '_rfac': 0.154, '_gen': 1, '_particle': 7},
                 {'_scan': 5, '_domain': -1, '_emit': -1, '_region': -1}),
            )
            for row, task_index in table:
                row.update(task_index)
            for row, _ in table:
                db_ingest.insert_result(session, job, row, row)

            # model 91 is not inserted here - presumably it belongs to the sample database (verify there).
            queries = (
                ({}, [91, 97]),
                ({'jobs': [job]}, [97]),
            )
            for extra_args, expected_models in queries:
                lim = 3
                query = db_query.query_best_models_per_job(session, task_level='domain', limit=lim,
                                                           **extra_args)
                self.assertEqual(len(query), len(expected_models))
                for model, result in query:
                    self.assertIn(model.model, expected_models)
|
|
|
|
|
# run the tests with the unittest runner when the module is executed as a script.
if '__main__' == __name__:
    unittest.main()
|