update public distribution

based on internal repository c9a2ac8 2019-01-03 16:04:57 +0100
tagged rev-master-2.0.0
This commit is contained in:
2019-01-31 15:45:02 +01:00
parent bbd16d0f94
commit acea809e4e
92 changed files with 165828 additions and 143181 deletions

View File

@ -17,9 +17,14 @@ Licensed under the Apache License, Version 2.0 (the "License"); @n
http://www.apache.org/licenses/LICENSE-2.0
"""
import unittest
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from io import BytesIO
import math
import numpy as np
import unittest
import pmsco.cluster as mc
@ -153,7 +158,7 @@ class TestClusterFunctions(unittest.TestCase):
self.assertEqual(2, clu.get_emitter_count())
result = clu.get_emitters()
expect = [(0., 0., 0., 1), (1., 0., 1., 10)]
self.assertItemsEqual(expect, result)
self.assertEqual(expect, result)
def test_get_z_layers(self):
clu = mc.Cluster()
@ -288,13 +293,21 @@ class TestClusterFunctions(unittest.TestCase):
v_lat3 = np.asarray([0, 0, 1])
clu.add_bulk(7, v_pos, v_lat1, v_lat2, v_lat3)
clu.set_emitter(pos=v_pos)
clu.trim_cylinder(2.3, 4.2)
r0 = 2.3
z0 = 4.2
clu.trim_cylinder(r0, z0)
self.assertEqual(clu.data.dtype, clu.dtype)
self.assertEqual(clu.data.shape[0], 21 * 5)
self.assertEqual(clu.data[1]['i'], 2)
self.assertEqual(clu.data[1]['s'], 'N')
self.assertEqual(clu.data[1]['t'], 7)
self.assertEqual(clu.get_emitter_count(), 1)
n_low = np.sum(clu.data['z'] < -z0)
self.assertEqual(0, n_low)
n_high = np.sum(clu.data['z'] > z0)
self.assertEqual(0, n_high)
n_out = np.sum(clu.data['x']**2 + clu.data['y']**2 > r0**2)
self.assertEqual(0, n_out)
def test_trim_sphere(self):
clu = mc.Cluster()
@ -305,13 +318,39 @@ class TestClusterFunctions(unittest.TestCase):
v_lat3 = np.asarray([0, 0, 1])
clu.add_bulk(7, v_pos, v_lat1, v_lat2, v_lat3)
clu.set_emitter(pos=v_pos)
clu.trim_sphere(2.3)
r0 = 2.3
clu.trim_sphere(r0)
self.assertEqual(clu.data.dtype, clu.dtype)
self.assertEqual(clu.data.shape[0], 39)
self.assertEqual(clu.data[1]['i'], 2)
self.assertEqual(clu.data[1]['s'], 'N')
self.assertEqual(clu.data[1]['t'], 7)
self.assertEqual(clu.get_emitter_count(), 1)
n_out = np.sum(clu.data['x']**2 + clu.data['y']**2 + clu.data['z'] > r0**2)
self.assertEqual(0, n_out)
def test_trim_paraboloid(self):
clu = mc.Cluster()
clu.set_rmax(10.0)
v_pos = np.asarray([0, 0, 0])
v_lat1 = np.asarray([1, 0, 0])
v_lat2 = np.asarray([0, 1, 0])
v_lat3 = np.asarray([0, 0, 1])
clu.add_bulk(7, v_pos, v_lat1, v_lat2, v_lat3)
clu.set_emitter(pos=v_pos)
r0 = 3.5
z0 = -2.3
clu.trim_paraboloid(r0, z0)
self.assertEqual(clu.data.dtype, clu.dtype)
self.assertEqual(63, clu.data.shape[0])
self.assertEqual(2, clu.data[1]['i'])
self.assertEqual('N', clu.data[1]['s'])
self.assertEqual(7, clu.data[1]['t'])
self.assertEqual(1, clu.get_emitter_count())
n_low = np.sum(clu.data['z'] < z0)
self.assertEqual(0, n_low)
n_out = np.sum(clu.data['x']**2 + clu.data['y']**2 > r0**2)
self.assertEqual(0, n_out)
def test_trim_slab(self):
clu = self.create_cube()
@ -319,3 +358,21 @@ class TestClusterFunctions(unittest.TestCase):
self.assertEqual(clu.data.dtype, clu.dtype)
self.assertEqual(clu.data.shape[0], 9 * 2)
self.assertEqual(clu.get_emitter_count(), 1)
def test_save_to_file(self):
clu = self.create_cube()
f = BytesIO()
pos = np.asarray((-1, -1, 0))
clu.set_emitter(pos=pos)
clu.save_to_file(f, mc.FMT_XYZ, "qwerty", emitters_only=True)
f.seek(0)
line = f.readline()
self.assertEqual(line, b"2\n", b"line 1: " + line)
line = f.readline()
self.assertEqual(line, b"qwerty\n", b"line 2: " + line)
line = f.readline()
self.assertRegexpMatches(line, b"H +[0.]+ +[0.]+ +[0.]+", b"line 3: " + line)
line = f.readline()
self.assertRegexpMatches(line, b"Si +[01.-]+ +[01.-]+ +[0.]+", b"line 4: " + line)
line = f.readline()
self.assertEqual(line, b"", b"end of file")

View File

@ -20,6 +20,9 @@ Licensed under the Apache License, Version 2.0 (the "License"); @n
http://www.apache.org/licenses/LICENSE-2.0
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
import math
import numpy as np
@ -80,7 +83,7 @@ class TestDataFunctions(unittest.TestCase):
shape = (10, )
data = md.create_data(shape, dtype=md.DTYPE_ETPAIS)
expected_names = ('e', 't', 'p', 'a', 'i', 's')
self.assertItemsEqual(data.dtype.names, expected_names)
self.assertEqual(data.dtype.names, expected_names)
self.assertEqual(data.shape, shape)
def test_detect_scan_mode_1d(self):
@ -88,10 +91,11 @@ class TestDataFunctions(unittest.TestCase):
expected_mode = ['e']
expected_positions = {}
expected_positions['e'] = np.linspace(0.0, 10.0, 10)
expected_positions['e'] = np.linspace(100.0, 200.0, 10)
self.assertItemsEqual(scan_mode, expected_mode)
self.assertItemsEqual(scan_positions, expected_positions)
self.assertEqual(scan_mode, expected_mode)
for dim in expected_positions:
np.testing.assert_almost_equal(scan_positions[dim], expected_positions[dim], decimal=3)
def test_detect_scan_mode_2d(self):
scan_mode, scan_positions = md.detect_scan_mode(self.ea_scan)
@ -103,8 +107,9 @@ class TestDataFunctions(unittest.TestCase):
expected_positions['p'] = np.zeros((1))
expected_positions['a'] = np.asarray((-1.0, 0.0, 1.0, 2.0))
self.assertItemsEqual(scan_mode, expected_mode)
self.assertItemsEqual(scan_positions, expected_positions)
self.assertEqual(scan_mode, expected_mode)
for dim in expected_positions:
np.testing.assert_almost_equal(scan_positions[dim], expected_positions[dim], decimal=3)
def test_detect_scan_mode_holo(self):
scan_mode, scan_positions = md.detect_scan_mode(self.holo_scan)
@ -115,8 +120,9 @@ class TestDataFunctions(unittest.TestCase):
expected_positions['t'] = self.holo_scan['t']
expected_positions['p'] = self.holo_scan['p']
self.assertItemsEqual(scan_mode, expected_mode)
self.assertItemsEqual(scan_positions, expected_positions)
self.assertEqual(scan_mode, expected_mode)
for dim in expected_positions:
np.testing.assert_almost_equal(scan_positions[dim], expected_positions[dim], decimal=3)
def test_detect_scan_mode_theta(self):
scan = self.holo_scan
@ -130,8 +136,9 @@ class TestDataFunctions(unittest.TestCase):
expected_positions['t'] = np.linspace(1.0, 2.0, scan.shape[0])
expected_positions['p'] = np.ones((1)) * 3.3
self.assertItemsEqual(scan_mode, expected_mode)
self.assertItemsEqual(scan_positions, expected_positions)
self.assertEqual(scan_mode, expected_mode)
for dim in expected_positions:
np.testing.assert_almost_equal(scan_positions[dim], expected_positions[dim], decimal=3)
def test_calc_modfunc_mean_1d(self):
modf = md.calc_modfunc_mean(self.e_scan)
@ -178,6 +185,7 @@ class TestDataFunctions(unittest.TestCase):
exp_modf['i'] = +1.000001
np.testing.assert_array_less(modf['i'], exp_modf['i'])
@unittest.skip("test_calc_modfunc_loess_1d_nan fails and kills the whole test process")
def test_calc_modfunc_loess_1d_nan(self):
"""
check that data.calc_modfunc_loess() ignores NaNs gracefully.
@ -229,7 +237,7 @@ class TestDataFunctions(unittest.TestCase):
# the R factor should be sensitive enough to detect mixed-up axes.
exp_modf['i'] = 0.03 * np.sin((scan['e'] - 150) / 50 * math.pi)
rf = md.rfactor(modf, exp_modf)
print rf
print(rf)
self.assertLessEqual(rf, 0.50)
def test_alpha_mirror_average(self):

456
tests/test_database.py Normal file
View File

