pmsco-public/tests/test_swarm.py
matthias muntwiler bbd16d0f94 add files for public distribution
based on internal repository 0a462b6 2017-11-22 14:41:39 +0100
2017-11-22 14:55:20 +01:00

365 lines
16 KiB
Python

"""
@package tests.test_swarm
unit tests for pmsco.swarm
the purpose of these tests is to help debugging the code.
to run the tests, change to the directory which contains the tests directory, and execute =nosetests=.
@pre nose must be installed (python-nose package on Debian).
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
@copyright (c) 2015 by Paul Scherrer Institut @n
Licensed under the Apache License, Version 2.0 (the "License"); @n
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
"""
import unittest
import os
import os.path
import tempfile
import shutil
import numpy as np
import pmsco.swarm as mo
import pmsco.project as mp
POP_SIZE = 5
class TestPopulation(unittest.TestCase):
def setUp(self):
self.test_dir = tempfile.mkdtemp()
self.domain = mp.Domain()
self.domain.add_param('A', 1.5, 1.0, 2.0, 0.1)
self.domain.add_param('B', 2.5, 2.0, 3.0, 0.1)
self.domain.add_param('C', 3.5, 3.0, 4.0, 0.1)
self.expected_names = ('A', 'B', 'C', '_particle', '_gen', '_model', '_rfac')
self.size = POP_SIZE
self.pop = mo.Population()
self.optimum1 = {'A': 1.045351, 'B': 2.346212, 'C': 3.873627}
def tearDown(self):
# after each test method
self.pop = None
shutil.rmtree(self.test_dir)
@classmethod
def setup_class(cls):
# before any methods in this class
pass
@classmethod
def teardown_class(cls):
# teardown_class() after any methods in this class
pass
def rfactor1(self, pos):
r = (pos['A'] - self.optimum1['A'])**2 \
+ (pos['B'] - self.optimum1['B'])**2 \
+ (pos['C'] - self.optimum1['C'])**2
r /= 3.0
return r
def test_setup(self):
self.pop.setup(self.size, self.domain)
self.assertItemsEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
self.assertItemsEqual(np.arange(POP_SIZE), self.pop.pos['_particle'])
self.assertItemsEqual(np.zeros((POP_SIZE)), self.pop.pos['_gen'])
self.assertItemsEqual(np.arange(POP_SIZE), self.pop.pos['_model'])
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertEqual(0, self.pop.generation)
self.assertEqual(POP_SIZE, self.pop.model_count)
def test_setup_with_results(self):
data_dir = os.path.dirname(os.path.abspath(__file__))
data_file = os.path.join(data_dir, "test_swarm.setup_with_results.1.dat")
self.pop.setup(self.size, self.domain, data_file, False)
self.assertItemsEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
self.assertEqual(0, self.pop.generation)
self.assertEqual(3, self.pop.model_count)
self.assertItemsEqual(np.arange(POP_SIZE), self.pop.pos['_particle'])
self.assertItemsEqual([-1, -1, 0, 0, 0], self.pop.pos['_gen'])
self.assertItemsEqual([-1, -2, 0, 1, 2], self.pop.pos['_model'])
self.assertAlmostEqual(0.3, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(0.6, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertAlmostEqual(1.3, self.pop.pos['A'][0], 3)
self.assertAlmostEqual(1.1, self.pop.pos['A'][1], 3)
self.assertAlmostEqual(1.5, self.pop.pos['A'][4], 3)
self.assertAlmostEqual(2.3, self.pop.pos['B'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['B'][1], 3)
self.assertAlmostEqual(2.5, self.pop.pos['B'][4], 3)
self.assertAlmostEqual(3.3, self.pop.pos['C'][0], 3)
self.assertAlmostEqual(3.1, self.pop.pos['C'][1], 3)
self.assertAlmostEqual(3.5, self.pop.pos['C'][4], 3)
def test_setup_with_results_recalc(self):
data_dir = os.path.dirname(os.path.abspath(__file__))
data_file = os.path.join(data_dir, "test_swarm.setup_with_results.1.dat")
self.pop.setup(self.size, self.domain, data_file, True)
self.assertItemsEqual(self.pop.pos.dtype.names, self.expected_names)
self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
self.assertEqual(self.pop.generation, 0)
self.assertEqual(self.pop.model_count, POP_SIZE)
self.assertItemsEqual(self.pop.pos['_particle'], np.arange(POP_SIZE))
self.assertItemsEqual(self.pop.pos['_gen'], [0, 0, 0, 0, 0])
self.assertItemsEqual(self.pop.pos['_model'], np.arange(POP_SIZE))
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][1], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][2], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][3], 3)
self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][4], 3)
self.assertAlmostEqual(1.3, self.pop.pos['A'][0], 3)
self.assertAlmostEqual(1.1, self.pop.pos['A'][1], 3)
self.assertAlmostEqual(1.5, self.pop.pos['A'][4], 3)
self.assertAlmostEqual(2.3, self.pop.pos['B'][0], 3)
self.assertAlmostEqual(2.1, self.pop.pos['B'][1], 3)
self.assertAlmostEqual(2.5, self.pop.pos['B'][4], 3)
self.assertAlmostEqual(3.3, self.pop.pos['C'][0], 3)
self.assertAlmostEqual(3.1, self.pop.pos['C'][1], 3)
self.assertAlmostEqual(3.5, self.pop.pos['C'][4], 3)
def test_pos_gen(self):
self.pop.setup(self.size, self.domain)
for index, item in enumerate(self.pop.pos_gen()):
self.assertIsInstance(item, dict)
self.assertItemsEqual(item.keys(), self.expected_names)
self.assertEqual(item['_particle'], index)
def test_randomize(self):
self.pop.setup(self.size, self.domain)
self.pop.randomize()
m = np.mean(self.pop.pos['A'])
self.assertGreaterEqual(m, self.domain.min['A'])
self.assertLessEqual(m, self.domain.max['A'])
def test_seed(self):
self.pop.setup(self.size, self.domain)
self.pop.seed(self.domain.start)
self.assertAlmostEqual(self.pop.pos['A'][0], self.domain.start['A'], delta=0.001)
def test_best_friend(self):
self.pop.setup(self.size, self.domain)
self.pop.best['_rfac'] = np.arange(self.size)
friend = self.pop.best_friend(0)
self.assertNotIsInstance(friend, np.ndarray)
self.assertItemsEqual(friend.dtype.names, self.expected_names)
def test_advance_particle(self):
self.pop.setup(self.size, self.domain)
self.pop.pos['A'] = np.linspace(1.0, 2.0, POP_SIZE)
self.pop.pos['B'] = np.linspace(2.0, 3.0, POP_SIZE)
self.pop.pos['C'] = np.linspace(3.0, 4.0, POP_SIZE)
self.pop.pos['_rfac'] = np.linspace(2.0, 1.0, POP_SIZE)
self.pop.vel['A'] = np.linspace(-0.1, 0.1, POP_SIZE)
self.pop.vel['B'] = np.linspace(-0.1, 0.1, POP_SIZE)
self.pop.vel['C'] = np.linspace(-0.1, 0.1, POP_SIZE)
pos0 = self.pop.pos['A'][0]
self.pop.advance_particle(0)
pos1 = self.pop.pos['A'][0]
self.assertNotAlmostEqual(pos0, pos1, delta=0.001)
for key in ['A','B','C']:
for pos in self.pop.pos[key]:
self.assertGreaterEqual(pos, self.domain.min[key])
self.assertLessEqual(pos, self.domain.max[key])
def test_add_result(self):
self.pop.setup(self.size, self.domain)
i_sample = 1
i_result = 0
result = self.pop.pos[i_sample]
self.pop.add_result(result, 0.0)
self.assertEqual(self.pop.results.shape[0], 1)
self.assertItemsEqual(self.pop.results[i_result], result)
self.assertItemsEqual(self.pop.best[i_sample], result)
def test_is_converged(self):
self.pop.setup(self.size, self.domain)
self.assertFalse(self.pop.is_converged())
i_sample = 0
result = self.pop.pos[i_sample]
for i in range(POP_SIZE):
rfac = 1.0 - float(i)/POP_SIZE
self.pop.add_result(result, rfac)
self.assertFalse(self.pop.is_converged())
for i in range(POP_SIZE):
rfac = (1.0 - float(i)/POP_SIZE) / 1000.0
self.pop.add_result(result, rfac)
self.assertTrue(self.pop.is_converged())
def test_save_population(self):
self.pop.setup(self.size, self.domain)
filename = os.path.join(self.test_dir, "test_save_population.pop")
self.pop.save_population(filename)
def test_save_results(self):
self.pop.setup(self.size, self.domain)
i_sample = 1
result = self.pop.pos[i_sample]
self.pop.add_result(result, 1.0)
filename = os.path.join(self.test_dir, "test_save_results.dat")
self.pop.save_results(filename)
def test_save_array(self):
self.pop.setup(self.size, self.domain)
filename = os.path.join(self.test_dir, "test_save_array.pos")
self.pop.save_array(filename, self.pop.pos)
def test_load_array(self):
n = 3
filename = os.path.join(self.test_dir, "test_load_array")
self.pop.setup(self.size, self.domain)
# expected array
dt_exp = self.pop.get_model_dtype(self.domain.start)
a_exp = np.zeros((n,), dtype=dt_exp)
a_exp['A'] = np.linspace(0, 1, n)
a_exp['B'] = np.linspace(1, 2, n)
a_exp['C'] = np.linspace(3, 4, n)
a_exp['_rfac'] = np.linspace(5, 6, n)
a_exp['_gen'] = np.array([3, 4, 7])
a_exp['_particle'] = np.array([1, 0, 2])
a_exp['_model'] = np.array([3, 6, 1])
# test array is a expected array with different column order
dt_test = [('A', 'f4'), ('_particle', 'i4'), ('_rfac', 'f4'), ('C', 'f4'), ('_gen', 'i4'), ('B', 'f4'),
('_model', 'i4')]
names_test = [a[0] for a in dt_test]
a_test = np.zeros((n,), dtype=dt_test)
for name in names_test:
a_test[name] = a_exp[name]
header = " ".join(names_test)
np.savetxt(filename, a_test, fmt='%g', header=header)
result = np.zeros((n,), dtype=dt_exp)
result = self.pop.load_array(filename, result)
self.assertItemsEqual(result.dtype.names, a_exp.dtype.names)
for name in a_exp.dtype.names:
np.testing.assert_almost_equal(result[name], a_exp[name], err_msg=name)
def test_constrain_position(self):
# upper
pos1 = 11.0
vel1 = 5.0
min1 = 0.0
max1 = 10.0
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 're-enter')
self.assertAlmostEqual(pos2, 1.0)
self.assertAlmostEqual(vel2, vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'bounce')
self.assertAlmostEqual(pos2, 9.0)
self.assertAlmostEqual(vel2, -vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'scatter')
self.assertGreaterEqual(pos2, 6.0)
self.assertLessEqual(pos2, 10.0)
self.assertGreaterEqual(vel2, 0.0)
self.assertLessEqual(vel2, vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'stick')
self.assertAlmostEqual(pos2, max1)
self.assertAlmostEqual(vel2, max1 - (pos1 - vel1))
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'random')
self.assertGreaterEqual(pos2, 0.0)
self.assertLessEqual(pos2, 10.0)
self.assertGreaterEqual(vel2, -(max1 - min1))
self.assertLessEqual(vel2, (max1 - min1))
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'expand')
self.assertAlmostEqual(pos2, pos1)
self.assertAlmostEqual(vel2, vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max2, pos1)
self.assertRaises(ValueError, mo.Population.constrain_position, pos1, vel1, min1, max1, 'undefined')
# lower
pos1 = -1.0
vel1 = -5.0
min1 = 0.0
max1 = 10.0
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 're-enter')
self.assertAlmostEqual(pos2, 9.0)
self.assertAlmostEqual(vel2, vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'bounce')
self.assertAlmostEqual(pos2, 1.0)
self.assertAlmostEqual(vel2, -vel1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'scatter')
self.assertGreaterEqual(pos2, 0.0)
self.assertLessEqual(pos2, 4.0)
self.assertGreaterEqual(vel2, vel1)
self.assertLessEqual(vel2, 0.0)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'stick')
self.assertAlmostEqual(pos2, min1)
self.assertAlmostEqual(vel2, min1 - (pos1 - vel1))
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'random')
self.assertGreaterEqual(pos2, 0.0)
self.assertLessEqual(pos2, 10.0)
self.assertGreaterEqual(vel2, -(max1 - min1))
self.assertLessEqual(vel2, max1 - min1)
self.assertAlmostEqual(min1, min2)
self.assertAlmostEqual(max1, max2)
pos2, vel2, min2, max2 = mo.Population.constrain_position(pos1, vel1, min1, max1, 'expand')
self.assertAlmostEqual(pos2, pos1)
self.assertAlmostEqual(vel2, vel1)
self.assertAlmostEqual(min2, pos1)
self.assertAlmostEqual(max2, max1)
self.assertRaises(ValueError, mo.Population.constrain_position, pos1, vel1, min1, max1, 'undefined')
def test_convergence_1(self):
self.pop.setup(self.size, self.domain)
self.pop.pos['A'] = np.linspace(1.0, 2.0, POP_SIZE)
self.pop.pos['B'] = np.linspace(2.0, 3.0, POP_SIZE)
self.pop.pos['C'] = np.linspace(3.0, 4.0, POP_SIZE)
self.pop.pos['_rfac'] = np.linspace(2.0, 1.0, POP_SIZE)
self.pop.vel['A'] = np.linspace(-0.1, 0.1, POP_SIZE)
self.pop.vel['B'] = np.linspace(-0.1, 0.1, POP_SIZE)
self.pop.vel['C'] = np.linspace(-0.1, 0.1, POP_SIZE)
for i in range(10):
self.pop.advance_population()
for pos in self.pop.pos:
self.pop.add_result(pos, self.rfactor1(pos))
for pos in self.pop.pos:
self.assertLess(pos['_rfac'], 0.2)
if __name__ == '__main__':
unittest.main()