382 lines
15 KiB
Python
382 lines
15 KiB
Python
"""
|
|
@package tests.test_genetic
|
|
unit tests for pmsco.optimizers.genetic
|
|
|
|
the purpose of these tests is to help debugging the code.
|
|
|
|
to run the tests, change to the directory which contains the tests directory, and execute =nosetests=.
|
|
|
|
@pre nose must be installed (python-nose package on Debian).
|
|
|
|
@author Matthias Muntwiler, matthias.muntwiler@psi.ch
|
|
|
|
@copyright (c) 2018 by Paul Scherrer Institut @n
|
|
Licensed under the Apache License, Version 2.0 (the "License"); @n
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
"""
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
import math
|
|
import numpy as np
|
|
import os
|
|
import os.path
|
|
import random
|
|
import shutil
|
|
import tempfile
|
|
import unittest
|
|
|
|
import pmsco.optimizers.genetic as mo
|
|
import pmsco.project as mp
|
|
|
|
# number of individuals in the population that the tests set up.
# the tests compare the shape of the population array against this value.
POP_SIZE = 6
|
|
|
|
|
|
class TestPopulation(unittest.TestCase):
|
|
def setUp(self):
    """
    create the fixtures for one test: model space, unconfigured population and reference optima.

    the python random generator is seeded with a fixed value before each test.
    """
    random.seed(0)

    # an empty string means that no temporary directory has been created yet.
    self._test_dir = ""

    # the argument order is taken over unchanged from the original calls.
    # presumably (name, start, min, max, step) - confirm against ModelSpace.add_param.
    self.model_space = mp.ModelSpace()
    for param_args in (('A', 1.5, 1.0, 2.0, 0.1),
                       ('B', 2.5, 2.0, 3.0, 0.1),
                       ('C', 3.5, 3.0, 4.0, 0.1)):
        self.model_space.add_param(*param_args)

    # field names which the tests expect in the population array.
    self.expected_names = ('_gen', '_model', '_particle', '_rfac', 'A', 'B', 'C')

    self.size = POP_SIZE
    self.pop = mo.GeneticPopulation()

    # optimum1 is the position where rfactor1 becomes zero.
    # optimum2 is not referenced by any method visible in this file.
    self.optimum1 = dict(A=1.045351, B=2.346212, C=3.873627)
    self.optimum2 = dict(A=1.045351, B=2.346212, C=4.873627)
|
|
|
|
def tearDown(self):
    """
    clean up after each test method.

    drops the reference to the population and deletes the temporary directory if one was created.
    """
    self.pop = None

    scratch_dir = self._test_dir
    if not scratch_dir:
        # no test has asked for a temporary directory, nothing to delete.
        return
    shutil.rmtree(scratch_dir)
|
|
|
|
@property
def test_dir(self):
    """
    path of a temporary directory for files written by a test.

    the directory is created with tempfile.mkdtemp on first access.
    later accesses return the same path.

    @return: path of the temporary directory.
    """
    if self._test_dir:
        return self._test_dir

    self._test_dir = tempfile.mkdtemp()
    return self._test_dir
|
|
|
|
@classmethod
def setup_class(cls):
    """
    hook that is meant to run before any method of this class.

    there is nothing to prepare at class level, so the hook does nothing.
    """
|
|
|
|
@classmethod
def teardown_class(cls):
    """
    hook that is meant to run after all methods of this class.

    there is nothing to release at class level, so the hook does nothing.
    """
|
|
|
|
def rfactor1(self, pos):
    """
    R-factor function with a single minimum.

    the result is the mean of the squared distances between pos and self.optimum1
    along the three parameters. it is zero if pos coincides with self.optimum1.

    @param pos: dict-like position with keys 'A', 'B' and 'C'.

    @return: R-factor
    """
    target = self.optimum1
    dev_a = pos['A'] - target['A']
    dev_b = pos['B'] - target['B']
    dev_c = pos['C'] - target['C']

    mean_square = (dev_a ** 2 + dev_b ** 2 + dev_c ** 2) / 3.0
    return mean_square