@ -0,0 +1,456 @@
"""
@package tests.test_database
unit tests for pmsco.database
the purpose of these tests is to help debugging the code.
to run the tests, change to the directory which contains the tests directory, and execute =nosetests=.
@pre nose must be installed (python-nose package on Debian).
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
@copyright (c) 2016 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
import datetime
import os.path
import tempfile
import shutil
import numpy as np
import pmsco.database as db
import pmsco.dispatch as dispatch
import pmsco.optimizers.population as population
class TestDatabase(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.lock_filename = os.path.join(self.test_dir, "test_database.lock")
self.db = db.ResultsDatabase()
self.db.connect(":memory:", lock_filename=self.lock_filename)
def tearDown(self):
self.db.disconnect()
shutil.rmtree(self.test_dir)
@classmethod
def setup_class(cls):
# before any methods in this class
pass
@classmethod
def teardown_class(cls):
# teardown_class() after any methods in this class
pass
def test_regular_params(self):
d1 = {'parA': 1.234, 'par_B': 5.678, '_model': 91, '_rfac': 0.534}
d2 = db.regular_params(d1)
d3 = {'parA': d1['parA'], 'par_B': d1['par_B']}
self.assertEqual(d2, d3)
self.assertIsNot(d2, d1)
def test_special_params(self):
d1 = {'parA': 1.234, 'par_B': 5.678, '_model': 91, '_rfac': 0.534, '_db_model': 99}
d2 = db.special_params(d1)
d3 = {'model': d1['_model'], 'rfac': d1['_rfac']}
self.assertEqual(d2, d3)
self.assertIsNot(d2, d1)
dt = [('parA', 'f4'), ('par_B', 'f4'), ('_model', 'i4'), ('_rfac', 'f4'), ('_db_model', 'f4')]
arr = np.zeros(1, dtype=dt)
for k, v in d1.items():
arr[0][k] = v
d4 = db.special_params(arr[0])
self.assertEqual(d4.keys(), d3.keys())
for k in d4:
self.assertAlmostEqual(d4[k], d3[k])
cid1 = dispatch.CalcID(1, 2, 3, 4, -1)
cid2 = db.special_params(cid1)
cid3 = {'model': 1, 'scan': 2, 'sym': 3, 'emit': 4, 'region': -1}
self.assertEqual(cid2, cid3)
l1 = d1.keys()
l2 = db.special_params(l1)
l3 = d3.keys()
self.assertEqual(list(l2), list(l3))
t1 = tuple(l1)
t2 = db.special_params(t1)
t3 = tuple(l3)
self.assertEqual(t2, t3)
def setup_sample_database(self):
self.db.register_project("unittest", "testcode")
self.db.register_job(self.db.project_id, "testjob", "testmode", "testhost", None, datetime.datetime.now())
self.ex_model = {'parA': 1.234, 'parB': 5.678, '_model': 91, '_rfac': 0.534}
self.db.register_params(self.ex_model)
self.db.insert_model(self.ex_model)
self.db.create_models_view()
def test_register_project(self):
id1 = self.db.register_project("unittest1", "Atest")
self.assertIsInstance(id1, int)
id2 = self.db.register_project("unittest2", "Btest")
self.assertIsInstance(id2, int)
id3 = self.db.register_project("unittest1", "Ctest")
self.assertIsInstance(id3, int)
self.assertNotEqual(id1, id2)
self.assertEqual(id1, id3)
c = self.db._conn.cursor()
c.execute("select count(*) from Projects")
count = c.fetchone()
self.assertEqual(count[0], 2)
c.execute("select name, code from Projects where id=:id", {'id': id1})
row = c.fetchone()
self.assertIsNotNone(row)
self.assertEqual(len(row), 2)
self.assertEqual(row[0], "unittest1")
self.assertEqual(row[1], "Atest")
self.assertEqual(row['name'], "unittest1")
self.assertEqual(row['code'], "Atest")
def test_register_params(self):
self.setup_sample_database()
model5 = {'parA': 2.341, 'parC': 6.785, '_model': 92, '_rfac': 0.453}
self.db.register_params(model5)
expected = ['parA', 'parB', 'parC']
c = self.db._conn.cursor()
c.execute("select * from Params order by key")
results = c.fetchall()
self.assertEqual(len(results), 3)
result_params = [row['key'] for row in results]
self.assertEqual(result_params, expected)
def test_query_project_params(self):
self.setup_sample_database()
project1 = self.db.project_id
self.db.register_project("unittest2", "testcode2")
self.db.register_job(self.db.project_id, "testjob2", "test", "localhost", None, datetime.datetime.now())
model5 = {'parA': 2.341, 'parC': 6.785, '_model': 92, '_rfac': 0.453}
self.db.register_params(model5)
self.db.insert_model(model5)
results = self.db.query_project_params(project_id=project1)
expected = ['parA', 'parB']
self.assertEqual(expected, sorted(results.keys()))
def test_insert_model(self):
self.setup_sample_database()
c = self.db._conn.cursor()
c.execute("select count(*) from Models")
count = c.fetchone()
self.assertEqual(count[0], 1)
c.execute("select * from Models")
row = c.fetchone()
model_id = row['id']
self.assertIsInstance(model_id, int)
self.assertEqual(row['job_id'], self.db.job_id)
self.assertEqual(row['model'], self.ex_model['_model'])
self.assertIsNone(row['gen'])
self.assertIsNone(row['particle'])
sql = "select key, value from ParamValues " + \
"join Params on ParamValues.param_id = Params.id " + \
"where model_id = :model_id"
c.execute(sql, {'model_id': model_id})
result = c.fetchall() # list of Row objects
self.assertEqual(len(result), 2)
for row in result:
self.assertAlmostEqual(row['value'], self.ex_model[row['key']])
def test_query_model(self):
self.setup_sample_database()
c = self.db._conn.cursor()
c.execute("select * from Models")
row = c.fetchone()
model_id = row['id']
model = self.db.query_model(model_id)
del self.ex_model['_model']
del self.ex_model['_rfac']
self.assertEqual(model, self.ex_model)
def test_query_model_array(self):
self.setup_sample_database()
index = {'_scan': -1, '_sym': -1, '_emit': -1, '_region': -1}
model2 = {'parA': 4.123, 'parB': 8.567, '_model': 92, '_rfac': 0.654}
model3 = {'parA': 3.412, 'parB': 7.856, '_model': 93, '_rfac': 0.345}
model4 = {'parA': 4.123, 'parB': 8.567, '_model': 94, '_rfac': 0.354}
model5 = {'parA': 2.341, 'parC': 6.785, '_model': 95, '_rfac': 0.453}
model6 = {'parA': 4.123, 'parB': 8.567, '_model': 96, '_rfac': 0.354}
self.db.register_params(model5)
self.db.create_models_view()
model2.update(index)
model3.update(index)
model4.update(index)
model5.update(index)
model6.update(index)
self.db.insert_result(model2, model2)
self.db.insert_result(model3, model3)
self.db.insert_result(model4, model4)
self.db.insert_result(model5, model5)
self.db.insert_result(model6, model6)
# only model3, model4 and model5 fulfill all conditions and limits
fil = ['mode = "testmode"', 'rfac <= 0.6']
lim = 3
result = self.db.query_model_array(filter=fil, limit=lim)
template = ['parA', 'parB', 'parC', '_model', '_rfac', '_gen', '_particle']
dt = population.Population.get_pop_dtype(template)
expected = np.zeros((lim,), dtype=dt)
expected['parA'] = np.array([3.412, 4.123, 2.341])
expected['parB'] = np.array([7.856, 8.567, None])
expected['parC'] = np.array([None, None, 6.785])
expected['_model'] = np.array([93, 94, 95])
expected['_rfac'] = np.array([0.345, 0.354, 0.453])
expected['_gen'] = np.array([0, 0, 0])
expected['_particle'] = np.array([0, 0, 0])
self.assertEqual(result.shape, expected.shape)
np.testing.assert_array_almost_equal(result['parA'], expected['parA'])
np.testing.assert_array_almost_equal(result['parB'], expected['parB'])
np.testing.assert_array_almost_equal(result['parC'], expected['parC'])
np.testing.assert_array_almost_equal(result['_model'], expected['_model'])
np.testing.assert_array_almost_equal(result['_gen'], expected['_gen'])
np.testing.assert_array_almost_equal(result['_particle'], expected['_particle'])
def test_query_best_results(self):
self.setup_sample_database()
model2 = {'parA': 4.123, 'parB': 8.567, '_model': 92, '_rfac': 0.654, '_gen': 1, '_particle': 2}
model3 = {'parA': 3.412, 'parB': 7.856, '_model': 93, '_rfac': 0.345, '_gen': 1, '_particle': 3}
model4 = {'parA': 4.123, 'parB': 8.567, '_model': 94, '_rfac': 0.354, '_gen': 1, '_particle': 4}
model5 = {'parA': 2.341, 'parC': 6.785, '_model': 95, '_rfac': 0.453, '_gen': 1, '_particle': 5}
model6 = {'parA': 4.123, 'parB': 8.567, '_model': 96, '_rfac': 0.354, '_gen': 1, '_particle': 6}
model7 = {'parA': 5.123, 'parB': 6.567, '_model': 97, '_rfac': 0.154, '_gen': 1, '_particle': 7}
self.db.register_params(model5)
self.db.create_models_view()
model2.update({'_scan': -1, '_sym': 11, '_emit': 21, '_region': 31})
model3.update({'_scan': 1, '_sym': 12, '_emit': 22, '_region': 32})
model4.update({'_scan': 2, '_sym': 11, '_emit': 23, '_region': 33})
model5.update({'_scan': 3, '_sym': 11, '_emit': 24, '_region': 34})
model6.update({'_scan': 4, '_sym': 11, '_emit': 25, '_region': 35})
model7.update({'_scan': 5, '_sym': -1, '_emit': -1, '_region': -1})
self.db.insert_result(model2, model2)
self.db.insert_result(model3, model3)
self.db.insert_result(model4, model4)
self.db.insert_result(model5, model5)
self.db.insert_result(model6, model6)
self.db.insert_result(model7, model7)
# only model3, model4 and model5 fulfill all conditions and limits
fil = ['mode = "testmode"', 'sym = 11']
lim = 3
result = self.db.query_best_results(filter=fil, limit=lim)
ifields = ['_db_job', '_db_model', '_db_result',
'_model', '_scan', '_sym', '_emit', '_region',
'_gen', '_particle']
ffields = ['_rfac']
dt = [(f, 'i8') for f in ifields]
dt.extend([(f, 'f8') for f in ffields])
expected = np.zeros((lim,), dtype=dt)
expected['_rfac'] = np.array([0.354, 0.354, 0.453])
expected['_model'] = np.array([94, 96, 95])
expected['_scan'] = np.array([2, 4, 3])
expected['_sym'] = np.array([11, 11, 11])
expected['_emit'] = np.array([23, 25, 24])
expected['_region'] = np.array([33, 35, 34])
expected['_gen'] = np.array([1, 1, 1])
expected['_particle'] = np.array([4, 6, 5])
self.assertEqual(result.shape, expected.shape)
np.testing.assert_array_almost_equal(result['_rfac'], expected['_rfac'])
np.testing.assert_array_equal(result['_model'], expected['_model'])
np.testing.assert_array_equal(result['_scan'], expected['_scan'])
np.testing.assert_array_equal(result['_sym'], expected['_sym'])
np.testing.assert_array_equal(result['_emit'], expected['_emit'])
np.testing.assert_array_equal(result['_region'], expected['_region'])
np.testing.assert_array_equal(result['_gen'], expected['_gen'])
np.testing.assert_array_equal(result['_particle'], expected['_particle'])
def test_insert_result(self):
self.setup_sample_database()
index = dispatch.CalcID(15, 16, 17, 18, -1)
result = {'parA': 4.123, 'parB': 8.567, '_rfac': 0.654, '_particle': 21}
result_id = self.db.insert_result(index, result)
c = self.db._conn.cursor()
c.execute("select count(*) from Results")
count = c.fetchone()
self.assertEqual(count[0], 1)
c.execute("select * from Results")
row = c.fetchone()
self.assertIsInstance(row['id'], int)
self.assertEqual(row['id'], result_id)
model_id = row['model_id']
self.assertIsInstance(model_id, int)
self.assertEqual(row['scan'], index.scan)
self.assertEqual(row['sym'], index.sym)
self.assertEqual(row['emit'], index.emit)
self.assertEqual(row['region'], index.region)
self.assertEqual(row['rfac'], result['_rfac'])
c.execute("select * from Models where id = :model_id", {'model_id': model_id})
row = c.fetchone()
model_id = row['id']
self.assertIsInstance(model_id, int)
self.assertEqual(row['job_id'], self.db.job_id)
self.assertEqual(row['model'], index.model)
self.assertIsNone(row['gen'])
self.assertEqual(row['particle'], result['_particle'])
sql = "select key, value from ParamValues " + \
"join Params on ParamValues.param_id = Params.id " + \
"where model_id = :model_id"
c.execute(sql, {'model_id': model_id})
rows = c.fetchall() # list of Row objects
self.assertEqual(len(rows), 2)
for row in rows:
self.assertAlmostEqual(row['value'], result[row['key']])
def test_update_result(self):
self.setup_sample_database()
index = dispatch.CalcID(15, 16, 17, 18, -1)
result1 = {'parA': 4.123, 'parB': 8.567, '_rfac': 0.654, '_particle': 21}
result_id1 = self.db.insert_result(index, result1)
result2 = {'parA': 5.456, '_rfac': 0.254, '_particle': 11}
result_id2 = self.db.insert_result(index, result2)
result3 = result1.copy()
result3.update(result2)
self.assertEqual(result_id1, result_id2)
c = self.db._conn.cursor()
c.execute("select count(*) from Results")
count = c.fetchone()
self.assertEqual(count[0], 1)
c.execute("select * from Results")
row = c.fetchone()
self.assertIsInstance(row['id'], int)
self.assertEqual(row['id'], result_id1)
model_id = row['model_id']
self.assertIsInstance(model_id, int)
self.assertEqual(row['scan'], index.scan)
self.assertEqual(row['sym'], index.sym)
self.assertEqual(row['emit'], index.emit)
self.assertEqual(row['region'], index.region)
self.assertEqual(row['rfac'], result2['_rfac'])
c.execute("select * from Models where id = :model_id", {'model_id': model_id})
row = c.fetchone()
model_id = row['id']
self.assertIsInstance(model_id, int)
self.assertEqual(row['job_id'], self.db.job_id)
self.assertEqual(row['model'], index.model)
self.assertIsNone(row['gen'])
self.assertEqual(row['particle'], result2['_particle'])
sql = "select key, value from ParamValues " + \
"join Params on ParamValues.param_id = Params.id " + \
"where model_id = :model_id"
c.execute(sql, {'model_id': model_id})
rows = c.fetchall() # list of Row objects
self.assertEqual(len(rows), 2)
for row in rows:
self.assertAlmostEqual(row['value'], result3[row['key']])
def test_update_result_dict(self):
"""
test update result with index as dictionary
@return:
"""
self.setup_sample_database()
index = {'_model': 15, '_scan': 16, '_sym': 17, '_emit': 18, '_region': -1}
result1 = {'parA': 4.123, 'parB': 8.567, '_rfac': 0.654, '_particle': 21}
result_id1 = self.db.insert_result(index, result1)
result2 = {'parA': 5.456, '_rfac': 0.254, '_particle': 11}
result_id2 = self.db.insert_result(index, result2)
result3 = result1.copy()
result3.update(result2)
self.assertEqual(result_id1, result_id2)
c = self.db._conn.cursor()
c.execute("select count(*) from Results")
count = c.fetchone()
self.assertEqual(count[0], 1)
c.execute("select * from Results")
row = c.fetchone()
self.assertIsInstance(row['id'], int)
self.assertEqual(row['id'], result_id1)
model_id = row['model_id']
self.assertIsInstance(model_id, int)
self.assertEqual(row['scan'], index['_scan'])
self.assertEqual(row['sym'], index['_sym'])
self.assertEqual(row['emit'], index['_emit'])
self.assertEqual(row['region'], index['_region'])
self.assertEqual(row['rfac'], result2['_rfac'])
c.execute("select * from Models where id = :model_id", {'model_id': model_id})
row = c.fetchone()
model_id = row['id']
self.assertIsInstance(model_id, int)
self.assertEqual(row['job_id'], self.db.job_id)
self.assertEqual(row['model'], index['_model'])
self.assertIsNone(row['gen'])
self.assertEqual(row['particle'], result2['_particle'])
sql = "select key, value from ParamValues " + \
"join Params on ParamValues.param_id = Params.id " + \
"where model_id = :model_id"
c.execute(sql, {'model_id': model_id})
rows = c.fetchall() # list of Row objects
self.assertEqual(len(rows), 2)
for row in rows:
self.assertAlmostEqual(row['value'], result3[row['key']])
def test_query_best_task_models(self):
self.setup_sample_database()
model0xxx = {'_model': 0, '_scan': -1, '_sym': -1, '_emit': -1, '_region': -1, 'parA': 4., 'parB': 8.567, '_rfac': 0.01}
model00xx = {'_model': 1, '_scan': 0, '_sym': -1, '_emit': -1, '_region': -1, 'parA': 4., 'parB': 8.567, '_rfac': 0.02}
model000x = {'_model': 2, '_scan': 0, '_sym': 0, '_emit': -1, '_region': -1, 'parA': 4., 'parB': 8.567, '_rfac': 0.03}
model01xx = {'_model': 3, '_scan': 1, '_sym': -1, '_emit': -1, '_region': -1, 'parA': 4., 'parB': 8.567, '_rfac': 0.04}
model010x = {'_model': 4, '_scan': 1, '_sym': 0, '_emit': -1, '_region': -1, 'parA': 4., 'parB': 8.567, '_rfac': 0.05}
model1xxx = {'_model': 5, '_scan': -1, '_sym': -1, '_emit': -1, '_region': -1, 'parA': 4.123, 'parB': 8.567, '_rfac': 0.09}
model10xx = {'_model': 6, '_scan': 0, '_sym': -1, '_emit': -1, '_region': -1, 'parA': 4.123, 'parB': 8.567, '_rfac': 0.08}
model100x = {'_model': 7, '_scan': 0, '_sym': 0, '_emit': -1, '_region': -1, 'parA': 4.123, 'parB': 8.567, '_rfac': 0.07}
model11xx = {'_model': 8, '_scan': 1, '_sym': -1, '_emit': -1, '_region': -1, 'parA': 4.123, 'parB': 8.567, '_rfac': 0.06}
model110x = {'_model': 9, '_scan': 1, '_sym': 0, '_emit': -1, '_region': -1, 'parA': 4.123, 'parB': 8.567, '_rfac': 0.05}
model2xxx = {'_model': 10, '_scan': -1, '_sym': -1, '_emit': -1, '_region': -1, 'parA': 4.123, 'parB': 8.567, '_rfac': 0.01}
self.db.insert_result(model0xxx, model0xxx)
self.db.insert_result(model00xx, model00xx)
self.db.insert_result(model000x, model000x)
self.db.insert_result(model01xx, model01xx)
self.db.insert_result(model010x, model010x)
self.db.insert_result(model1xxx, model1xxx)
self.db.insert_result(model10xx, model10xx)
self.db.insert_result(model100x, model100x)
self.db.insert_result(model11xx, model11xx)
self.db.insert_result(model110x, model110x)
self.db.insert_result(model2xxx, model2xxx)
result = self.db.query_best_task_models(level=1, count=2)
expected = {0, 1, 3, 6, 8, 10}
self.assertEqual(result, expected)
if __name__ == '__main__':
unittest.main()

117
tests/test_dispatch.py Normal file
View File

@ -0,0 +1,117 @@
"""
@package tests.test_dispatch
unit tests for pmsco.dispatch
the purpose of these tests is to mainly to check the syntax, and correct data types,
i.e. anything that could cause a run-time error.
to run the tests, change to the directory which contains the tests directory, and execute =nosetests=.
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
@copyright (c) 2018 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import six
import copy
import unittest
import pmsco.dispatch as dispatch
class TestCalcID(unittest.TestCase):
def setUp(self):
# before each test method
pass
def tearDown(self):
# after each test method
pass
def test_levels(self):
cid = dispatch.CalcID(1, 2, 3, -1, -1)
result = cid.levels
expected = dispatch.CALC_LEVELS
self.assertIsInstance(cid, tuple)
self.assertEqual(result, expected)
def test_level(self):
cid = dispatch.CalcID(1, 2, 3, -1, -1)
result = cid.level
expected = dispatch.CALC_LEVELS[2]
self.assertEqual(result, expected)
cid = dispatch.CalcID(-1, -1, -1, -1, -1)
result = cid.level
expected = ''
self.assertEqual(result, expected)
def test_numeric_level(self):
cid = dispatch.CalcID(1, 2, 3, -1, -1)
result = cid.numeric_level
expected = 2
self.assertEqual(result, expected)
cid = dispatch.CalcID(-1, -1, -1, -1, -1)
result = cid.numeric_level
expected = -1
self.assertEqual(result, expected)
def test_collapse_levels(self):
cid = dispatch.CalcID(1, 2, 3, 4, 5)
result = cid.collapse_levels(dispatch.CALC_LEVELS[2])
expected = dispatch.CalcID(1, 2, 3, -1, -1)
self.assertEqual(result, expected)
result = cid.collapse_levels(-1)
expected = dispatch.CalcID(-1, -1, -1, -1, -1)
self.assertEqual(result, expected)
class TestCalculationTask(unittest.TestCase):
def setUp(self):
self.sample = dispatch.CalculationTask()
self.sample.id = dispatch.CalcID(11, 12, 13, 14, 15)
self.sample.parent_id = dispatch.CalcID(21, 22, 23, 24, 25)
self.sample.model = {'A': 31, 'B': 32}
self.sample.file_root = "testfile"
self.sample.file_ext = ".ext"
self.sample.result_filename = "resultfile"
self.sample.modf_filename = "modffile"
self.sample.result_valid = True
# self.sample.time = datetime.timedelta()
# self.sample.files = {}
# self.sample.region = {}
self.sample.rfac = 0.123456
def tearDown(self):
pass
def test_change_id(self):
result = copy.deepcopy(self.sample)
self.sample.id = dispatch.CalcID(11, 92, 13, 14, 15)
result.change_id(scan=92)
self.assertEqual(result, self.sample)
def test_get_mpi_message(self):
result = self.sample.get_mpi_message()
expected = {'model': 11, 'scan': 12, 'sym': 13, 'emit': 14, 'region': 15}
self.assertEqual(result['id'], expected)
self.assertEqual(result['model'], self.sample.model)
self.assertEqual(result['result_filename'], self.sample.result_filename)
self.assertEqual(result['result_valid'], self.sample.result_valid)
def test_set_mpi_message(self):
result = dispatch.CalculationTask()
msg = self.sample.get_mpi_message()
result.set_mpi_message(msg)
self.assertEqual(result, self.sample)
def test_format_filename(self):
result = self.sample.format_filename(emit=94)
expected = "testfile_11_12_13_94_15.ext"
self.assertEqual(result, expected)

