0
0
mirror of https://github.com/bec-project/bec_widgets.git synced 2025-07-14 03:31:50 +02:00

feat(plots/image): image processor can run in threaded or non-threaded version

This commit is contained in:
wyzula-jan
2024-03-12 00:38:54 +01:00
parent 3362fabed7
commit 4865b10ced

View File

@ -16,7 +16,7 @@ from bec_widgets.utils import ConnectionConfig, BECConnector
from bec_widgets.widgets.plots import BECPlotBase, WidgetConfig
class PostProcessingConfig(BaseModel):
class ProcessingConfig(BaseModel):
fft: Optional[bool] = Field(False, description="Whether to perform FFT on the monitor data.")
log: Optional[bool] = Field(False, description="Whether to perform log on the monitor data.")
center_of_mass: Optional[bool] = Field(
@ -42,8 +42,8 @@ class ImageItemConfig(ConnectionConfig):
color_bar: Optional[Literal["simple", "full"]] = Field(
"simple", description="The type of the color bar."
)
post_processing: PostProcessingConfig = Field(
default_factory=PostProcessingConfig, description="The post processing of the image."
processing: ProcessingConfig = Field(
default_factory=ProcessingConfig, description="The post processing of the image."
)
@ -100,7 +100,12 @@ class BECImageItem(BECConnector, pg.ImageItem):
"downsample": self.set_auto_downsample,
"color_map": self.set_color_map,
"monitor": self.set_monitor,
"opacity": self.set_opacity,
"vrange": self.set_vrange,
"fft": self.set_fft,
"log": self.set_log,
"rot": self.set_rotation,
"transpose": self.set_transpose,
}
for key, value in kwargs.items():
if key in method_map:
@ -108,6 +113,24 @@ class BECImageItem(BECConnector, pg.ImageItem):
else:
print(f"Warning: '{key}' is not a recognized property.")
def set_fft(self, enable: bool = False):
self.config.processing.fft = enable
def set_log(self, enable: bool = False):
self.config.processing.log = enable
if enable and self.color_bar and self.config.color_bar == "full":
self.color_bar.autoHistogramRange()
def set_rotation(self, deg_90: int = 0):
self.config.processing.rotation = deg_90
def set_transpose(self, enable: bool = False):
self.config.processing.transpose = enable
def set_opacity(self, opacity: float = 1.0):
self.setOpacity(opacity)
self.config.opacity = opacity
def set_color_map(self, cmap: str = "magma"):
"""
Set the color map of the image.
@ -139,6 +162,24 @@ class BECImageItem(BECConnector, pg.ImageItem):
"""
self.config.monitor = monitor
def set_vrange(self, vmin: float = None, vmax: float = None, vrange: tuple[int, int] = None):
"""
Set the range of the color bar.
Args:
vmin(float): Minimum value of the color bar.
vmax(float): Maximum value of the color bar.
"""
if vrange is not None:
vmin, vmax = vrange
self.setLevels([vmin, vmax])
self.config.vrange = (vmin, vmax)
if self.color_bar is not None:
if self.config.color_bar == "simple":
self.color_bar.setLevels(low=vmin, high=vmax)
elif self.config.color_bar == "full":
self.color_bar.setLevels(min=vmin, max=vmax)
self.color_bar.setHistogramRange(vmin - 0.1 * vmin, vmax + 0.1 * vmax)
def _add_color_bar(
self, color_bar_style: str = "simple", vrange: Optional[tuple[int, int]] = None
):
@ -174,23 +215,23 @@ class BECImageItem(BECConnector, pg.ImageItem):
else:
raise ValueError("style should be 'simple' or 'full'")
def set_vrange(self, vmin: float = None, vmax: float = None, vrange: tuple[int, int] = None):
def _update_color_bar_for_log(self):
"""
Set the range of the color bar.
Args:
vmin(float): Minimum value of the color bar.
vmax(float): Maximum value of the color bar.
Update the color bar to reflect a logarithmic scale.
"""
if vrange is not None:
vmin, vmax = vrange
self.setLevels([vmin, vmax])
self.config.vrange = (vmin, vmax)
if self.color_bar is not None:
if self.config.color_bar == "simple":
self.color_bar.setLevels(low=vmin, high=vmax)
elif self.config.color_bar == "full":
self.color_bar.setLevels(min=vmin, max=vmax)
self.color_bar.setHistogramRange(vmin - 0.1 * vmin, vmax + 0.1 * vmax)
...
# if self.config.vrange:
# vmin, vmax = self.config.vrange
# offset = 1e-6
# vmin_log, vmax_log = np.log10(vmin + offset), np.log10(vmax + offset)
# if self.config.color_bar == "simple":
# self.color_bar.setLevels(low=vmin_log, high=vmax_log)
# elif self.config.color_bar == "full":
# self.color_bar.setImageItem(self)
# self.color_bar.setLevels(min=vmin_log, max=vmax_log)
# self.color_bar.setHistogramRange(
# vmin_log - 0.1 * vmin_log, vmax_log + 0.1 * vmax_log
# )
class BECImageShow(BECPlotBase):
@ -221,6 +262,23 @@ class BECImageShow(BECPlotBase):
self._images = defaultdict(dict)
self.apply_config(self.config)
self.processor = ImageProcessor()
self.use_threading = False # TODO WILL be moved to the init method and to figure method
def _create_thread_worker(self, device: str, image: np.ndarray):
thread = QThread()
worker = ProcessorWorker(self.processor)
worker.moveToThread(thread)
# Connect signals and slots
thread.started.connect(lambda: worker.process_image(device, image))
worker.processed.connect(self.update_image)
worker.finished.connect(thread.quit)
worker.finished.connect(thread.wait)
worker.finished.connect(worker.deleteLater)
thread.finished.connect(thread.deleteLater)
thread.start()
def find_widget_by_id(self, item_id: str) -> BECImageItem:
"""
@ -397,6 +455,27 @@ class BECImageShow(BECPlotBase):
image = self._add_image_object(source=image_source, config=image_config, data=data)
return image
def apply_setting_to_images(
self, setting_method_name: str, args: list, kwargs: dict, image_id: str = None
):
"""
Apply a setting to all images or a specific image by its ID.
Args:
setting_method_name (str): The name of the method to apply (e.g., 'set_color_map').
args (list): Positional arguments for the setting method.
kwargs (dict): Keyword arguments for the setting method.
image_id (str, optional): The ID of the specific image to apply the setting to. If None, applies to all images.
"""
if image_id:
image = self.find_widget_by_id(image_id)
if image:
getattr(image, setting_method_name)(*args, **kwargs)
else:
for source, images in self._images.items():
for _, image in images.items():
getattr(image, setting_method_name)(*args, **kwargs)
def set_vrange(self, vmin: float, vmax: float, name: str = None):
"""
Set the range of the color bar.
@ -404,15 +483,9 @@ class BECImageShow(BECPlotBase):
Args:
vmin(float): Minimum value of the color bar.
vmax(float): Maximum value of the color bar.
name(str): The name of the image.
name(str): The name of the image. If None, apply to all images.
"""
if name is None:
for source, images in self._images.items():
for id, image in images.items():
image.set_vrange(vmin, vmax)
else:
image = self.find_widget_by_id(name)
image.set_vrange(vmin, vmax)
self.apply_setting_to_images("set_vrange", args=[vmin, vmax], kwargs={}, image_id=name)
def set_color_map(self, cmap: str, name: str = None):
"""
@ -420,21 +493,122 @@ class BECImageShow(BECPlotBase):
If name is not specified, then set color map for all images.
Args:
cmap(str): The color map of the image.
name(str): The name of the image.
name(str): The name of the image. If None, apply to all images.
"""
if name is None:
for source, images in self._images.items():
for id, image in images.items():
image.set_color_map(cmap)
else:
image = self.find_widget_by_id(name)
image.set_color_map(cmap)
self.apply_setting_to_images("set_color_map", args=[cmap], kwargs={}, image_id=name)
def set_processing(self, name: str = None, **kwargs):
"""
Set the post processing of the image.
If name is not specified, then set post processing for all images.
Args:
name(str): The name of the image. If None, apply to all images.
**kwargs: Keyword arguments for the properties to be set.
Possible properties:
- fft: bool
- log: bool
- rot: int
- transpose: bool
"""
self.apply_setting_to_images("set", args=[], kwargs=kwargs, image_id=name)
def set_image_properties(self, name: str = None, **kwargs):
"""
Set the properties of the image.
Args:
name(str): The name of the image. If None, apply to all images.
**kwargs: Keyword arguments for the properties to be set.
Possible properties:
- downsample: bool
- color_map: str
- monitor: str
- opacity: float
- vrange: tuple[int,int]
- fft: bool
- log: bool
- rot: int
- transpose: bool
"""
self.apply_setting_to_images("set", args=[], kwargs=kwargs, image_id=name)
def set_fft(self, enable: bool = False, name: str = None):
"""
Set the FFT of the image.
If name is not specified, then set FFT for all images.
Args:
enable(bool): Whether to perform FFT on the monitor data.
name(str): The name of the image. If None, apply to all images.
"""
self.apply_setting_to_images("set_fft", args=[enable], kwargs={}, image_id=name)
def set_log(self, enable: bool = False, name: str = None):
"""
Set the log of the image.
If name is not specified, then set log for all images.
Args:
enable(bool): Whether to perform log on the monitor data.
name(str): The name of the image. If None, apply to all images.
"""
self.apply_setting_to_images("set_log", args=[enable], kwargs={}, image_id=name)
def set_rotation(self, deg_90: int = 0, name: str = None):
"""
Set the rotation of the image.
If name is not specified, then set rotation for all images.
Args:
deg_90(int): The rotation angle of the monitor data before displaying.
name(str): The name of the image. If None, apply to all images.
"""
self.apply_setting_to_images("set_rotation", args=[deg_90], kwargs={}, image_id=name)
def set_transpose(self, enable: bool = False, name: str = None):
"""
Set the transpose of the image.
If name is not specified, then set transpose for all images.
Args:
enable(bool): Whether to transpose the monitor data before displaying.
name(str): The name of the image. If None, apply to all images.
"""
self.apply_setting_to_images("set_transpose", args=[enable], kwargs={}, image_id=name)
def toggle_threading(self, use_threading: bool):
"""
Toggle threading for the widgets postprocessing and updating.
Args:
use_threading(bool): Whether to use threading.
"""
self.use_threading = use_threading
if self.use_threading is False and self.thread.isRunning():
self.cleanup()
@pyqtSlot(dict)
def on_image_update(self, msg: dict):
"""
Update the image of the device monitor from bec.
Args:
msg(dict): The message from bec.
"""
data = msg["data"]
device = msg["device"]
# TODO postprocessing
image_to_update = self._images["device_monitor"][device]
processing_config = image_to_update.config.processing
self.processor.set_config(processing_config)
if self.use_threading:
print("using threaded version")
self._create_thread_worker(device, data)
else:
print("using NON-threaded version")
data = self.processor.process_image(data)
self.update_image(device, data)
@pyqtSlot(str, np.ndarray)
def update_image(self, device: str, data: np.ndarray):
"""
Update the image of the device monitor.
Args:
device(str): The name of the device.
data(np.ndarray): The data to be updated.
"""
image_to_update = self._images["device_monitor"][device]
image_to_update.updateImage(data)
@ -498,3 +672,130 @@ class BECImageShow(BECPlotBase):
self.bec_dispatcher.disconnect_slot(
self.on_image_update, MessageEndpoints.device_monitor(monitor)
)
if self.thread.isRunning():
self.thread.quit()
self.thread.wait()
class ImageProcessor:
"""
Class for processing the image data.
"""
def __init__(self, config: ProcessingConfig = None):
if config is None:
config = ProcessingConfig()
self.config = config
def set_config(self, config: ProcessingConfig):
"""
Set the configuration of the processor.
Args:
config(ProcessingConfig): The configuration of the processor.
"""
self.config = config
def FFT(self, data: np.ndarray) -> np.ndarray:
"""
Perform FFT on the data.
Args:
data(np.ndarray): The data to be processed.
Returns:
np.ndarray: The processed data.
"""
return np.abs(np.fft.fftshift(np.fft.fft2(data)))
def rotation(self, data: np.ndarray, rotate_90: int) -> np.ndarray:
"""
Rotate the data by 90 degrees n times.
Args:
data(np.ndarray): The data to be processed.
rotate_90(int): The number of 90 degree rotations.
Returns:
np.ndarray: The processed data.
"""
return np.rot90(data, k=rotate_90, axes=(0, 1))
def transpose(self, data: np.ndarray) -> np.ndarray:
"""
Transpose the data.
Args:
data(np.ndarray): The data to be processed.
Returns:
np.ndarray: The processed data.
"""
return np.transpose(data)
def log(self, data: np.ndarray) -> np.ndarray:
"""
Perform log on the data.
Args:
data(np.ndarray): The data to be processed.
Returns:
np.ndarray: The processed data.
"""
# TODO this is not final solution -> data should stay as int16
data = data.astype(np.float32)
offset = 1e-6
data_offset = data + offset
return np.log10(data_offset)
# def center_of_mass(self, data: np.ndarray) -> tuple: # TODO check functionality
# return np.unravel_index(np.argmax(data), data.shape)
def process_image(self, data: np.ndarray) -> np.ndarray:
"""
Process the data according to the configuration.
Args:
data(np.ndarray): The data to be processed.
Returns:
np.ndarray: The processed data.
"""
if self.config.fft:
data = self.FFT(data)
if self.config.rotation is not None:
data = self.rotation(data, self.config.rotation)
if self.config.transpose:
data = self.transpose(data)
if self.config.log:
data = self.log(data)
return data
class ProcessorWorker(QObject):
"""
Worker for processing the image data.
"""
processed = pyqtSignal(str, np.ndarray)
stopRequested = pyqtSignal()
finished = pyqtSignal()
def __init__(self, processor):
super().__init__()
self.processor = processor
self._isRunning = False
self.stopRequested.connect(self.stop)
@pyqtSlot(str, np.ndarray)
def process_image(self, device: str, image: np.ndarray):
"""
Process the image data.
Args:
device(str): The name of the device.
image(np.ndarray): The image data.
"""
self._isRunning = True
processed_image = self.processor.process_image(image)
self._isRunning = False
if not self._isRunning:
self.processed.emit(device, processed_image)
self.finished.emit()
def stop(self):
self._isRunning = False