|
|
|
|
def rfactor2(self, pos):
    """
    R-factor function with multiple local minima

    the function is the sum of three sine terms of the pairwise parameter differences
    and a parabolic term centered at A = 1.8, B = 2.3, C = 3.8 with an offset of 3.0.

    global minimum R = 0.0138591 at A = 1.745, B = 2.395, C = 3.755.

    domain A = 1.0:2.0, B = 2.0:3.0, C = 3.0:4.0

    @param pos: dict-like position with keys 'A', 'B' and 'C'.

    @return: R-factor
    """
    a, b, c = pos['A'], pos['B'], pos['C']

    # arguments of the sine terms: scaled pairwise differences.
    phase_ab = (a - b) * 12
    phase_bc = (b - c) * 15
    phase_ca = (c - a) * 18

    # distances from the center of the parabolic term.
    off_a = a - 1.8
    off_b = b - 2.3
    off_c = c - 3.8

    # amplitudes of the sine terms.
    amp_ab, amp_bc, amp_ca = 1.0, 1.0, 1.0
    # curvatures of the parabolic term.
    curv_a, curv_b, curv_c = 0.4, 0.8, 1.2

    oscillation = amp_ab * math.sin(phase_ab) + amp_bc * math.sin(phase_bc) + amp_ca * math.sin(phase_ca)
    parabola = curv_a * off_a**2 + curv_b * off_b**2 + curv_c * off_c**2 + 3.0

    return oscillation + parabola
|
|
|
|
def test_setup(self):
    """
    set up the population without seed file and check the layout and bookkeeping fields.

    checks field names, array shape, particle and model numbering, generation numbers,
    the initial R-factor of the first five individuals, and the counters of the population.
    """
    self.pop.setup(self.size, self.model_space)

    self.assertEqual(self.pop.pos.dtype.names, self.expected_names)
    self.assertEqual(self.pop.pos.shape, (POP_SIZE,))

    consecutive = np.arange(POP_SIZE)
    np.testing.assert_array_equal(consecutive, self.pop.pos['_particle'])
    np.testing.assert_array_equal(np.zeros(POP_SIZE), self.pop.pos['_gen'])
    np.testing.assert_array_equal(consecutive, self.pop.pos['_model'])

    # as in the original, only individuals 0 to 4 are checked.
    for individual in range(5):
        self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][individual], 3)

    self.assertEqual(0, self.pop.generation)
    self.assertEqual(POP_SIZE, self.pop.model_count)
|
|
|
|
def test_setup_with_results(self):
    """
    set up the population from a seed file without recalculating the seeded R-factors.

    the seed file test_swarm.setup_with_results.1.dat is read from the directory of this module.
    the assertions pin the R-factors of individuals 0 to 4 and the parameters of individuals 0 to 2.
    individual 2 is expected to carry generation -1 and the R-factor 0.6.
    """
    data_dir = os.path.dirname(os.path.abspath(__file__))
    data_file = os.path.join(data_dir, "test_swarm.setup_with_results.1.dat")
    self.pop.setup(self.size, self.model_space, seed_file=data_file, recalc_seed=False)

    self.assertEqual(self.pop.pos.dtype.names, self.expected_names)
    self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
    self.assertEqual(self.pop.generation, 0)
    self.assertEqual(self.pop.model_count, POP_SIZE)
    np.testing.assert_array_equal(self.pop.pos['_particle'], np.arange(POP_SIZE))
    np.testing.assert_array_equal(self.pop.pos['_gen'], [0, 0, -1, 0, 0, 0])
    np.testing.assert_array_equal(self.pop.pos['_model'], np.arange(POP_SIZE))

    for index, rfac in enumerate((2.1, 2.1, 0.6, 2.1, 2.1)):
        self.assertAlmostEqual(rfac, self.pop.pos['_rfac'][index], 3,
                               msg="_rfac[{0}]".format(index))

    expected_params = ((0, (('A', 1.5), ('B', 2.5), ('C', 3.5))),
                       (1, (('A', 1.3), ('B', 2.3))),
                       (2, (('A', 1.1), ('B', 2.1), ('C', 3.1))))
    for index, params in expected_params:
        for name, value in params:
            self.assertAlmostEqual(value, self.pop.pos[name][index], 3,
                                   msg="{0}[{1}]".format(name, index))

    # C of individual 1 is only required to be at most 4.0.
    # the original call passed a stray third argument (3). assertGreaterEqual takes the third
    # argument as the failure message, not as a number of places, so it is dropped here.
    self.assertGreaterEqual(4.0, self.pop.pos['C'][1])