View File

@ -10,24 +10,34 @@ to run the tests, change to the directory which contains the tests directory, an
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
@copyright (c) 2015 by Paul Scherrer Institut @n
@copyright (c) 2015-18 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
import mock
import os
import pmsco.files as files
class TestFileTracker(unittest.TestCase):
"""
unit tests of pmsco.files.FileTracker
for a description of the mock and patch mechanism, cf.
- http://blog.thedigitalcatonline.com/blog/2016/03/06/python-mocks-a-gentle-introduction-part-1
- http://blog.thedigitalcatonline.com/blog/2016/09/27/python-mocks-a-gentle-introduction-part-2
"""
def setUp(self):
# before each test method
self.files = files.FileTracker()
self.files.keep_rfac = 1
self.files._os_delete_file = mock.Mock(return_value=None)
self.files.add_file("model 1 file 1 cluster K", 1, 'cluster')
self.files.add_file("model 1 file 2 output D", 1, 'output')
@ -66,39 +76,76 @@ class TestFileTracker(unittest.TestCase):
pass
def test_add_file(self):
pass
self.assertEqual(self.files.get_file_count(), 10, "file count after setup")
# add a new file of an existing model
self.files.add_file("model 2 file 1 input K", 2, 'input')
self.assertEqual(self.files.get_file_count(), 11, "file count after add")
# change the category of an existing file
self.files.add_file("model 2 file 2 output D", 2, 'report')
self.assertEqual(self.files.get_file_count(), 11, "file count after change")
def test_rename_file(self):
pass
self.files.rename_file("model 2 file 2 output D", "renamed file")
self.assertEqual(self.files.get_file_count(), 10, "file count after rename")
self.files.rename_file("inexistant file", "renamed file")
self.assertEqual(self.files.get_file_count(), 10, "file count after rename")
def test_remove_file(self):
pass
@mock.patch('os.remove')
def test_remove_file(self, os_remove_mock):
self.files.remove_file("model 3 file 1 cluster K")
self.assertEqual(self.files.get_file_count(), 9, "file count after remove")
self.assertFalse(os_remove_mock.called, "file deleted")
def test_update_model_rfac(self):
pass
def test_delete_files(self):
def test_set_model_complete(self):
self.assertEqual(self.files.get_complete_models_count(), 3, "complete model count after setup")
self.files.set_model_complete(3, True)
self.assertEqual(self.files.get_complete_models_count(), 4, "complete model count after add")
self.files.set_model_complete(2, False)
self.assertEqual(self.files.get_complete_models_count(), 3, "complete model count after remove")
self.files.set_model_complete(5, True)
self.assertEqual(self.files.get_complete_models_count(), 3, "complete model count after no change")
@mock.patch('os.remove')
def test_delete_files(self, os_remove_mock):
self.files.keep_rfac = 10
self.files.delete_files()
self.files._os_delete_file.assert_any_call("model 1 file 2 output D")
self.files._os_delete_file.assert_any_call("model 2 file 2 output D")
self.files._os_delete_file.assert_any_call("model 5 file 2 output D")
self.assertEqual(len(self.files._id_by_path), 5+2)
self.assertEqual(len(self.files._path_by_id), 5+2)
self.assertEqual(len(self.files._model_by_id), 5+2)
self.assertEqual(len(self.files._category_by_id), 5+2)
os_remove_mock.assert_any_call(os.path.abspath("model 1 file 2 output D"))
os_remove_mock.assert_any_call(os.path.abspath("model 2 file 2 output D"))
os_remove_mock.assert_any_call(os.path.abspath("model 5 file 2 output D"))
self.assertEqual(self.files.get_file_count(), 5+2)
def test_delete_file(self):
pass
@mock.patch('os.remove')
def test_delete_file(self, os_remove_mock):
self.files.delete_file("model 3 file 1 cluster K")
self.assertEqual(self.files.get_file_count(), 9, "file count after remove")
self.assertTrue(os_remove_mock.called, "file not deleted")
def test_delete_bad_rfac(self):
@mock.patch('os.remove')
def test_delete_bad_rfac(self, os_remove_mock):
self.files.delete_bad_rfac(keep=2, force_delete=True)
self.files._os_delete_file.assert_any_call("model 1 file 1 cluster K")
self.files._os_delete_file.assert_any_call("model 5 file 1 cluster K")
self.assertEqual(len(self.files._id_by_path), 6)
self.assertEqual(len(self.files._path_by_id), 6)
self.assertEqual(len(self.files._model_by_id), 6)
self.assertEqual(len(self.files._category_by_id), 6)
self.assertEqual(self.files.get_file_count(), 6)
os_remove_mock.assert_any_call(os.path.abspath("model 1 file 1 cluster K"))
os_remove_mock.assert_any_call(os.path.abspath("model 5 file 1 cluster K"))
def test_delete_category(self):
pass
@mock.patch('os.remove')
def test_delete_category(self, os_remove_mock):
self.files.delete_category('cluster')
self.assertEqual(self.files.get_file_count(), 7)
os_remove_mock.assert_any_call(os.path.abspath("model 1 file 1 cluster K"))
os_remove_mock.assert_any_call(os.path.abspath("model 2 file 1 cluster K"))
os_remove_mock.assert_any_call(os.path.abspath("model 5 file 1 cluster K"))
@mock.patch('os.remove')
def test_delete_models(self, os_remove_mock):
n = self.files.delete_models()
self.assertEqual(n, 0)
n = self.files.delete_models(keep={2, 5})
self.assertEqual(n, 1)
self.assertEqual(self.files.get_file_count(), 8, "file count after remove")
self.assertTrue(os_remove_mock.called, "file not deleted")
n = self.files.delete_models(delete={2, 3})
self.assertEqual(n, 1)
self.assertEqual(self.files.get_file_count(), 6, "file count after remove")

