Commit from local

Jakob Lass 2025-04-09 16:46:45 +02:00
parent 19e0cc92af
commit 6865e2cbff
7 changed files with 544 additions and 5 deletions

LICENSE (modified)

@@ -35,7 +35,7 @@ Mozilla Public License Version 2.0
    means any form of the work other than Source Code Form.
1.7. "Larger Work"
    means a work that combines Covered Software with other material, in
    a separate file or files, that is not Covered Software.
1.8. "License"
@@ -357,7 +357,7 @@ Exhibit A - Source Code Form License Notice
This Source Code Form is subject to the terms of the Mozilla Public
License, v. 2.0. If a copy of the MPL was not distributed with this
-file, You can obtain one at http://mozilla.org/MPL/2.0/.
+file, You can obtain one at https://mozilla.org/MPL/2.0/.
If it is not possible or desirable to put the notice in a particular
file, then You may include the notice in a location (such as a LICENSE
@@ -370,4 +370,4 @@ Exhibit B - "Incompatible With Secondary Licenses" Notice
---------------------------------------------------------
This Source Code Form is "Incompatible With Secondary Licenses", as
defined by the Mozilla Public License, v. 2.0.

README.md (modified)

@@ -1,3 +1,28 @@
-AMBER
-=====
-General introduction
+# AMBER
+## Installation
+```
+pip install AMBER
+```
+or
+```
+python3 -m pip install AMBER
+```
+Further details are found in our [documentation](https://dmcpy.readthedocs.io/en/latest/introduction.html)
+## Documentation and Tutorials
+## Contribute
+## Contact
+AMBER: Algorithm for Multiplexing spectrometer Background Estimation with Rotation-independence

pyproject.toml (new file, 52 lines)

@@ -0,0 +1,52 @@
[build-system]
requires = ["setuptools >= 77.0.3"]
build-backend = "setuptools.build_meta"
[project]
name = "AMBER"
version = "0.0.1"
dependencies = [
"numpy>=2",
"scipy>=1.7",
"torch>=2",
]
requires-python = ">=3.6"
authors = [
{name = "Jakob Lass", email = "jakob.lass@psi.ch"},
{name = "Victor Cohen", email = "victor.cohen@sdsc.ethz.ch"},
{name = "Bejar Haro Benjamin", email = "benjamin.bejar@psi.ch"},
{name = "Daniel G. Mazzone", email = "daniel.mazzone@psi.ch"},
]
maintainers = [
{name = "Jakob Lass", email = "jakob.lass@psi.ch"},
]
description = "AMBER: Algorithm for Multiplexing spectrometer Background Estimation with Rotation-independence"
readme = "README.md"
license = "MPL V2.0"
license-files = ["LICENSE"]
keywords = ["Machine Learning", "Signal Segmentation", "Background Determination"]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Science/Research",
"Intended Audience :: Education",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)",
"Operating System :: OS Independent",
""
]
[project.urls]
Homepage = "https://example.com"
Documentation = "https://readthedocs.org"
Repository = "https://github.com/Jakob-Lass/AMBER.git"
"Bug Tracker" = "https://github.com/Jakob-Lass/AMBER/issues"
Changelog = "https://github.com/Jakob-Lass/AMBER/master/CHANGELOG.md"

src/__init__.py (new file, empty)

src/graph_laplacian.py (new file, 215 lines)

@@ -0,0 +1,215 @@
# tools
import numpy as np
import scipy
import torch
from scipy.sparse import lil_matrix, block_diag,csr_array,diags,csr_matrix
def create_laplacian_matrix(nx, ny=None):
"""
Helper method to create the laplacian matrix for the laplacian regularization
Parameters
----------
:param nx: height of the original image
:param ny: width of the original image
Returns
-------
:rtype: scipy.sparse.csr_matrix
:return:the n x n laplacian matrix, where n = nx*ny
"""
if ny is None:
ny = nx
assert(nx>1)
assert(ny>1)
#Blocks corresponding to the corner of the image (linking row elements)
top_block=lil_matrix((ny,ny),dtype=np.float32)
top_block.setdiag([2]+[3]*(ny-2)+[2])
top_block.setdiag(-1,k=1)
top_block.setdiag(-1,k=-1)
#Blocks corresponding to the middle of the image (linking row elements)
mid_block=lil_matrix((ny,ny),dtype=np.float32)
mid_block.setdiag([3]+[4]*(ny-2)+[3])
mid_block.setdiag(-1,k=1)
mid_block.setdiag(-1,k=-1)
    #Construction of the diagonal of blocks (LIL format so the off-diagonals can be set)
    list_blocks=[top_block]+[mid_block]*(nx-2)+[top_block]
    blocks=block_diag(list_blocks,format='lil')
    #Diagonals linking different rows; set both off-diagonals so the Laplacian stays symmetric
    blocks.setdiag(-1,k=ny)
    blocks.setdiag(-1,k=-ny)
    return blocks.tocoo()
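# Usage sketch (illustrative): for a 3x3 image this builds the 9x9 Laplacian of
# the 4-connected pixel grid; corner pixels get degree 2, edge pixels degree 3,
# the centre pixel degree 4, and every row sums to zero:
#
#   L = create_laplacian_matrix(3, 3)   # scipy.sparse.coo_matrix, shape (9, 9)
#   L.toarray().diagonal()              # -> [2., 3., 2., 3., 4., 3., 2., 3., 2.]
#   L.toarray().sum(axis=1)             # -> all zeros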
def delete_from_csr(mat, row_indices=[], col_indices=[]):
"""
Remove the rows (denoted by ``row_indices``) and columns (denoted by ``col_indices``) from the CSR sparse matrix ``mat``.
WARNING: Indices of altered axes are reset in the returned matrix
"""
if not isinstance(mat, csr_matrix):
raise ValueError("works only for CSR format -- use .tocsr() first")
rows = []
cols = []
if row_indices:
rows = list(row_indices)
if col_indices:
cols = list(col_indices)
if len(rows) > 0 and len(cols) > 0:
row_mask = np.ones(mat.shape[0], dtype=bool)
row_mask[rows] = False
col_mask = np.ones(mat.shape[1], dtype=bool)
col_mask[cols] = False
return mat[row_mask][:,col_mask]
elif len(rows) > 0:
mask = np.ones(mat.shape[0], dtype=bool)
mask[rows] = False
return mat[mask]
elif len(cols) > 0:
mask = np.ones(mat.shape[1], dtype=bool)
mask[cols] = False
return mat[:,mask]
else:
return mat
def remove_vertex(L,lst_rows=[],lst_cols=[]):
"""
    Remove the vertices listed in ``lst_rows`` and ``lst_cols`` and adjust the graph Laplacian matrix accordingly.
"""
L_cut = delete_from_csr(L.tocsr(), row_indices=lst_rows, col_indices=lst_cols)
L_cut = L_cut - diags(L_cut.diagonal())
L_cut = L_cut - diags(L_cut.sum(axis=1).A1)
assert (L_cut.sum(axis=1).A1 == np.zeros(L_cut.shape[0])).all()
return L_cut
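# Usage sketch (illustrative): dropping the first pixel of a 3x3 grid
#   L  = create_laplacian_matrix(3, 3)                  # 9 x 9
#   Lc = remove_vertex(L, lst_rows=[0], lst_cols=[0])   # 8 x 8, rows still sum to 0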
def laplacian(Y,nx,ny=None):
"""
    Build the grid Laplacian for an nx-by-ny image and remove the vertices corresponding to NaN locations of tensor Y.
Args:
- Y: torch.tensor of the observations (Float64)
- nx,ny: dimensions of the image (Int64)
"""
# find Nan indices
Nan_indices = torch.where(torch.isnan(Y.ravel())==True)[0]
# get list of indices
list_idx = list(Nan_indices.detach().numpy())
# create Laplacian
L = create_laplacian_matrix(nx, ny=ny)
L = remove_vertex(L,list_idx,list_idx)
return L
def unnormalized_laplacian(y, nx, ny=None, method='inverse'):
"""Construct numpy array with non zeros weights and non zeros indices.
Args:
- y: np.array for observations of size (size_x*size_y)
- method: str indicating how to compute the weight of the unnormalized laplacian
"""
# create laplacian matrix
lapl_tmp = laplacian(y,nx,ny)
lapl_tmp.setdiag(np.zeros(nx*ny))
# select the non nan indices
y_tmp = y[torch.isnan(y)==False]
# store non zero indices
idx_rows = np.array(lapl_tmp.nonzero()[0])
idx_cols = np.array(lapl_tmp.nonzero()[1])
# construct the set of weights
nnz_w = np.zeros_like(idx_rows, dtype=np.float32)
# construction of the non zeros weights
if method == 'inverse':
nnz_w = 1/(np.abs(y_tmp[idx_rows] - y_tmp[idx_cols])+1e-4)
else:
nnz_w = np.exp(-np.abs(y_tmp[idx_rows] - y_tmp[idx_cols]))
# construct the non diagonal terms of the Laplacian
lapl_nondiag = csr_array((nnz_w, (idx_rows, idx_cols)), shape=(lapl_tmp.shape[0], lapl_tmp.shape[0]), dtype=np.float32)
# construct the diagonal terms of the Laplacian
lapl_diag = diags(lapl_nondiag.sum(axis=0))
# construct the Laplacian
L = lapl_diag - lapl_nondiag
return L
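# The nonzero pattern of the NaN-reduced grid Laplacian is reused, but the unit
# weights are replaced by intensity-dependent ones: for neighbouring pixels i, j
#   method='inverse':  w_ij = 1 / (|y_i - y_j| + 1e-4)
#   otherwise:         w_ij = exp(-|y_i - y_j|)
# so pixels with similar observed values are coupled strongly, large jumps weakly.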
def laplacian_chain(nb_vertices):
"""
Construct the Laplacian matrix of a chain.
"""
L = np.zeros((nb_vertices,nb_vertices))
# First vertex
L[0,0] = 1
L[0,1] = -1
# Toeplitz matrix
if nb_vertices > 2:
first_row = torch.zeros(nb_vertices)
first_row[0] = -1
first_row[1] = 2
first_row[2] = -1
first_col = torch.zeros(nb_vertices-2)
first_col[0] = -1
D = scipy.linalg.toeplitz(first_col, r=first_row)
L[1:nb_vertices-1,:] = D
# Last vertex
L[-1,-2] = -1
L[-1,-1] = 1
return L
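# Example (illustrative): laplacian_chain(4) is the Laplacian of the path graph
# on 4 vertices,
#   [[ 1, -1,  0,  0],
#    [-1,  2, -1,  0],
#    [ 0, -1,  2, -1],
#    [ 0,  0, -1,  1]]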
def unnormalized_laplacian_chain(y, nx, method='inverse'):
"""Construct numpy array with non zeros weights and non zeros indices.
Args:
- y: np.array for aggregated observations of size (nb_bins)
- method: str indicating how to compute the weight of the unnormalized laplacian
"""
# create laplacian matrix
lapl_tmp = csr_matrix(laplacian_chain(nx))
lapl_tmp.setdiag(np.zeros(nx*nx))
# select the non nan indices
y_tmp = np.nan_to_num(y)
# store non zero indices
idx_rows = np.array(lapl_tmp.nonzero()[0])
idx_cols = np.array(lapl_tmp.nonzero()[1])
# construct the set of weights
nnz_w = np.zeros_like(idx_rows, dtype=np.float32)
# construction of the non zeros weights
    if method == 'inverse':
        # small offset avoids division by zero when neighbouring bins are equal (as in unnormalized_laplacian)
        nnz_w = 1/(np.abs(y_tmp[idx_rows] - y_tmp[idx_cols])+1e-4)
else:
nnz_w = np.exp(-np.abs(y_tmp[idx_rows] - y_tmp[idx_cols]))
# construct the non diagonal terms of the Laplacian
lapl_nondiag = csr_array((nnz_w, (idx_rows, idx_cols)), shape=(lapl_tmp.shape[0], lapl_tmp.shape[0]), dtype=np.float32)
# construct the diagonal terms of the Laplacian
lapl_diag = diags(lapl_nondiag.sum(axis=0))
# construct the Laplacian
L = lapl_diag - lapl_nondiag
return L
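Taken together, the helpers above combine as in the following sketch; the 4x4 image, the NaN positions and the assumption that `src/` is on the Python path are illustrative, not part of the commit.

```python
import numpy as np
import torch

# assumes src/ is on sys.path, as in the test modules below
from graph_laplacian import create_laplacian_matrix, laplacian, unnormalized_laplacian

# a 4x4 "image" with two dead pixels marked as NaN
Y = torch.tensor(np.random.rand(4, 4), dtype=torch.float64)
Y[0, 1] = float("nan")
Y[2, 3] = float("nan")

L_grid = create_laplacian_matrix(4, 4)                    # 16 x 16 grid Laplacian
L_cut = laplacian(Y, 4, 4)                                # NaN vertices removed -> 14 x 14
L_w = unnormalized_laplacian(Y, 4, 4, method='inverse')   # intensity-weighted, also 14 x 14

print(L_grid.shape, L_cut.shape, L_w.shape)
```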

tests/test_background.py (new file, 177 lines)

@@ -0,0 +1,177 @@
import unittest
import torch
import numpy as np
import os
import sys
maindir = os.getcwd()
main_path = maindir[:maindir.find('ds4ms/code')]
sys.path.append(main_path+"/ds4ms/code/src")
from background import background
class TestBackground(unittest.TestCase):
def setUp(self):
self.bg = background(data_path='/mydata/ds4ms/victor/Data/', str_dataset='VanadiumOzide', str_option='6p9T')
self.bg.load_data(verbose=True)
self.bg.set_grid_volume(dqx=0.03, dqy=0.03, dE=0.08)
self.bg.set_binned_data()
        self.bg.set_radial_bins(max_radius=6.0, n_bins=10)
self.bg.Ygrid = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
def test_load_data(self):
self.bg.load_data(verbose=True)
self.assertIsNotNone(self.bg.ds)
def test_set_dataset(self):
ds = "dummy_dataset"
self.bg.set_dataset(ds)
self.assertEqual(self.bg.ds, ds)
def test_mask_preprocessing(self):
self.bg.mask_preprocessing()
self.assertIsNotNone(self.bg.ds.mask)
def test_set_grid_volume(self):
self.bg.set_grid_volume(dqx=0.03, dqy=0.03, dE=0.08)
self.assertEqual(self.bg.dqx, 0.03)
self.assertEqual(self.bg.dqy, 0.03)
self.assertEqual(self.bg.dE, 0.08)
def test_set_binned_data(self):
self.bg.set_binned_data()
self.assertIsNotNone(self.bg.data)
self.assertIsNotNone(self.bg.bins)
def test_set_variables(self):
self.bg.set_variables()
self.assertIsNotNone(self.bg.X)
self.assertIsNotNone(self.bg.b)
self.assertIsNotNone(self.bg.b_grid)
def test_R_operator(self):
Y_r = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
b = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
result = self.bg.R_operator(Y_r, b)
self.assertIsNotNone(result)
def test_Rstar_operator(self):
Y_r = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
X = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
result = self.bg.Rstar_operator(Y_r, X)
self.assertIsNotNone(result)
def test_mask_nans(self):
x = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
result = self.bg.mask_nans(x)
self.assertIsNotNone(result)
def test_S_lambda(self):
x = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
lambda_ = torch.tensor(np.random.rand(10), dtype=torch.float64)
result = self.bg.S_lambda(x, lambda_)
self.assertIsNotNone(result)
def test_L_b(self):
self.bg.set_radial_nans()
result = self.bg.L_b(e_cut=0)
self.assertIsNotNone(result)
def test_gamma_matrix(self):
self.bg.set_radial_nans()
result = self.bg.gamma_matrix(e_cut=0)
self.assertIsNotNone(result)
def test_set_radial_nans(self):
self.bg.set_radial_nans()
self.assertIsNotNone(self.bg.u)
def test_set_b_design_matrix(self):
beta_ = torch.tensor(1.0, dtype=torch.float64)
alpha_ = torch.tensor(np.random.rand(10), dtype=torch.float64)
result = self.bg.set_b_design_matrix(beta_, alpha_, e_cut=0)
self.assertIsNotNone(result)
def test_compute_laplacian(self):
Y_r = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
result = self.bg.compute_laplacian(Y_r)
self.assertIsNotNone(result)
def test_compute_all_laplacians(self):
Y_r = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
self.bg.compute_all_laplacians(Y_r)
self.assertIsNotNone(self.bg.L_list)
def test_TV_denoising(self):
gamma_ = torch.tensor(np.random.rand(10), dtype=torch.float64)
result = self.bg.TV_denoising(gamma_, n_epochs=1, verbose=False)
self.assertIsNotNone(result)
def test_L_e(self):
result = self.bg.L_e()
self.assertIsNotNone(result)
def test_set_e_design_matrix(self):
result = self.bg.set_e_design_matrix(mu_=1.0)
self.assertIsNotNone(result)
def test_MAD_lambda(self):
result = self.bg.MAD_lambda()
self.assertIsNotNone(result)
def test_mu_estimator(self):
result = self.bg.mu_estimator()
self.assertIsNotNone(result)
def test_denoising(self):
Y_r = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
lambda_ = torch.tensor(np.random.rand(10), dtype=torch.float64)
beta_ = torch.tensor(np.random.rand(10), dtype=torch.float64)
alpha_ = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
self.bg.denoising(Y_r, lambda_, beta_, alpha_, mu_=1.0, n_epochs=1, verbose=False)
self.assertIsNotNone(self.bg.X)
self.assertIsNotNone(self.bg.b)
def test_applyBackground(self):
self.bg.applyBackground(median=False)
self.assertIsNotNone(self.bg.ds.backgroundModel)
def test_median_bg(self):
result = self.bg.median_bg(self.bg.Ygrid)
self.assertIsNotNone(result)
def test_compute_signal_to_noise(self):
b = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
result = self.bg.compute_signal_to_noise(b)
self.assertIsNotNone(result)
def test_plot_snr(self):
b = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
self.bg.plot_snr(b, e_cut=range(2, 4), fmin=0.0, fmax=0.1)
def test_save_arrays(self):
self.bg.save_arrays(median=True)
self.assertTrue(os.path.exists('arrays'))
def test_compute_signal_to_obs(self):
b = torch.tensor(np.random.rand(10, 10), dtype=torch.float64)
result = self.bg.compute_signal_to_obs(b)
self.assertIsNotNone(result)
def test_cross_validation(self):
lambda_range = torch.tensor([1.0])
alpha_range = torch.tensor([1.0])
beta_range = torch.tensor([1.0])
mu_range = torch.tensor([1.0])
result = self.bg.cross_validation(lambda_range=lambda_range, alpha_range=alpha_range, beta_range=beta_range, mu_range=mu_range, n_epochs=1, verbose=False)
self.assertIsNotNone(result)
def test_compute_mask(self):
result = self.bg.compute_mask(q=0.75, e_cut=None)
self.assertIsNotNone(result)
if __name__ == '__main__':
unittest.main()
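For orientation, the workflow exercised by `setUp` and the individual tests above, condensed into one script. The data path, dataset name and option string are the placeholders used in the tests, and `background` itself lives in `src/background.py`, which is not part of this commit.

```python
import sys
sys.path.append("/path/to/ds4ms/code/src")  # placeholder; see the sys.path setup in the tests

from background import background

bg = background(data_path='/mydata/ds4ms/victor/Data/',
                str_dataset='VanadiumOzide', str_option='6p9T')
bg.load_data(verbose=True)
bg.set_grid_volume(dqx=0.03, dqy=0.03, dE=0.08)   # grid step sizes in qx, qy and energy
bg.set_binned_data()
bg.set_radial_bins(max_radius=6.0, n_bins=10)
bg.applyBackground(median=False)                  # attaches bg.ds.backgroundModel (see test_applyBackground)
```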

tests/test_graph_laplacian.py (new file, 70 lines)

@@ -0,0 +1,70 @@
import unittest
import numpy as np
import torch
from scipy.sparse import lil_matrix, csr_matrix, coo_matrix
import os
import sys
maindir = os.getcwd()
main_path = maindir[:maindir.find('ds4ms/code')]
sys.path.append(main_path+"/ds4ms/code/src")
from graph_laplacian import create_laplacian_matrix, delete_from_csr, remove_vertex, laplacian, unnormalized_laplacian, laplacian_chain, unnormalized_laplacian_chain
class TestGraphLaplacian(unittest.TestCase):
def test_create_laplacian_matrix(self):
nx, ny = 3, 3
L = create_laplacian_matrix(nx, ny)
self.assertEqual(L.shape, (nx*ny, nx*ny))
self.assertIsInstance(L, coo_matrix)
def test_delete_from_csr(self):
mat = lil_matrix((4, 4), dtype=np.float32)
mat.setdiag([1, 2, 3, 4])
mat = mat.tocsr()
print(mat)
mat = delete_from_csr(mat, row_indices=[1], col_indices=[2])
print(mat)
self.assertEqual(mat.shape, (3, 3))
self.assertEqual(mat[1, 1], 0.0)
def test_remove_vertex(self):
nx, ny = 3, 3
L = create_laplacian_matrix(nx, ny)
L_cut = remove_vertex(L, lst_rows=[0], lst_cols=[0])
self.assertEqual(L_cut.shape, (nx*ny-1, nx*ny-1))
self.assertTrue((L_cut.sum(axis=1).A1 == np.zeros(L_cut.shape[0])).all())
def test_laplacian(self):
Y = torch.tensor([[1.0, np.nan, 3.0], [4.0, 5.0, np.nan], [7.0, 8.0, 9.0]])
nx, ny = Y.shape
L = laplacian(Y, nx, ny)
self.assertEqual(L.shape, (nx*ny-2, nx*ny-2))
self.assertTrue((L.sum(axis=1).A1 == np.zeros(L.shape[0])).all())
def test_unnormalized_laplacian(self):
y = torch.tensor([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]])
nx, ny = y.shape
L = unnormalized_laplacian(y, nx, ny, method='inverse')
self.assertEqual(L.shape, (nx*ny, nx*ny))
self.assertIsInstance(L, csr_matrix)
def test_laplacian_chain(self):
nb_vertices = 5
L = laplacian_chain(nb_vertices)
self.assertEqual(L.shape, (nb_vertices, nb_vertices))
self.assertEqual(L[0, 0], 1)
self.assertEqual(L[0, 1], -1)
self.assertEqual(L[-1, -2], -1)
self.assertEqual(L[-1, -1], 1)
def test_unnormalized_laplacian_chain(self):
y = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
nx = len(y)
L = unnormalized_laplacian_chain(y, nx, method='inverse')
self.assertEqual(L.shape, (nx, nx))
self.assertIsInstance(L, csr_matrix)
if __name__ == '__main__':
unittest.main()
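Both test modules follow the standard `unittest` layout, so a single suite can also be run programmatically; a small sketch, assuming the same `sys.path` setup as at the top of the test files:

```python
import unittest

# works once src/ and tests/ are importable, e.g. via the sys.path.append calls above
from test_graph_laplacian import TestGraphLaplacian

suite = unittest.TestLoader().loadTestsFromTestCase(TestGraphLaplacian)
unittest.TextTestRunner(verbosity=2).run(suite)
```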