|
|
|
|
def test_setup_with_results_recalc(self):
    """
    set up the population from a seed file and request recalculation of the seeded models.

    the seed file test_swarm.setup_with_results.1.dat is read from the directory of this module.
    with recalc_seed=True all individuals are expected to carry generation 0,
    and the R-factors of individuals 0 to 4 are expected to have the initial value 2.1.
    the parameters of individuals 0 to 2 are pinned to the same values as without recalculation.
    """
    data_dir = os.path.dirname(os.path.abspath(__file__))
    data_file = os.path.join(data_dir, "test_swarm.setup_with_results.1.dat")
    self.pop.setup(self.size, self.model_space, seed_file=data_file, recalc_seed=True)

    self.assertEqual(self.pop.pos.dtype.names, self.expected_names)
    self.assertEqual(self.pop.pos.shape, (POP_SIZE,))
    self.assertEqual(self.pop.generation, 0)
    self.assertEqual(self.pop.model_count, POP_SIZE)
    np.testing.assert_array_equal(self.pop.pos['_particle'], np.arange(POP_SIZE))
    np.testing.assert_array_equal(self.pop.pos['_gen'], [0, 0, 0, 0, 0, 0])
    np.testing.assert_array_equal(self.pop.pos['_model'], np.arange(POP_SIZE))

    for index in range(5):
        self.assertAlmostEqual(2.1, self.pop.pos['_rfac'][index], 3,
                               msg="_rfac[{0}]".format(index))

    expected_params = ((0, (('A', 1.5), ('B', 2.5), ('C', 3.5))),
                       (1, (('A', 1.3), ('B', 2.3))),
                       (2, (('A', 1.1), ('B', 2.1), ('C', 3.1))))
    for index, params in expected_params:
        for name, value in params:
            self.assertAlmostEqual(value, self.pop.pos[name][index], 3,
                                   msg="{0}[{1}]".format(name, index))

    # C of individual 1 is only required to be at most 4.0.
    # the original call passed a stray third argument (3). assertGreaterEqual takes the third
    # argument as the failure message, not as a number of places, so it is dropped here.
    self.assertGreaterEqual(4.0, self.pop.pos['C'][1])
|
|
|
|
def test_pos_gen(self):
    """
    check the items produced by the pos_gen generator.

    each item must be a dict with exactly the expected field names,
    and the '_particle' values must count up from zero in iteration order.
    """
    self.pop.setup(self.size, self.model_space)

    wanted_keys = set(self.expected_names)
    position = 0
    for item in self.pop.pos_gen():
        self.assertIsInstance(item, dict)
        self.assertEqual(set(item.keys()), wanted_keys)
        self.assertEqual(item['_particle'], position)
        position += 1
|
|
|
|
def test_randomize(self):
    """
    check that randomize keeps parameter A within the model space and spreads the values.

    the standard deviation of A over the population must exceed the step size of A.
    """
    self.pop.setup(self.size, self.model_space)
    self.pop.randomize()

    above_min = np.all(self.pop.pos['A'] >= self.model_space.min['A'])
    self.assertTrue(above_min)

    below_max = np.all(self.pop.pos['A'] <= self.model_space.max['A'])
    self.assertTrue(below_max)

    spread = np.std(self.pop.pos['A'])
    self.assertGreater(spread, self.model_space.step['A'])
|
|
|
|
def test_seed(self):
    """
    seed the population with the start values of the model space.

    afterwards, parameter A of the first individual must agree with the start value within 0.001.
    """
    self.pop.setup(self.size, self.model_space)
    self.pop.seed(self.model_space.start)

    seeded_a = self.pop.pos['A'][0]
    self.assertAlmostEqual(seeded_a, self.model_space.start['A'], delta=0.001)
|
|
|
|
def test_add_result(self):
    """
    add one result and check where the population stores it.

    the result must become the only entry of the results array,
    and the entry of the best array at the index of the sampled individual must equal it.
    """
    self.pop.setup(self.size, self.model_space)

    sample_index, result_index = 1, 0
    sample = self.pop.pos[sample_index]
    self.pop.add_result(sample, 0.0)

    self.assertEqual(self.pop.results.shape[0], 1)
    self.assertEqual(self.pop.results[result_index], sample)
    self.assertEqual(self.pop.best[sample_index], sample)
|
|
|
|
def test_is_converged(self):
    """
    check the convergence criterion with two batches of results.

    a fresh population and a batch of R-factors between 1/POP_SIZE and 1 must not count as converged.
    after adding the same batch divided by 1000, the population must report convergence.
    """
    self.pop.setup(self.size, self.model_space)
    self.assertFalse(self.pop.is_converged())

    sample = self.pop.pos[0]
    fractions = [1.0 - float(step) / POP_SIZE for step in range(POP_SIZE)]

    for fraction in fractions:
        self.pop.add_result(sample, fraction)
    self.assertFalse(self.pop.is_converged())

    for fraction in fractions:
        self.pop.add_result(sample, fraction / 1000.0)
    self.assertTrue(self.pop.is_converged())