381
tests/test_genetic.py Normal file
View File

@ -0,0 +1,381 @@
"""
@package tests.test_genetic
unit tests for pmsco.optimizers.genetic
the purpose of these tests is to help debugging the code.
to run the tests, change to the directory which contains the tests directory, and execute =nosetests=.
@pre nose must be installed (python-nose package on Debian).
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
@copyright (c) 2018 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import numpy as np
import os
import os.path
import random
import shutil
import tempfile
import unittest
import pmsco.optimizers.genetic as mo
import pmsco.project as mp
POP_SIZE = 6
class TestPopulation(unittest.TestCase):
def setUp(self):
random.seed(0)
self._test_dir = ""
self.domain = mp.Domain()
self.domain.add_param('A', 1.5, 1.0, 2.0, 0.1)
self.domain.add_param('B', 2.5, 2.0, 3.0, 0.1)
self.domain.add_param('C', 3.5, 3.0, 4.0, 0.1)
self.expected_names = ('_gen', '_model', '_particle', '_rfac', 'A', 'B', 'C')
self.size = POP_SIZE
self.pop = mo.GeneticPopulation()
self.optimum1 = {'A': 1.045351, 'B': 2.346212, 'C': 3.873627}
self.optimum2 = {'A': 1.045351, 'B': 2.346212, 'C': 4.873627}
def tearDown(self):
# after each test method
self.pop = None
if self._test_dir:
shutil.rmtree(self._test_dir)
@property
def test_dir(self):
if not self._test_dir:
self._test_dir = tempfile.mkdtemp()
return self._test_dir
@classmethod
def setup_class(cls):
# before any methods in this class
pass
@classmethod
def teardown_class(cls):
# teardown_class() after any methods in this class
pass
def rfactor1(self, pos):
r = (pos['A'] - self.optimum1['A']) ** 2 \
+ (pos['B'] - self.optimum1['B']) ** 2 \
+ (pos['C'] - self.optimum1['C']) ** 2
r /= 3.0
return r
def rfactor2(self, pos):
"""
R-factor function with multiple local minima
global minimum R = 0.0138591 at A = 1.745, B = 2.395, C = 3.755.
domain A = 1.0:2.0, B = 2.0:3.0, C = 3.0:4.0
@param pos: dict-like position with keys 'A', 'B' and 'C'.
@return: R-factor
"""
xa = (pos['A'] - pos['B']) * 12
xb = (pos['B'] - pos['C']) * 15
xc = (pos['C'] - pos['A']) * 18
da = pos['A'] - 1.8
db = pos['B'] - 2.3
dc = pos['C'] - 3.8
aa = 1.0
ab = 1.0
ac = 1.0
ba = 0.4
bb = 0.8
bc = 1.2
r = aa * math.sin(xa) + ab * math.sin(xb) + ac * math.sin(xc)
r += ba * da**2 + bb * db**2 + bc * dc**2 + 3.0
return r
def test_setup(self):
self.pop.setup(self.size, self.domain)
self.assertEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
np.testing.assert_array_equal(np.arange(POP_SIZE), self.pop.pos['_particle'])
np.testing.assert_array_equal(np.zeros(POP_SIZE), self.pop.pos['_gen'])
np.testing.assert_array_equal(np.arange(POP_SIZE), self.pop.pos['_model'])
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertEqual(0, self.pop.generation)
self.assertEqual(POP_SIZE, self.pop.model_count)
def test_setup_with_results(self):
data_dir = os.path.dirname(os.path.abspath(__file__))
data_file = os.path.join(data_dir, "test_swarm.setup_with_results.1.dat")
self.pop.setup(self.size, self.domain, seed_file=data_file, recalc_seed=False)
self.assertEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
self.assertEqual(self.pop.generation, 0)
self.assertEqual(self.pop.model_count, POP_SIZE)
np.testing.assert_array_equal(self.pop.pos['_particle'], np.arange(POP_SIZE))
np.testing.assert_array_equal(self.pop.pos['_gen'], [0, 0, -1, 0, 0, 0])
np.testing.assert_array_equal(self.pop.pos['_model'], np.arange(POP_SIZE))
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(0.6, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertAlmostEqual(1.3, self.pop.pos['A'][1], 3)
self.assertAlmostEqual(1.1, self.pop.pos['A'][2], 3)
self.assertAlmostEqual(1.5, self.pop.pos['A'][0], 3)
self.assertAlmostEqual(2.3, self.pop.pos['B'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['B'][2], 3)
self.assertAlmostEqual(2.5, self.pop.pos['B'][0], 3)
self.assertGreaterEqual(4.0, self.pop.pos['C'][1], 3)
self.assertAlmostEqual(3.1, self.pop.pos['C'][2], 3)
self.assertAlmostEqual(3.5, self.pop.pos['C'][0], 3)
def test_setup_with_results_recalc(self):
data_dir = os.path.dirname(os.path.abspath(__file__))
data_file = os.path.join(data_dir, "test_swarm.setup_with_results.1.dat")
self.pop.setup(self.size, self.domain, seed_file=data_file, recalc_seed=True)
self.assertEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
self.assertEqual(self.pop.generation, 0)
self.assertEqual(self.pop.model_count, POP_SIZE)
np.testing.assert_array_equal(self.pop.pos['_particle'], np.arange(POP_SIZE))
np.testing.assert_array_equal(self.pop.pos['_gen'], [0, 0, 0, 0, 0, 0])
np.testing.assert_array_equal(self.pop.pos['_model'], np.arange(POP_SIZE))
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertAlmostEqual(1.3, self.pop.pos['A'][1], 3)
self.assertAlmostEqual(1.1, self.pop.pos['A'][2], 3)
self.assertAlmostEqual(1.5, self.pop.pos['A'][0], 3)
self.assertAlmostEqual(2.3, self.pop.pos['B'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['B'][2], 3)
self.assertAlmostEqual(2.5, self.pop.pos['B'][0], 3)
self.assertGreaterEqual(4.0, self.pop.pos['C'][1], 3)
self.assertAlmostEqual(3.1, self.pop.pos['C'][2], 3)
self.assertAlmostEqual(3.5, self.pop.pos['C'][0], 3)
def test_pos_gen(self):
self.pop.setup(self.size, self.domain)
for index, item in enumerate(self.pop.pos_gen()):
self.assertIsInstance(item, dict)
self.assertEqual(set(item.keys()), set(self.expected_names))
self.assertEqual(item['_particle'], index)
def test_randomize(self):
self.pop.setup(self.size, self.domain)
self.pop.randomize()
self.assertTrue(np.all(self.pop.pos['A'] >= self.domain.min['A']))
self.assertTrue(np.all(self.pop.pos['A'] <= self.domain.max['A']))
self.assertGreater(np.std(self.pop.pos['A']), self.domain.step['A'])
def test_seed(self):
self.pop.setup(self.size, self.domain)
self.pop.seed(self.domain.start)
self.assertAlmostEqual(self.pop.pos['A'][0], self.domain.start['A'], delta=0.001)
def test_add_result(self):
self.pop.setup(self.size, self.domain)
i_sample = 1
i_result = 0
result = self.pop.pos[i_sample]
self.pop.add_result(result, 0.0)
self.assertEqual(self.pop.results.shape[0], 1)
self.assertEqual(self.pop.results[i_result], result)
self.assertEqual(self.pop.best[i_sample], result)
def test_is_converged(self):
self.pop.setup(self.size, self.domain)
self.assertFalse(self.pop.is_converged())
i_sample = 0
result = self.pop.pos[i_sample]
for i in range(POP_SIZE):
rfac = 1.0 - float(i) / POP_SIZE
self.pop.add_result(result, rfac)
self.assertFalse(self.pop.is_converged())
for i in range(POP_SIZE):
rfac = (1.0 - float(i) / POP_SIZE) / 1000.0
self.pop.add_result(result, rfac)
self.assertTrue(self.pop.is_converged())
def test_save_population(self):
self.pop.setup(self.size, self.domain)
filename = os.path.join(self.test_dir, "test_save_population.pop")
self.pop.save_population(filename)
def test_save_results(self):
self.pop.setup(self.size, self.domain)
i_sample = 1
result = self.pop.pos[i_sample]
self.pop.add_result(result, 1.0)
filename = os.path.join(self.test_dir, "test_save_results.dat")
self.pop.save_results(filename)
def test_save_array(self):
self.pop.setup(self.size, self.domain)
filename = os.path.join(self.test_dir, "test_save_array.pos")
self.pop.save_array(filename, self.pop.pos)
def test_load_array(self):
n = 3
filename = os.path.join(self.test_dir, "test_load_array")
self.pop.setup(self.size, self.domain)
# expected array
dt_exp = self.pop.get_pop_dtype(self.domain.start)
a_exp = np.zeros((n,), dtype=dt_exp)
a_exp['A'] = np.linspace(0, 1, n)
a_exp['B'] = np.linspace(1, 2, n)
a_exp['C'] = np.linspace(3, 4, n)
a_exp['_rfac'] = np.linspace(5, 6, n)
a_exp['_gen'] = np.array([3, 4, 7])
a_exp['_particle'] = np.array([1, 0, 2])
a_exp['_model'] = np.array([3, 6, 1])
# test array is a expected array with different column order
dt_test = [('A', 'f4'), ('_particle', 'i4'), ('_rfac', 'f4'), ('C', 'f4'), ('_gen', 'i4'), ('B', 'f4'),
('_model', 'i4')]
names_test = [a[0] for a in dt_test]
a_test = np.zeros((n,), dtype=dt_test)
for name in names_test:
a_test[name] = a_exp[name]
header = " ".join(names_test)
np.savetxt(filename, a_test, fmt='%g', header=header)
result = np.zeros((n,), dtype=dt_exp)
result = self.pop.load_array(filename, result)
self.assertEqual(result.dtype.names, a_exp.dtype.names)
for name in a_exp.dtype.names:
np.testing.assert_almost_equal(result[name], a_exp[name], err_msg=name)
def test_mate_parents(self):
self.pop.setup(self.size, self.domain)
pos1 = self.pop.pos.copy()
parents = self.pop.mate_parents(pos1)
self.assertEqual(len(parents), pos1.shape[0] / 2)
def test_crossover(self):
self.pop.setup(self.size, self.domain)
p1 = self.pop.pos[2].copy()
p2 = self.pop.pos[3].copy()
c1, c2 = self.pop.crossover(p1, p2)
self.assertIsInstance(c1, np.void)
self.assertIsInstance(c2, np.void)
self.assertEqual(c1['_particle'], p1['_particle'])
self.assertEqual(c2['_particle'], p2['_particle'])
for name in self.domain.start:
self.assertAlmostEqual(c1[name] + c2[name], p1[name] + p2[name], msg=name)
def test_mutate_weak(self):
self.pop.setup(self.size, self.domain)
p1 = self.pop.pos[3].copy()
c1 = p1.copy()
self.pop.mutate_weak(c1, 1.0)
self.assertEqual(c1['_particle'], p1['_particle'])
self.assertNotAlmostEqual(c1['A'], p1['A'])
self.assertNotAlmostEqual(c1['B'], p1['B'])
self.assertNotAlmostEqual(c1['C'], p1['C'])
def test_mutate_strong(self):
self.pop.setup(self.size, self.domain)
p1 = self.pop.pos[3].copy()
c1 = p1.copy()
self.pop.mutate_strong(c1, 1.0)
self.assertEqual(c1['_particle'], p1['_particle'])
self.assertNotAlmostEqual(c1['A'], p1['A'])
self.assertNotAlmostEqual(c1['B'], p1['B'])
self.assertNotAlmostEqual(c1['C'], p1['C'])
def test_advance_population(self):
self.pop.setup(self.size, self.domain)
p1 = {'A': np.linspace(1.0, 2.0, POP_SIZE),
'B': np.linspace(2.0, 3.0, POP_SIZE),
'C': np.linspace(3.0, 4.0, POP_SIZE)}
self.pop.pos['A'] = p1['A']
self.pop.pos['B'] = p1['B']
self.pop.pos['C'] = p1['C']
for pos in self.pop.pos:
pos['_rfac'] = self.rfactor1(pos)
self.pop._hold_once = False
self.pop.weak_mutation_probability = 1.
self.pop.strong_mutation_probability = 0.
self.pop.advance_population()
for name, value in p1.items():
self.assertTrue(np.any(abs(self.pop.pos[name] - value) >= 0.001), msg=name)
def test_convergence_1(self):
self.pop.setup(self.size, self.domain)
self.pop.pos['A'] = np.linspace(1.0, 2.0, POP_SIZE)
self.pop.pos['B'] = np.linspace(2.0, 3.0, POP_SIZE)
self.pop.pos['C'] = np.linspace(3.0, 4.0, POP_SIZE)
self.pop.pos['_rfac'] = np.linspace(2.0, 1.0, POP_SIZE)
best_rfactors = []
for i in range(10):
self.pop.advance_population()
for pos in self.pop.pos:
self.pop.add_result(pos, self.rfactor1(pos))
best_rfactors.append(self.pop.best['_rfac'].min())
self.assertLess(best_rfactors[-1], best_rfactors[0])
def optimize_rfactor_2(self, pop_size, iterations):
self.size = pop_size
self.pop.setup(self.size, self.domain)
for i in range(iterations):
self.pop.advance_population()
for pos in self.pop.pos:
self.pop.add_result(pos, self.rfactor2(pos))
@unittest.skip("test_convergence_2 is unreliable")
def test_convergence_2(self):
"""
there is a certain probability that this test fails.
@return:
"""
self.pop.weak_mutation_probability = 1.
self.pop.strong_mutation_probability = 0.01
self.optimize_rfactor_2(10, 200)
ibest = self.pop.results['_rfac'].argmin()
best = self.pop.results[ibest]
self.assertLess(best['_rfac'], 0.2)
self.assertAlmostEqual(best['A'], 1.745, delta=0.1)
self.assertAlmostEqual(best['B'], 2.395, delta=0.1)
self.assertAlmostEqual(best['C'], 3.755, delta=0.1)
if __name__ == '__main__':
unittest.main()

View File

@ -1,48 +0,0 @@
"""
@package tests.test_hbncu
unit tests for projects.hbncu
the purpose of these tests is to help debugging the code.
to run the tests, change to the directory which contains the tests directory, and execute =nosetests=.
@pre nose must be installed (python-nose package on Debian).
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
@copyright (c) 2015 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
import unittest
import os.path
import tempfile
import shutil
import numpy as np
import projects.hbncu.hbncu as hbncu
import pmsco.data as data
import pmsco.dispatch as dispatch
class TestHbncuProject(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.project = hbncu.HbncuProject()
def tearDown(self):
# after each test method
self.project = None
shutil.rmtree(self.test_dir)
@classmethod
def setup_class(cls):
# before any methods in this class
pass
@classmethod
def teardown_class(cls):
# teardown_class() after any methods in this class
pass

616
tests/test_population.py Normal file
View File

@ -0,0 +1,616 @@
"""
@package tests.test_population
unit tests for @ref pmsco.optimizers.population
the purpose of these tests is to help debugging the code.
to run the tests, change to the directory which contains the tests directory, and execute =nosetests=.
@pre nose must be installed (python-nose package on Debian).
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
@copyright (c) 2015 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import six
import numpy as np
import os
import os.path
import random
import shutil
import tempfile
import unittest
import pmsco.optimizers.population as population
import pmsco.project as project
from pmsco.helpers import BraceMessage as BMsg
POP_SIZE = 5
class TestPopulation(unittest.TestCase):
def setUp(self):
random.seed(0)
self.test_dir = tempfile.mkdtemp()
self.domain = project.Domain()
self.domain.add_param('A', 1.5, 1.0, 2.0, 0.1)
self.domain.add_param('B', 2.5, 2.0, 3.0, 0.1)
self.domain.add_param('C', 3.5, 3.0, 4.0, 0.1)
self.expected_names = ('_gen', '_model', '_particle', '_rfac', 'A', 'B', 'C')
self.size = POP_SIZE
self.pop = population.Population()
self.optimum1 = {'A': 1.045351, 'B': 2.346212, 'C': 3.873627}
def tearDown(self):
# after each test method
self.pop = None
shutil.rmtree(self.test_dir)
@classmethod
def setup_class(cls):
# before any methods in this class
pass
@classmethod
def teardown_class(cls):
# teardown_class() after any methods in this class
pass
def reorder_pop_array(self, x):
"""
re-order the columns of a population-like array.
this makes it easier to create constant arrays with arbitrary column order
since population arrays are ordered alphabetically.
@param x: numpy structured array
@return: numpy structured array having the same dtype as the population arrays.
"""
y = np.zeros(x.shape, dtype=self.pop.get_pop_dtype(self.pop.model_start))
for name in y.dtype.names:
y[name] = x[name]
return y
def assert_pop_array_equal(self, x, y, msg=""):
"""
array equality test for population-like structured numpy arrays.
- check for same column names and kinds
- check for same shape
- check float columns using np.testing.assert_array_almost_equal
- check all other columns using np.testing.assert_array_equal
@param x: structured numpy array.
@param y: structured numpy array.
@param msg: optional error message.
@return: None
@raise AssertionError if arrays are not equal.
"""
self.assertEqual(x.dtype.names, y.dtype.names, msg=BMsg("{0} (dtype)", msg))
self.assertEqual(x.shape, y.shape, msg=BMsg("{0} (shape)", msg))
for n, t in x.dtype.fields.items():
self.assertEqual(t[0].kind, y.dtype.fields[n][0].kind, msg=BMsg("{0} (column {1} kind)", msg, n))
if t[0].kind in {'f', 'c'}:
np.testing.assert_array_almost_equal(x[n], y[n], err_msg="{0} (column {1})".format(msg, n))
else:
np.testing.assert_array_equal(x[n], y[n], err_msg="{0} (column {1})".format(msg, n))
def rfactor1(self, pos):
r = (pos['A'] - self.optimum1['A']) ** 2 \
+ (pos['B'] - self.optimum1['B']) ** 2 \
+ (pos['C'] - self.optimum1['C']) ** 2
r /= 3.0
return r
def test_setup(self):
self.pop.setup(self.size, self.domain)
self.assertEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
np.testing.assert_array_equal(np.arange(POP_SIZE), self.pop.pos['_particle'])
np.testing.assert_array_equal(np.zeros(POP_SIZE), self.pop.pos['_gen'])
np.testing.assert_array_equal(np.arange(POP_SIZE), self.pop.pos['_model'])
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertEqual(0, self.pop.generation)
self.assertEqual(POP_SIZE, self.pop.model_count)
def test_setup_with_results(self):
data_dir = os.path.dirname(os.path.abspath(__file__))
data_file = os.path.join(data_dir, "test_swarm.setup_with_results.1.dat")
self.pop.setup(self.size, self.domain, seed_file=data_file, recalc_seed=False)
self.assertEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
self.assertEqual(self.pop.generation, 0)
self.assertEqual(self.pop.model_count, POP_SIZE)
np.testing.assert_array_equal(self.pop.pos['_particle'], np.arange(POP_SIZE))
np.testing.assert_array_equal(self.pop.pos['_gen'], [0, 0, -1, 0, 0])
np.testing.assert_array_equal(self.pop.pos['_model'], np.arange(POP_SIZE))
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(0.6, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertAlmostEqual(1.3, self.pop.pos['A'][1], 3)
self.assertAlmostEqual(1.1, self.pop.pos['A'][2], 3)
self.assertAlmostEqual(1.5, self.pop.pos['A'][0], 3)
self.assertAlmostEqual(2.3, self.pop.pos['B'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['B'][2], 3)
self.assertAlmostEqual(2.5, self.pop.pos['B'][0], 3)
self.assertGreaterEqual(4.0, self.pop.pos['C'][1], 3)
self.assertAlmostEqual(3.1, self.pop.pos['C'][2], 3)
self.assertAlmostEqual(3.5, self.pop.pos['C'][0], 3)
def test_setup_with_results_recalc(self):
data_dir = os.path.dirname(os.path.abspath(__file__))
data_file = os.path.join(data_dir, "test_swarm.setup_with_results.1.dat")
self.pop.setup(self.size, self.domain, seed_file=data_file, recalc_seed=True)
self.assertEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
self.assertEqual(self.pop.generation, 0)
self.assertEqual(self.pop.model_count, POP_SIZE)
np.testing.assert_array_equal(self.pop.pos['_particle'], np.arange(POP_SIZE))
np.testing.assert_array_equal(self.pop.pos['_gen'], [0, 0, 0, 0, 0])
np.testing.assert_array_equal(self.pop.pos['_model'], np.arange(POP_SIZE))
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertAlmostEqual(1.3, self.pop.pos['A'][1], 3)
self.assertAlmostEqual(1.1, self.pop.pos['A'][2], 3)
self.assertAlmostEqual(1.5, self.pop.pos['A'][0], 3)
self.assertAlmostEqual(2.3, self.pop.pos['B'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['B'][2], 3)
self.assertAlmostEqual(2.5, self.pop.pos['B'][0], 3)
self.assertGreaterEqual(4.0, self.pop.pos['C'][1], 3)
self.assertAlmostEqual(3.1, self.pop.pos['C'][2], 3)
self.assertAlmostEqual(3.5, self.pop.pos['C'][0], 3)
def test_setup_with_partial_results(self):
self.domain.add_param('D', 4.5, 4.0, 5.0, 0.1)
self.expected_names = ('_gen', '_model', '_particle', '_rfac', 'A', 'B', 'C', 'D')
data_dir = os.path.dirname(os.path.abspath(__file__))
data_file = os.path.join(data_dir, "test_swarm.setup_with_results.1.dat")
self.pop.setup(self.size, self.domain, seed_file=data_file, recalc_seed=False)
self.assertEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
self.assertEqual(self.pop.generation, 0)
self.assertEqual(self.pop.model_count, POP_SIZE)
np.testing.assert_array_equal(self.pop.pos['_particle'], np.arange(POP_SIZE))
np.testing.assert_array_equal(self.pop.pos['_gen'], [0, 0, -1, 0, 0])
np.testing.assert_array_equal(self.pop.pos['_model'], np.arange(POP_SIZE))
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(0.6, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertAlmostEqual(1.3, self.pop.pos['A'][1], 3)
self.assertAlmostEqual(1.1, self.pop.pos['A'][2], 3)
self.assertAlmostEqual(1.5, self.pop.pos['A'][0], 3)
self.assertAlmostEqual(2.3, self.pop.pos['B'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['B'][2], 3)
self.assertAlmostEqual(2.5, self.pop.pos['B'][0], 3)
self.assertGreaterEqual(4.0, self.pop.pos['C'][1], 3)
self.assertAlmostEqual(3.1, self.pop.pos['C'][2], 3)
self.assertAlmostEqual(3.5, self.pop.pos['C'][0], 3)
def test_pos_gen(self):
self.pop.setup(self.size, self.domain)
for index, item in enumerate(self.pop.pos_gen()):
self.assertIsInstance(item, dict)
self.assertEqual(set(item.keys()), set(self.expected_names))
self.assertEqual(item['_particle'], index)
def test_get_pop_dtype(self):
result = population.Population.get_pop_dtype({'A': 1.045, 'C': 3.873, '_D': 4.381, 'B': 2.346})
expected = [('_gen', 'i8'), ('_model', 'i8'), ('_particle', 'i8'), ('_rfac', 'f8'),
('A', 'f8'), ('B', 'f8'), ('C', 'f8')]
self.assertEqual(result, expected)
result = population.Population.get_pop_dtype(['A', 'C', '_D', 'B'])
self.assertEqual(result, expected)
def test_get_model_dtype(self):
result = population.Population.get_model_dtype({'A': 1.045, 'C': 3.873, '_D': 4.381, 'B': 2.346})
expected = [('A', 'f8'), ('B', 'f8'), ('C', 'f8')]
self.assertEqual(result, expected)
result = population.Population.get_model_dtype(['A', 'C', '_D', 'B'])
self.assertEqual(result, expected)
def test_get_model_array(self):
result = population.Population.get_model_array({'A': 1.045, 'C': 3.873, '_D': 4.381, 'B': 2.346})
dt = [('A', 'f8'), ('B', 'f8'), ('C', 'f8')]
expected = np.array([(1.045, 2.346, 3.873)], dtype=dt)
np.testing.assert_array_equal(result, expected)
def test_randomize(self):
self.pop.setup(self.size, self.domain)
self.pop.randomize()
m = np.mean(self.pop.pos['A'])
self.assertGreaterEqual(m, self.domain.min['A'])
self.assertLessEqual(m, self.domain.max['A'])
def test_seed(self):
self.pop.setup(self.size, self.domain)
self.pop.seed(self.domain.start)
self.assertAlmostEqual(self.pop.pos['A'][0], self.domain.start['A'], delta=0.001)
def test_add_result(self):
self.pop.setup(self.size, self.domain)
i_sample = 1
i_result = 0
result = self.pop.pos[i_sample]
self.pop.add_result(result, 0.0)
self.assertEqual(self.pop.results.shape[0], 1)
self.assertEqual(self.pop.results[i_result], result)
self.assertEqual(self.pop.best[i_sample], result)
def test_save_population(self):
self.pop.setup(self.size, self.domain)
filename = os.path.join(self.test_dir, "test_save_population.pop")
self.pop.save_population(filename)
def test_save_results(self):
self.pop.setup(self.size, self.domain)
i_sample = 1
result = self.pop.pos[i_sample]
self.pop.add_result(result, 1.0)
filename = os.path.join(self.test_dir, "test_save_results.dat")
self.pop.save_results(filename)
def test_save_array(self):
self.pop.setup(self.size, self.domain)
filename = os.path.join(self.test_dir, "test_save_array.pos")
self.pop.save_array(filename, self.pop.pos)
def test_load_array(self):
n = 3
filename = os.path.join(self.test_dir, "test_load_array")
self.pop.setup(self.size, self.domain)
# expected array
dt_exp = self.pop.get_pop_dtype(self.domain.start)
a_exp = np.zeros((n,), dtype=dt_exp)
a_exp['A'] = np.linspace(0, 1, n)
a_exp['B'] = np.linspace(1, 2, n)
a_exp['C'] = np.linspace(3, 4, n)
a_exp['_rfac'] = np.linspace(5, 6, n)
a_exp['_gen'] = np.array([3, 4, 7])
a_exp['_particle'] = np.array([1, 0, 2])
a_exp['_model'] = np.array([3, 6, 1])
# test array is a expected array with different column order
dt_test = [('A', 'f4'), ('_particle', 'i4'), ('_rfac', 'f4'), ('C', 'f4'), ('_gen', 'i4'), ('B', 'f4'),
('_model', 'i4')]
names_test = [a[0] for a in dt_test]
a_test = np.zeros((n,), dtype=dt_test)
for name in names_test:
a_test[name] = a_exp[name]
header = " ".join(names_test)
np.savetxt(filename, a_test, fmt='%g', header=header)
result = np.zeros((n,), dtype=dt_exp)
result = self.pop.load_array(filename, result)
self.assert_pop_array_equal(result, a_exp)
def test_constrain_position(self):
# upper
pos1 = 11.0
vel1 = 5.0
min1 = 0.0
max1 = 10.0
pos2, vel2, min2, max2 = population.Population.constrain_position(pos1, vel1, min1, max1, 're-enter')
self.assertAlmostEqual(pos2, 1.0)
self.assertAlmostEqual(vel2, vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = population.Population.constrain_position(pos1, vel1, min1, max1, 'bounce')
self.assertAlmostEqual(pos2, 9.0)
self.assertAlmostEqual(vel2, -vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = population.Population.constrain_position(pos1, vel1, min1, max1, 'scatter')
self.assertGreaterEqual(pos2, 6.0)
self.assertLessEqual(pos2, 10.0)
self.assertGreaterEqual(vel2, 0.0)
self.assertLessEqual(vel2, vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = population.Population.constrain_position(pos1, vel1, min1, max1, 'stick')
self.assertAlmostEqual(pos2, max1)
self.assertAlmostEqual(vel2, max1 - (pos1 - vel1))
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = population.Population.constrain_position(pos1, vel1, min1, max1, 'random')
self.assertGreaterEqual(pos2, 0.0)
self.assertLessEqual(pos2, 10.0)
self.assertGreaterEqual(vel2, -(max1 - min1))
self.assertLessEqual(vel2, (max1 - min1))
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = population.Population.constrain_position(pos1, vel1, min1, max1, 'expand')
self.assertAlmostEqual(pos2, pos1)
self.assertAlmostEqual(vel2, vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max2, pos1)
self.assertRaises(ValueError, population.Population.constrain_position, pos1, vel1, min1, max1, 'error')
# lower
pos1 = -1.0
vel1 = -5.0
min1 = 0.0
max1 = 10.0
pos2, vel2, min2, max2 = population.Population.constrain_position(pos1, vel1, min1, max1, 're-enter')
self.assertAlmostEqual(pos2, 9.0)
self.assertAlmostEqual(vel2, vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = population.Population.constrain_position(pos1, vel1, min1, max1, 'bounce')
self.assertAlmostEqual(pos2, 1.0)
self.assertAlmostEqual(vel2, -vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = population.Population.constrain_position(pos1, vel1, min1, max1, 'scatter')
self.assertGreaterEqual(pos2, 0.0)
self.assertLessEqual(pos2, 4.0)
self.assertGreaterEqual(vel2, vel1)
self.assertLessEqual(vel2, 0.0)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = population.Population.constrain_position(pos1, vel1, min1, max1, 'stick')
self.assertAlmostEqual(pos2, min1)
self.assertAlmostEqual(vel2, min1 - (pos1 - vel1))
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = population.Population.constrain_position(pos1, vel1, min1, max1, 'random')
self.assertGreaterEqual(pos2, 0.0)
self.assertLessEqual(pos2, 10.0)
self.assertGreaterEqual(vel2, -(max1 - min1))
self.assertLessEqual(vel2, max1 - min1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = population.Population.constrain_position(pos1, vel1, min1, max1, 'expand')
self.assertAlmostEqual(pos2, pos1)
self.assertAlmostEqual(vel2, vel1)
self.assertAlmostEqual(min2, pos1)
self.assertAlmostEqual(max2, max1)
self.assertRaises(ValueError, population.Population.constrain_position, pos1, vel1, min1, max1, 'error')
def test_patch_from_file(self):
self.pop.setup(self.size, self.domain)
data_dir = os.path.dirname(os.path.abspath(__file__))
data_file = os.path.join(data_dir, "test_swarm.setup_with_results.1.dat")
patch_size = 4
expected_pos = {'A': [1.1, 1.2, 1.3, 1.4], 'B': [2.1, 2.2, 2.3, 2.4], 'C': [3.1, 3.2, 4.3, 3.4]}
self.pop.patch_from_file(data_file)
self.assertIsInstance(self.pop.pos_patch, np.ndarray)
self.assertEqual(self.pop.pos_patch.shape, (patch_size,))
np.testing.assert_array_equal(self.pop.pos_patch['_particle'], [1, 2, 3, 4])
np.testing.assert_array_equal(self.pop.pos_patch['_gen'], [1, 2, 3, 4])
np.testing.assert_array_almost_equal(self.pop.pos_patch['A'], expected_pos['A'])
np.testing.assert_array_almost_equal(self.pop.pos_patch['B'], expected_pos['B'])
np.testing.assert_array_almost_equal(self.pop.pos_patch['C'], expected_pos['C'])
def test_apply_patch(self):
self.pop.setup(self.size, self.domain)
expected_pos = self.pop.pos.copy()
dt_test = [('A', 'f4'), ('_particle', 'i4'), ('_rfac', 'f4'), ('C', 'f4'), ('_model', 'i4')]
patch_size = 3
self.pop.pos_patch = np.zeros(patch_size, dtype=dt_test)
self.pop.pos_patch['_particle'] = np.array([1, 2, 3])
self.pop.pos_patch['_rfac'] = np.random.random_sample(patch_size)
self.pop.pos_patch['_model'] = np.arange(patch_size)
self.pop.pos_patch['A'] = np.array([1.1, 2.2, 3.3])
self.pop.pos_patch['C'] = np.array([1.1, 2.2, 3.3])
expected_pos['A'][1] = 1.1
expected_pos['C'][3] = 3.3
expected_pos['_particle'] = np.arange(POP_SIZE)
expected_pos['_gen'] = 0
expected_pos['_model'] = np.arange(POP_SIZE)
expected_pos['_rfac'] = 2.1
self.pop._apply_patch()
self.assertIsNone(self.pop.pos_patch)
self.assertEqual(self.pop.generation, 0)
self.assertEqual(self.pop.model_count, POP_SIZE)
self.assert_pop_array_equal(self.pop.pos, expected_pos)
def test_find_result(self):
self.domain.min['A'] = -0.1
self.domain.max['A'] = 0.1
self.domain.min['B'] = 0.0
self.domain.max['B'] = 1000.
self.domain.min['C'] = 9.
self.domain.max['C'] = 9.001
self.size = 100
self.pop.setup(self.size, self.domain)
self.pop.results = self.pop.pos.copy()
expected_index = 77
names = ['A', 'B', 'C']
target0 = {key: self.pop.results[key][expected_index] for key in names}
target1 = self.pop.results[names].copy()
target1 = target1[expected_index]
target2 = {key: target0[key] for key in ['C', 'B']}
target3 = target2.copy()
target3['C'] += 9.e-6
target4 = {'A': random.random(), 'B': random.random(), 'C': 9.002}
result0 = self.pop.find_model(target0)
self.assertEqual(result0, expected_index)
result1 = self.pop.find_model(target1)
self.assertEqual(result1, expected_index)
result2 = self.pop.find_model(target2)
self.assertEqual(result2, expected_index)
result3 = self.pop.find_model(target3)
self.assertEqual(result3, expected_index)
self.assertRaises(ValueError, self.pop.find_model, target4)
def test_import_positions(self):
"""
test the population.Population.import_positions method.
check the different type conversions.
the main work is in test_import_positions_array.
"""
self.pop.setup(self.size, self.domain)
source_type = [('A', 'f4'), ('B', 'f4'), ('C', 'f4'), ('D', 'f4'),
('_model', 'i4'), ('_particle', 'i4'), ('_gen', 'i4'), ('_rfac', 'f4')]
source = np.array([(1.5, 3.5, 3.5, 4.5, 51, 52, 53, 0.5),
(1.6, 2.6, 2.6, 4.6, 61, 62, 63, 0.6),
(1.7, 2.7, 3.7, 4.7, 71, 72, 73, 0.7),
(1.8, 2.8, 3.8, 4.8, 81, 82, 83, 0.8)], dtype=source_type)
expected_type = [('_gen', 'i8'), ('_model', 'i8'), ('_particle', 'i8'), ('_rfac', 'f8'),
('A', 'f8'), ('B', 'f8'), ('C', 'f8')]
expected = np.array([(0, 0, 0, 0.0, 1.5, 3.5, 3.5),
(0, 0, 0, 0.0, 1.6, 2.6, 2.6),
(0, 0, 0, 0.0, 1.7, 2.7, 3.7),
(0, 0, 0, 0.0, 1.8, 2.8, 3.8)], dtype=expected_type)
expected_single = np.array([expected[0]], dtype=expected_type)
# None
self.pop.clear_import()
source_none = None
retval = self.pop.import_positions(source_none)
self.assertEqual(retval, 0)
# np.ndarray
self.pop.clear_import()
source_ndarray = source
retval = self.pop.import_positions(source_ndarray)
self.assertEqual(retval, expected.shape[0])
self.assert_pop_array_equal(self.pop.pos_import, expected, msg="numpy.ndarray")
# np.void
self.pop.clear_import()
source_void = source[0]
retval = self.pop.import_positions(source_void)
self.assertEqual(retval, 1)
self.assert_pop_array_equal(self.pop.pos_import, expected_single, msg="numpy.void")
# dict
self.pop.clear_import()
source_dict = {k: source[0][k] for k in source.dtype.names}
retval = self.pop.import_positions(source_dict)
self.assertEqual(retval, 1)
self.assert_pop_array_equal(self.pop.pos_import, expected_single, msg="dict")
# sequence
self.pop.clear_import()
source_sequence = [row for row in source]
retval = self.pop.import_positions(source_sequence)
self.assertEqual(retval, expected.shape[0])
self.assert_pop_array_equal(self.pop.pos_import, expected, msg="sequence")
# file
filename = os.path.join(self.test_dir, "test_import_positions")
self.pop.save_array(filename, source)
self.pop.clear_import()
retval = self.pop.import_positions(filename)
self.assertEqual(retval, expected.shape[0])
self.assert_pop_array_equal(self.pop.pos_import, expected, msg="file")
# callable
def source_callable():
return expected.copy()
self.pop.clear_import()
retval = self.pop.import_positions(source_callable)
self.assertEqual(retval, expected.shape[0])
self.assert_pop_array_equal(self.pop.pos_import, expected, msg="callable")
def test_import_positions_array(self):
"""
test the population.Population.import_positions_array method.
- import a data array, check type and size.
- add to previous import.
- ignore control fields.
- no range or duplicate checking.
- missing parameter.
"""
self.pop.setup(self.size, self.domain)
source_type = [('A', 'f4'), ('B', 'f4'), ('C', 'f4'), ('D', 'f4'),
('_model', 'i4'), ('_particle', 'i4'), ('_gen', 'i4'), ('_rfac', 'f4')]
source = np.array([(1.0, 0.0, 0.0, 0.0, 0, 0, 0, 0.0),
(1.5, 3.5, 3.5, 4.5, 51, 52, 53, 0.5),
(1.6, 2.6, 2.6, 4.6, 61, 62, 63, 0.6),
(1.7, 2.7, 3.7, 4.7, 71, 72, 73, 0.7),
(1.8, 2.8, 3.8, 4.8, 81, 82, 83, 0.8)], dtype=source_type)
self.pop.pos_import.resize(1)
self.pop.pos_import[0]['A'] = 1.0
expected_type = [('_gen', 'i8'), ('_model', 'i8'), ('_particle', 'i8'), ('_rfac', 'f8'),
('A', 'f8'), ('B', 'f8'), ('C', 'f8')]
expected = np.array([(0, 0, 0, 0.0, 1.0, 0.0, 0.0),
(0, 0, 0, 0.0, 1.0, 0.0, 0.0),
(0, 0, 0, 0.0, 1.5, 3.5, 3.5),
(0, 0, 0, 0.0, 1.6, 2.6, 2.6),
(0, 0, 0, 0.0, 1.7, 2.7, 3.7),
(0, 0, 0, 0.0, 1.8, 2.8, 3.8)], dtype=expected_type)
retval = self.pop.import_positions_array(source)
self.assertEqual(retval, source.shape[0])
self.assertEqual(self.pop.pos_import.dtype.names, self.pop.pos.dtype.names)
self.assert_pop_array_equal(self.pop.pos_import, expected, msg="imported array")
source_type = [('B', 'f4'), ('C', 'f4'), ('D', 'f4'), ('E', 'f4'),
('_model', 'i4'), ('_particle', 'i4'), ('_gen', 'i4'), ('_rfac', 'f4')]
source = np.array([(1.0, 0.0, 0.0, 0.0, 0, 0, 0, 0.0),
(1.8, 2.8, 3.8, 4.8, 81, 82, 83, 0.8)], dtype=source_type)
self.assertRaises(ValueError, self.pop.import_positions_array, source)
def test_advance_from_import(self):
"""
test the population.Population.advance_from_import method.
- array type and size.
- range checks.
- de-duplication.
"""
self.pop.setup(self.size, self.domain)
self.pop.position_constrain_mode = 'error'
source_type = [('A', 'f8'), ('B', 'f8'), ('C', 'f8'),
('_model', 'i8'), ('_particle', 'i8'), ('_gen', 'i8'), ('_rfac', 'f8')]
source = np.array([(1.3, 2.3, 3.3, 0, 0, 0, 0.0),
(1.3, 2.3, 3.3, 0, 0, 0, 0.0),
(1.4, 2.4, 3.4, 0, 0, 0, 0.0),
(1.5, 3.5, 3.5, 0, 0, 0, 0.0),
(1.6, 2.6, 2.6, 0, 0, 0, 0.0),
(1.7, 2.7, 3.7, 0, 0, 0, 0.0),
(1.8, 2.8, 3.8, 0, 0, 0, 0.0)], dtype=source_type)
source = self.reorder_pop_array(source)
results = np.array([(1.4, 2.4, 3.4, 0, 0, 0, 0.4)], dtype=source_type)
results = self.reorder_pop_array(results)
expected = np.array([(1.3, 2.3, 3.3, POP_SIZE + 0, 0, 0, 2.1),
(1.7, 2.7, 3.7, POP_SIZE + 1, 1, 0, 2.1),
(1.8, 2.8, 3.8, POP_SIZE + 2, 2, 0, 2.1)], dtype=source_type)
expected = self.reorder_pop_array(expected)
self.pop.pos_import = source
self.pop.results = results
retval = self.pop.advance_from_import()
self.assertEqual(retval, expected.shape[0])
self.assertEqual(self.pop.size_act, expected.shape[0])
pos_act = self.pop.pos[0:self.pop.size_act]
self.assert_pop_array_equal(pos_act, expected)
if __name__ == '__main__':
unittest.main()

82
tests/test_project.py Normal file
View File

@ -0,0 +1,82 @@
"""
@package tests.test_project
unit tests for pmsco.project.
the purpose of these tests is to help debugging the code.
to run the tests, change to the directory which contains the tests directory, and execute =nosetests=.
@pre nose and mock must be installed.
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
@copyright (c) 2015-18 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
import mock
import numpy as np
import pmsco.data as data
import pmsco.dispatch as dispatch
import pmsco.project as project
class TestProject(unittest.TestCase):
def setUp(self):
# before each test method
self.project = project.Project()
def tearDown(self):
# after each test method
pass
@classmethod
def setup_class(cls):
# before any methods in this class
pass
@classmethod
def teardown_class(cls):
# teardown_class() after any methods in this class
pass
@mock.patch('pmsco.data.load_data')
@mock.patch('pmsco.data.save_data')
def test_combine_symmetries(self, save_data_mock, load_data_mock):
self.project.scans.append(project.Scan())
parent_task = dispatch.CalculationTask()
parent_task.change_id(model=0, scan=0)
parent_task.model['wsym1'] = 0.5
child_tasks = [parent_task.copy()] * 2
for idx, task in enumerate(child_tasks):
task.change_id(sym=idx)
data1 = data.create_data(5, datatype='EI')
data1['e'] = np.arange(5)
data1['i'] = 10.
data2 = data1.copy()
data2['i'] = 10.
load_data_mock.side_effect = [data1, data2]
data3 = data1.copy()
data3['i'] = (10. + 0.5 * 10.) / 1.5
self.project.combine_symmetries(parent_task, child_tasks)
save_data_mock.assert_called()
args, kwargs = save_data_mock.call_args
result_data = args[1]
np.testing.assert_array_almost_equal(result_data['e'], data3['e'])
np.testing.assert_array_almost_equal(result_data['i'], data3['i'])
if __name__ == '__main__':
unittest.main()

View File

@ -1,6 +1,6 @@
"""
@package tests.test_swarm
unit tests for pmsco.swarm
unit tests for @ref pmsco.optimizers.swarm
the purpose of these tests is to help debugging the code.
@ -17,29 +17,36 @@ Licensed under the Apache License, Version 2.0 (the "License"); @n
http://www.apache.org/licenses/LICENSE-2.0
"""
import unittest
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import os
import os.path
import tempfile
import random
import shutil
import numpy as np
import pmsco.swarm as mo
import pmsco.project as mp
import tempfile
import unittest
import pmsco.optimizers.swarm as swarm
import pmsco.project as project
POP_SIZE = 5
class TestPopulation(unittest.TestCase):
class TestSwarmPopulation(unittest.TestCase):
def setUp(self):
random.seed(0)
self.test_dir = tempfile.mkdtemp()
self.domain = mp.Domain()
self.domain = project.Domain()
self.domain.add_param('A', 1.5, 1.0, 2.0, 0.1)
self.domain.add_param('B', 2.5, 2.0, 3.0, 0.1)
self.domain.add_param('C', 3.5, 3.0, 4.0, 0.1)
self.expected_names = ('A', 'B', 'C', '_particle', '_gen', '_model', '_rfac')
self.expected_names = ('_gen', '_model', '_particle', '_rfac', 'A', 'B', 'C')
self.size = POP_SIZE
self.pop = mo.Population()
self.pop = swarm.SwarmPopulation()
self.optimum1 = {'A': 1.045351, 'B': 2.346212, 'C': 3.873627}
@ -65,100 +72,12 @@ class TestPopulation(unittest.TestCase):
r /= 3.0
return r
def test_setup(self):
self.pop.setup(self.size, self.domain)
self.assertItemsEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
self.assertItemsEqual(np.arange(POP_SIZE), self.pop.pos['_particle'])
self.assertItemsEqual(np.zeros((POP_SIZE)), self.pop.pos['_gen'])
self.assertItemsEqual(np.arange(POP_SIZE), self.pop.pos['_model'])
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertEqual(0, self.pop.generation)
self.assertEqual(POP_SIZE, self.pop.model_count)
def test_setup_with_results(self):
data_dir = os.path.dirname(os.path.abspath(__file__))
data_file = os.path.join(data_dir, "test_swarm.setup_with_results.1.dat")
self.pop.setup(self.size, self.domain, data_file, False)
self.assertItemsEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
self.assertEqual(0, self.pop.generation)
self.assertEqual(3, self.pop.model_count)
self.assertItemsEqual(np.arange(POP_SIZE), self.pop.pos['_particle'])
self.assertItemsEqual([-1, -1, 0, 0, 0], self.pop.pos['_gen'])
self.assertItemsEqual([-1, -2, 0, 1, 2], self.pop.pos['_model'])
self.assertAlmostEqual(0.3, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(0.6, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertAlmostEqual(1.3, self.pop.pos['A'][0], 3)
self.assertAlmostEqual(1.1, self.pop.pos['A'][1], 3)
self.assertAlmostEqual(1.5, self.pop.pos['A'][4], 3)
self.assertAlmostEqual(2.3, self.pop.pos['B'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['B'][1], 3)
self.assertAlmostEqual(2.5, self.pop.pos['B'][4], 3)
self.assertAlmostEqual(3.3, self.pop.pos['C'][0], 3)
self.assertAlmostEqual(3.1, self.pop.pos['C'][1], 3)
self.assertAlmostEqual(3.5, self.pop.pos['C'][4], 3)
def test_setup_with_results_recalc(self):
data_dir = os.path.dirname(os.path.abspath(__file__))
data_file = os.path.join(data_dir, "test_swarm.setup_with_results.1.dat")
self.pop.setup(self.size, self.domain, data_file, True)
self.assertItemsEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
self.assertEqual(self.pop.generation, 0)
self.assertEqual(self.pop.model_count, POP_SIZE)
self.assertItemsEqual(self.pop.pos['_particle'], np.arange(POP_SIZE))
self.assertItemsEqual(self.pop.pos['_gen'], [0, 0, 0, 0, 0])
self.assertItemsEqual(self.pop.pos['_model'], np.arange(POP_SIZE))
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertAlmostEqual(1.3, self.pop.pos['A'][0], 3)
self.assertAlmostEqual(1.1, self.pop.pos['A'][1], 3)
self.assertAlmostEqual(1.5, self.pop.pos['A'][4], 3)
self.assertAlmostEqual(2.3, self.pop.pos['B'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['B'][1], 3)
self.assertAlmostEqual(2.5, self.pop.pos['B'][4], 3)
self.assertAlmostEqual(3.3, self.pop.pos['C'][0], 3)
self.assertAlmostEqual(3.1, self.pop.pos['C'][1], 3)
self.assertAlmostEqual(3.5, self.pop.pos['C'][4], 3)
def test_pos_gen(self):
self.pop.setup(self.size, self.domain)
for index, item in enumerate(self.pop.pos_gen()):
self.assertIsInstance(item, dict)
self.assertItemsEqual(item.keys(), self.expected_names)
self.assertEqual(item['_particle'], index)
def test_randomize(self):
self.pop.setup(self.size, self.domain)
self.pop.randomize()
m = np.mean(self.pop.pos['A'])
self.assertGreaterEqual(m, self.domain.min['A'])
self.assertLessEqual(m, self.domain.max['A'])
def test_seed(self):
self.pop.setup(self.size, self.domain)
self.pop.seed(self.domain.start)
self.assertAlmostEqual(self.pop.pos['A'][0], self.domain.start['A'], delta=0.001)
def test_best_friend(self):
self.pop.setup(self.size, self.domain)
self.pop.best['_rfac'] = np.arange(self.size)
friend = self.pop.best_friend(0)
self.assertNotIsInstance(friend, np.ndarray)
self.assertItemsEqual(friend.dtype.names, self.expected_names)
self.assertEqual(friend.dtype.names, self.expected_names)
def test_advance_particle(self):
self.pop.setup(self.size, self.domain)
@ -182,16 +101,6 @@ class TestPopulation(unittest.TestCase):
self.assertGreaterEqual(pos, self.domain.min[key])
self.assertLessEqual(pos, self.domain.max[key])
def test_add_result(self):
self.pop.setup(self.size, self.domain)
i_sample = 1
i_result = 0
result = self.pop.pos[i_sample]
self.pop.add_result(result, 0.0)
self.assertEqual(self.pop.results.shape[0], 1)
self.assertItemsEqual(self.pop.results[i_result], result)
self.assertItemsEqual(self.pop.best[i_sample], result)
def test_is_converged(self):
self.pop.setup(self.size, self.domain)
self.assertFalse(self.pop.is_converged())
@ -206,139 +115,6 @@ class TestPopulation(unittest.TestCase):
self.pop.add_result(result, rfac)
self.assertTrue(self.pop.is_converged())
def test_save_population(self):
self.pop.setup(self.size, self.domain)
filename = os.path.join(self.test_dir, "test_save_population.pop")
self.pop.save_population(filename)
def test_save_results(self):
self.pop.setup(self.size, self.domain)
i_sample = 1
result = self.pop.pos[i_sample]
self.pop.add_result(result, 1.0)
filename = os.path.join(self.test_dir, "test_save_results.dat")
self.pop.save_results(filename)
def test_save_array(self):
self.pop.setup(self.size, self.domain)
filename = os.path.join(self.test_dir, "test_save_array.pos")
self.pop.save_array(filename, self.pop.pos)
def test_load_array(self):
n = 3
filename = os.path.join(self.test_dir, "test_load_array")
self.pop.setup(self.size, self.domain)
# expected array
dt_exp = self.pop.get_model_dtype(self.domain.start)
a_exp = np.zeros((n,), dtype=dt_exp)
a_exp['A'] = np.linspace(0, 1, n)
a_exp['B'] = np.linspace(1, 2, n)
a_exp['C'] = np.linspace(3, 4, n)
a_exp['_rfac'] = np.linspace(5, 6, n)
a_exp['_gen'] = np.array([3, 4, 7])
a_exp['_particle'] = np.array([1, 0, 2])
a_exp['_model'] = np.array([3, 6, 1])
# test array is a expected array with different column order
dt_test = [('A', 'f4'), ('_particle', 'i4'), ('_rfac', 'f4'), ('C', 'f4'), ('_gen', 'i4'), ('B', 'f4'),
('_model', 'i4')]
names_test = [a[0] for a in dt_test]
a_test = np.zeros((n,), dtype=dt_test)
for name in names_test:
a_test[name] = a_exp[name]
header = " ".join(names_test)
np.savetxt(filename, a_test, fmt='%g', header=header)
result = np.zeros((n,), dtype=dt_exp)
result = self.pop.load_array(filename, result)
self.assertItemsEqual(result.dtype.names, a_exp.dtype.names)
for name in a_exp.dtype.names:
np.testing.assert_almost_equal(result[name], a_exp[name], err_msg=name)
def test_constrain_position(self):
# upper
pos1 = 11.0
vel1 = 5.0
min1 = 0.0
max1 = 10.0
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 're-enter')
self.assertAlmostEqual(pos2, 1.0)
self.assertAlmostEqual(vel2, vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'bounce')
self.assertAlmostEqual(pos2, 9.0)
self.assertAlmostEqual(vel2, -vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'scatter')
self.assertGreaterEqual(pos2, 6.0)
self.assertLessEqual(pos2, 10.0)
self.assertGreaterEqual(vel2, 0.0)
self.assertLessEqual(vel2, vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'stick')
self.assertAlmostEqual(pos2, max1)
self.assertAlmostEqual(vel2, max1 - (pos1 - vel1))
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'random')
self.assertGreaterEqual(pos2, 0.0)
self.assertLessEqual(pos2, 10.0)
self.assertGreaterEqual(vel2, -(max1 - min1))
self.assertLessEqual(vel2, (max1 - min1))
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'expand')
self.assertAlmostEqual(pos2, pos1)
self.assertAlmostEqual(vel2, vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max2, pos1)
self.assertRaises(ValueError, mo.Population.constrain_position, pos1, vel1, min1, max1, 'undefined')
# lower
pos1 = -1.0
vel1 = -5.0
min1 = 0.0
max1 = 10.0
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 're-enter')
self.assertAlmostEqual(pos2, 9.0)
self.assertAlmostEqual(vel2, vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'bounce')
self.assertAlmostEqual(pos2, 1.0)
self.assertAlmostEqual(vel2, -vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'scatter')
self.assertGreaterEqual(pos2, 0.0)
self.assertLessEqual(pos2, 4.0)
self.assertGreaterEqual(vel2, vel1)
self.assertLessEqual(vel2, 0.0)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'stick')
self.assertAlmostEqual(pos2, min1)
self.assertAlmostEqual(vel2, min1 - (pos1 - vel1))
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'random')
self.assertGreaterEqual(pos2, 0.0)
self.assertLessEqual(pos2, 10.0)
self.assertGreaterEqual(vel2, -(max1 - min1))
self.assertLessEqual(vel2, max1 - min1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'expand')
self.assertAlmostEqual(pos2, pos1)
self.assertAlmostEqual(vel2, vel1)
self.assertAlmostEqual(min2, pos1)
self.assertAlmostEqual(max2, max1)
self.assertRaises(ValueError, mo.Population.constrain_position, pos1, vel1, min1, max1, 'undefined')
def test_convergence_1(self):
self.pop.setup(self.size, self.domain)
@ -359,6 +135,6 @@ class TestPopulation(unittest.TestCase):
for pos in self.pop.pos:
self.assertLess(pos['_rfac'], 0.2)
if __name__ == '__main__':
unittest.main()

View File

@ -1,5 +1,5 @@
# A B C _gen _model _rfac
1.1 2.1 3.1 1 1111 0.6
1.2 2.2 3.2 2 2222 1.5
1.3 2.3 3.3 3 3333 0.3
1.4 2.4 3.4 4 4444 1.0
# A B C E _gen _model _rfac _particle
1.1 2.1 3.1 5.1 1 1111 0.6 1
1.2 2.2 3.2 5.2 2 2222 1.5 2
1.3 2.3 4.3 5.3 3 3333 0.3 3
1.4 2.4 3.4 5.4 4 4444 1.0 4