modified read_n also for File and RawFile

This commit is contained in:
froejdh_e
2024-11-27 09:31:57 +01:00
parent 996a8861f6
commit 8bf9ac55ce
11 changed files with 217 additions and 78 deletions
+5 -4
View File
@@ -8,14 +8,15 @@ class CtbRawFile(_aare.CtbRawFile):
Args:
fname (pathlib.Path | str): Path to the file to be read.
chunk_size (int): Number of frames to read at a time. Default is 1.
transform (function): Function to apply to the data after reading it.
The function should take a numpy array of type uint8 and return one
or several numpy arrays.
"""
def __init__(self, fname, transform = None, chunk_size = 1):
def __init__(self, fname, chunk_size = 1, transform = None):
super().__init__(fname)
self.transform = transform
self._chunk_size = chunk_size
self._transform = transform
def read_frame(self, frame_index: int | None = None ) -> tuple:
@@ -44,8 +45,8 @@ class CtbRawFile(_aare.CtbRawFile):
header = header[0]
if self.transform:
res = self.transform(data)
if self._transform:
res = self._transform(data)
if isinstance(res, tuple):
return header, *res
else:
+66
View File
@@ -0,0 +1,66 @@
from . import _aare
import numpy as np
from .ScanParameters import ScanParameters
class RawFile(_aare.RawFile):
def __init__(self, fname, chunk_size = 1):
super().__init__(fname)
self._chunk_size = chunk_size
def read(self) -> tuple:
"""Read the entire file.
Seeks to the beginning of the file before reading.
Returns:
tuple: header, data
"""
self.seek(0)
return self.read_n(self.total_frames)
@property
def scan_parameters(self):
"""Return the scan parameters.
Returns:
ScanParameters: Scan parameters.
"""
return ScanParameters(self.master.scan_parameters)
@property
def master(self):
"""Return the master file.
Returns:
RawMasterFile: Master file.
"""
return super().master()
def __len__(self) -> int:
"""Return the number of frames in the file.
Returns:
int: Number of frames in file.
"""
return super().frames_in_file
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
pass
def __iter__(self):
return self
def __next__(self):
try:
if self._chunk_size == 1:
return self.read_frame()
else:
return self.read_n(self._chunk_size)
except RuntimeError:
# TODO! find a good way to check that we actually have the right exception
raise StopIteration
+5 -2
View File
@@ -2,10 +2,13 @@
from . import _aare
from ._aare import File, RawFile, RawMasterFile, RawSubFile
from ._aare import File, RawMasterFile, RawSubFile
from ._aare import Pedestal, ClusterFinder, VarClusterFinder
from ._aare import DetectorType
from ._aare import ClusterFile
from .CtbRawFile import CtbRawFile
from .ScanParameters import ScanParameters
from .RawFile import RawFile
from .ScanParameters import ScanParameters
from .utils import random_pixels, random_pixel
+23
View File
@@ -0,0 +1,23 @@
import numpy as np
def random_pixels(n_pixels, xmin=0, xmax=512, ymin=0, ymax=1024):
"""Return a list of random pixels.
Args:
n_pixels (int): Number of pixels to return.
rows (int): Number of rows in the image.
cols (int): Number of columns in the image.
Returns:
list: List of (row, col) tuples.
"""
return [(np.random.randint(ymin, ymax), np.random.randint(xmin, xmax)) for _ in range(n_pixels)]
def random_pixel(xmin=0, xmax=512, ymin=0, ymax=1024):
"""Return a random pixel.
Returns:
tuple: (row, col)
"""
return random_pixels(1, xmin, xmax, ymin, ymax)[0]