|
|
|
|
def test_save_population(self):
    """
    smoke test of save_population: the call must complete without raising an exception.

    the file is written into the temporary test directory. its content is not inspected.
    """
    self.pop.setup(self.size, self.model_space)
    target = os.path.join(self.test_dir, "test_save_population.pop")
    self.pop.save_population(target)
|
|
|
|
def test_save_results(self):
    """
    smoke test of save_results: the call must complete without raising an exception.

    one result (individual 1 with R-factor 1.0) is added before saving.
    the file is written into the temporary test directory. its content is not inspected.
    """
    self.pop.setup(self.size, self.model_space)

    sample = self.pop.pos[1]
    self.pop.add_result(sample, 1.0)

    target = os.path.join(self.test_dir, "test_save_results.dat")
    self.pop.save_results(target)
|
|
|
|
def test_save_array(self):
    """
    smoke test of save_array: the call must complete without raising an exception.

    the position array is written into the temporary test directory. the file content is not inspected.
    """
    self.pop.setup(self.size, self.model_space)
    target = os.path.join(self.test_dir, "test_save_array.pos")
    self.pop.save_array(target, self.pop.pos)
|
|
|
|
def test_load_array(self):
    """
    check that load_array maps the columns of a file to the fields of the population dtype.

    a small table is written with np.savetxt using a column order that differs from the population dtype.
    the header line of the file lists the column names.
    the loaded array must have the field names of the population dtype and the original values in each field.
    """
    rows = 3
    filename = os.path.join(self.test_dir, "test_load_array")
    self.pop.setup(self.size, self.model_space)

    # expected array in the layout of the population
    expected_dtype = self.pop.get_pop_dtype(self.model_space.start)
    expected = np.zeros((rows,), dtype=expected_dtype)
    columns = (('A', np.linspace(0, 1, rows)),
               ('B', np.linspace(1, 2, rows)),
               ('C', np.linspace(3, 4, rows)),
               ('_rfac', np.linspace(5, 6, rows)),
               ('_gen', np.array([3, 4, 7])),
               ('_particle', np.array([1, 0, 2])),
               ('_model', np.array([3, 6, 1])))
    for field, values in columns:
        expected[field] = values

    # the file holds the same data as the expected array, with the columns in a different order
    shuffled_dtype = [('A', 'f4'), ('_particle', 'i4'), ('_rfac', 'f4'), ('C', 'f4'), ('_gen', 'i4'),
                      ('B', 'f4'), ('_model', 'i4')]
    shuffled_names = [field for field, _ in shuffled_dtype]
    shuffled = np.zeros((rows,), dtype=shuffled_dtype)
    for field in shuffled_names:
        shuffled[field] = expected[field]
    np.savetxt(filename, shuffled, fmt='%g', header=" ".join(shuffled_names))

    loaded = self.pop.load_array(filename, np.zeros((rows,), dtype=expected_dtype))

    self.assertEqual(loaded.dtype.names, expected.dtype.names)
    for field in expected.dtype.names:
        np.testing.assert_almost_equal(loaded[field], expected[field], err_msg=field)
|
|
|
|
def test_mate_parents(self):
    """
    check the number of items returned by mate_parents.

    for a copy of the population array, the length of the result must be half the population size.
    """
    self.pop.setup(self.size, self.model_space)

    candidates = self.pop.pos.copy()
    couples = self.pop.mate_parents(candidates)

    expected_count = candidates.shape[0] / 2
    self.assertEqual(len(couples), expected_count)
|
|
|
|
def test_crossover(self):
    """
    check the two children produced by crossover of individuals 2 and 3.

    both children must be numpy records (np.void) and keep the '_particle' value of the respective parent.
    for every model parameter, the sum over the children must equal the sum over the parents.
    """
    self.pop.setup(self.size, self.model_space)

    parent_a = self.pop.pos[2].copy()
    parent_b = self.pop.pos[3].copy()
    child_a, child_b = self.pop.crossover(parent_a, parent_b)
    families = ((child_a, parent_a), (child_b, parent_b))

    for child, _ in families:
        self.assertIsInstance(child, np.void)
    for child, parent in families:
        self.assertEqual(child['_particle'], parent['_particle'])

    for name in self.model_space.start:
        children_sum = child_a[name] + child_b[name]
        parents_sum = parent_a[name] + parent_b[name]
        self.assertAlmostEqual(children_sum, parents_sum, msg=name)
|
|
|
|
def test_mutate_weak(self):
    """
    check that mutate_weak changes all three parameters of an individual in place.

    the second argument 1.0 is presumably the mutation probability - confirm against mutate_weak.
    the '_particle' value must remain unchanged.
    """
    self.pop.setup(self.size, self.model_space)

    reference = self.pop.pos[3].copy()
    mutant = reference.copy()
    self.pop.mutate_weak(mutant, 1.0)

    self.assertEqual(mutant['_particle'], reference['_particle'])
    for name in ('A', 'B', 'C'):
        self.assertNotAlmostEqual(mutant[name], reference[name])
|
|
|
|
def test_mutate_strong(self):
    """
    check that mutate_strong changes all three parameters of an individual in place.

    the second argument 1.0 is presumably the mutation probability - confirm against mutate_strong.
    the '_particle' value must remain unchanged.
    """
    self.pop.setup(self.size, self.model_space)

    reference = self.pop.pos[3].copy()
    mutant = reference.copy()
    self.pop.mutate_strong(mutant, 1.0)

    self.assertEqual(mutant['_particle'], reference['_particle'])
    for name in ('A', 'B', 'C'):
        self.assertNotAlmostEqual(mutant[name], reference[name])
|
|
|
|
def test_advance_population(self):
    """
    check that advance_population moves the population away from its initial positions.

    the parameters are initialized on a regular grid and rated with rfactor1.
    after one advance with weak mutation probability 1 and strong mutation probability 0,
    at least one value of each parameter must differ from its initial value by 0.001 or more.
    """
    self.pop.setup(self.size, self.model_space)

    initial = {}
    for name, low, high in (('A', 1.0, 2.0), ('B', 2.0, 3.0), ('C', 3.0, 4.0)):
        initial[name] = np.linspace(low, high, POP_SIZE)
    for name in ('A', 'B', 'C'):
        self.pop.pos[name] = initial[name]

    for individual in self.pop.pos:
        individual['_rfac'] = self.rfactor1(individual)

    self.pop._hold_once = False
    self.pop.weak_mutation_probability = 1.
    self.pop.strong_mutation_probability = 0.
    self.pop.advance_population()

    for name, before in initial.items():
        has_moved = np.any(abs(self.pop.pos[name] - before) >= 0.001)
        self.assertTrue(has_moved, msg=name)
|
|
|
|
def test_convergence_1(self):
    """
    run ten generations on rfactor1 and check that the best R-factor improves.

    the population starts on a regular grid with R-factors falling linearly from 2.0 to 1.0.
    after each generation, all individuals are rated with rfactor1
    and the lowest R-factor of the best array is recorded.
    the value recorded last must be lower than the one recorded first.
    """
    self.pop.setup(self.size, self.model_space)

    grid = (('A', 1.0, 2.0), ('B', 2.0, 3.0), ('C', 3.0, 4.0), ('_rfac', 2.0, 1.0))
    for name, first, last in grid:
        self.pop.pos[name] = np.linspace(first, last, POP_SIZE)

    history = []
    for _generation in range(10):
        self.pop.advance_population()
        for individual in self.pop.pos:
            self.pop.add_result(individual, self.rfactor1(individual))
        history.append(self.pop.best['_rfac'].min())

    earliest, latest = history[0], history[-1]
    self.assertLess(latest, earliest)
|
|
|
|
def optimize_rfactor_2(self, pop_size, iterations):
    """
    helper: run the genetic algorithm on rfactor2 for a fixed number of generations.

    the population is set up anew with the requested size.
    in each iteration the population is advanced and every individual is rated with rfactor2.
    the results are added to the population via add_result.

    @param pop_size: size of the population. the value is also stored in self.size.

    @param iterations: number of generations to calculate.

    @return: None
    """
    self.size = pop_size
    self.pop.setup(self.size, self.model_space)

    for _generation in range(iterations):
        self.pop.advance_population()
        for individual in self.pop.pos:
            rfac = self.rfactor2(individual)
            self.pop.add_result(individual, rfac)
|
|
|
|
@unittest.skip("test_convergence_2 is unreliable")
def test_convergence_2(self):
    """
    optimize rfactor2 with 10 individuals over 200 generations and check the best result.

    the best result must have an R-factor below 0.2
    and lie within 0.1 of A = 1.745, B = 2.395, C = 3.755 in each parameter.

    there is a certain probability that this test fails. for this reason it is skipped.

    @return: None
    """
    self.pop.weak_mutation_probability = 1.
    self.pop.strong_mutation_probability = 0.01
    self.optimize_rfactor_2(10, 200)

    best_index = self.pop.results['_rfac'].argmin()
    winner = self.pop.results[best_index]

    self.assertLess(winner['_rfac'], 0.2)
    for name, target in (('A', 1.745), ('B', 2.395), ('C', 3.755)):
        self.assertAlmostEqual(winner[name], target, delta=0.1)
|
|
|
|
|
|
# run all tests of this module if the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|