20 Commits

Author: gac-x06da

d2b18d367a  Found local changes on deployment system  (2024-06-20 15:50:57 +02:00)
a6f8d6936e  Merge branch 'fix/pre_startup' into 'main'  (2024-04-29 17:29:27 +02:00)
            fix: added get config for PX-III (see merge request bec/pxiii_bec!9)
fa52c5eee1  fix: added get config for PX-III  (2024-04-29 14:00:38 +02:00)
1448c6b82a  Merge branch 'feature/update_script' into 'main'  (2024-04-24 07:28:17 +02:00)
            feat: added auto updates for bec figures (see merge request bec/pxiii-bec!7)
f768808776  Merge branch 'fix/file_writer' into 'main'  (2024-04-23 19:09:17 +02:00)
            fix: added file writer entry point (see merge request bec/pxiii-bec!8)
265c09d746  fix: added file writer entry point  (2024-04-23 18:27:08 +02:00)
cae8614cb5  feat: added auto updates for bec figures  (2024-04-23 13:25:08 +02:00)
be5c35a938  build: fixed ds plugin name  (2024-04-23 09:56:38 +02:00)
2d18332e23  Merge branch 'build/plugin_update' into 'main'  (2024-04-23 08:08:21 +02:00)
            build: updated to new plugin structure (see merge request bec/pxiii-bec!6)
b9d471361a  build: added CA startup entrypoint  (2024-04-22 19:33:01 +02:00)
ce4d57c597  build: updated to new plugin structure  (2024-04-22 19:18:27 +02:00)
e81c8b1484  Merge branch 'doc-pmodule-update' into 'master'  (2024-03-13 21:30:26 +01:00)
            doc: psi-python311/2024.02 is now stable (see merge request bec/pxiii-bec!5)
1a84335bbb  doc: psi-python311/2024.02 is now stable  (2024-03-12 13:33:24 +01:00)
e87598bc7e  Merge branch 'drop-python39' into 'master'  (2024-03-05 22:39:41 +01:00)
            ci: drop python/3.9 (see merge request bec/pxiii-bec!4)
106f3bed6e  ci: drop python/3.9  (2024-03-05 12:59:22 +01:00)
1de99e47d6  Merge branch 'python39' into 'master'  (2023-12-12 21:16:36 +01:00)
            transition to python/3.9 (see merge request bec/pxiii-bec!3)
0cdb8e3ca6  refactor: update to psi-python39/2021.11 in deploy script  (2023-12-12 16:51:58 +01:00)
3887cdb835  build: require python3.9  (2023-12-12 16:36:33 +01:00)
d89ba54768  Merge branch 'fix-imports' into 'master'  (2023-12-08 12:45:36 +01:00)
            fix: update imports (see merge request bec/pxiii-bec!2)
04b08b9d21  fix: update imports  (2023-12-08 12:37:37 +01:00)
36 changed files with 344 additions and 902 deletions

View File

@@ -1 +0,0 @@
from .bec_client import *

View File

@@ -1 +0,0 @@
from .plugins import *

View File

@@ -1,245 +0,0 @@
from bec_client.scan_manager import ScanReport
from bec_utils.devicemanager import Device
# pylint:disable=undefined-variable
# pylint: disable=too-many-arguments
def dscan(
motor1: Device, m1_from: float, m1_to: float, steps: int, exp_time: float, **kwargs
) -> ScanReport:
"""Relative line scan with one device.
Args:
motor1 (Device): Device that should be scanned.
m1_from (float): Start position relative to the current position.
m1_to (float): End position relative to the current position.
steps (int): Number of steps.
exp_time (float): Exposure time.
Returns:
ScanReport: Status object.
Examples:
>>> dscan(dev.motor1, -5, 5, 10, 0.1)
"""
return scans.line_scan(
motor1, m1_from, m1_to, steps=steps, exp_time=exp_time, relative=True, **kwargs
)
def d2scan(
motor1: Device,
m1_from: float,
m1_to: float,
motor2: Device,
m2_from: float,
m2_to: float,
steps: int,
exp_time: float,
**kwargs
) -> ScanReport:
"""Relative line scan with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device relative to its current position.
m1_to (float): End position of the first device relative to its current position.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device relative to its current position.
m2_to (float): End position of the second device relative to its current position.
steps (int): Number of steps.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> d2scan(dev.motor1, -5, 5, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.line_scan(
motor1,
m1_from,
m1_to,
motor2,
m2_from,
m2_to,
steps=steps,
exp_time=exp_time,
relative=True,
**kwargs
)
def ascan(motor1, m1_from, m1_to, steps, exp_time, **kwargs):
"""Absolute line scan with one device.
Args:
motor1 (Device): Device that should be scanned.
m1_from (float): Start position.
m1_to (float): End position.
steps (int): Number of steps.
exp_time (float): Exposure time.
Returns:
ScanReport: Status object.
Examples:
>>> ascan(dev.motor1, -5, 5, 10, 0.1)
"""
return scans.line_scan(
motor1, m1_from, m1_to, steps=steps, exp_time=exp_time, relative=False, **kwargs
)
def a2scan(motor1, m1_from, m1_to, motor2, m2_from, m2_to, steps, exp_time, **kwargs):
"""Absolute line scan with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device.
m1_to (float): End position of the first device.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device.
m2_to (float): End position of the second device.
steps (int): Number of steps.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> a2scan(dev.motor1, -5, 5, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.line_scan(
motor1,
m1_from,
m1_to,
motor2,
m2_from,
m2_to,
steps=steps,
exp_time=exp_time,
relative=False,
**kwargs
)
def dmesh(motor1, m1_from, m1_to, m1_steps, motor2, m2_from, m2_to, m2_steps, exp_time, **kwargs):
"""Relative mesh scan (grid scan) with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device relative to its current position.
m1_to (float): End position of the first device relative to its current position.
m1_steps (int): Number of steps for motor1.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device relative to its current position.
m2_to (float): End position of the second device relative to its current position.
m2_steps (int): Number of steps for motor2.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> dmesh(dev.motor1, -5, 5, 10, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.grid_scan(
motor1,
m1_from,
m1_to,
m1_steps,
motor2,
m2_from,
m2_to,
m2_steps,
exp_time=exp_time,
        relative=True,
        **kwargs,
    )
def amesh(motor1, m1_from, m1_to, m1_steps, motor2, m2_from, m2_to, m2_steps, exp_time, **kwargs):
"""Absolute mesh scan (grid scan) with two devices.
Args:
motor1 (Device): First device that should be scanned.
m1_from (float): Start position of the first device.
m1_to (float): End position of the first device.
m1_steps (int): Number of steps for motor1.
motor2 (Device): Second device that should be scanned.
m2_from (float): Start position of the second device.
m2_to (float): End position of the second device.
m2_steps (int): Number of steps for motor2.
exp_time (float): Exposure time
Returns:
ScanReport: Status object.
Examples:
>>> amesh(dev.motor1, -5, 5, 10, dev.motor2, -8, 8, 10, 0.1)
"""
return scans.grid_scan(
motor1,
m1_from,
m1_to,
m1_steps,
motor2,
m2_from,
m2_to,
m2_steps,
exp_time=exp_time,
        relative=False,
        **kwargs,
    )
def umv(*args) -> ScanReport:
"""Updated absolute move (i.e. blocking) for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> umv(dev.samx, 1)
>>> umv(dev.samx, 1, dev.samy, 2)
"""
return scans.umv(*args, relative=False)
def umvr(*args) -> ScanReport:
"""Updated relative move (i.e. blocking) for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> umvr(dev.samx, 1)
>>> umvr(dev.samx, 1, dev.samy, 2)
"""
return scans.umv(*args, relative=True)
def mv(*args) -> ScanReport:
"""Absolute move for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> mv(dev.samx, 1)
>>> mv(dev.samx, 1, dev.samy, 2)
"""
return scans.mv(*args, relative=False)
def mvr(*args) -> ScanReport:
"""Relative move for one or more devices.
Returns:
ScanReport: Status object.
Examples:
>>> mvr(dev.samx, 1)
>>> mvr(dev.samx, 1, dev.samy, 2)
"""
return scans.mv(*args, relative=True)

View File

@@ -1 +0,0 @@

View File

@@ -1,25 +0,0 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to set up the BEC client configuration. The script is
executed in the global namespace of the BEC client. This means that all
variables defined here are available in the BEC client.
To set up the BEC client configuration, use the ServiceConfig class. For example,
to set the configuration file path, add the following lines to the script:
import pathlib
from bec_lib.core import ServiceConfig
current_path = pathlib.Path(__file__).parent.resolve()
CONFIG_PATH = f"{current_path}/<path_to_my_config_file.yaml>"
config = ServiceConfig(CONFIG_PATH)
If this startup script defines a ServiceConfig object, the BEC client will use
it to configure itself. Otherwise, the BEC client will use the default config.
"""
# example:
# current_path = pathlib.Path(__file__).parent.resolve()
# CONFIG_PATH = f"{current_path}/../../../bec_config.yaml"
# config = ServiceConfig(CONFIG_PATH)
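Put together, a minimal pre-startup script of this kind can be as short as the following sketch; the `bec_config.yaml` filename mirrors the commented example above and is only a placeholder:
``` python
# Sketch of a pre-startup script: build a ServiceConfig from a YAML file located next to
# this script. If a `config` object is defined here, the BEC client uses it instead of
# the default configuration.
import pathlib

from bec_lib.core import ServiceConfig

current_path = pathlib.Path(__file__).parent.resolve()
CONFIG_PATH = f"{current_path}/bec_config.yaml"
config = ServiceConfig(CONFIG_PATH)
```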

View File

@@ -1 +0,0 @@
from .saxs_imaging_processor import SaxsImagingProcessor

View File

@@ -1,430 +0,0 @@
from __future__ import annotations
import numpy as np
import time
from queue import Queue
from typing import Optional, Tuple
from data_processing.stream_processor import StreamProcessor
from bec_lib.core import BECMessage
from bec_lib.core.redis_connector import MessageObject, RedisConnector
class SaxsImagingProcessor(StreamProcessor):
def __init__(self, connector: RedisConnector, config: dict) -> None:
""""""
super().__init__(connector, config)
self.metadata_consumer = None
self.parameter_consumer = None
self.metadata = {}
self.num_received_msgs = 0
self.queue = Queue()
self._init_parameter(endpoint="px_stream/gui_event")
self.start_parameter_consumer(endpoint="px_stream/gui_event")
self._init_metadata_and_proj_nr(endpoint="px_stream/proj_nr")
self.start_metadata_consumer(endpoint="px_stream/projection_*/metadata")
def _init_parameter(self, endpoint: str) -> None:
"""Initialize the parameters azi_angle, contrast and horiz_roi.
Args:
endpoint (str): Endpoint for redis topic.
Returns:
None
"""
self.azi_angle = None
self.horiz_roi = [20, 50]
self.contrast = 0
msg = self.producer.get(topic=endpoint)
if msg is None:
return None
msg_raw = BECMessage.DeviceMessage.loads(msg)
self._parameter_msg_handler(msg_raw)
def start_parameter_consumer(self, endpoint: str) -> None:
"""Initialize the consumers for gui_event parameters.
Consumer is started with a callback function that updates
the parameters: azi_angle, contrast and horiz_roi.
Args:
endpoint (str): Endpoint for redis topic.
Returns:
None
"""
if self.parameter_consumer and self.parameter_consumer.is_alive():
self.parameter_consumer.shutdown()
self.parameter_consumer = self._connector.consumer(
pattern=endpoint, cb=self._update_parameter_cb, parent=self
)
self.parameter_consumer.start()
@staticmethod
def _update_parameter_cb(msg: MessageObject, parent: SaxsImagingProcessor) -> None:
"""Callback function for the parameter consumer.
Args:
msg (MessageObject): Message object.
parent (SaxsImagingProcessor): Parent class.
Returns:
None
"""
msg_raw = BECMessage.DeviceMessage.loads(msg.value)
parent._parameter_msg_handler(msg_raw)
def _parameter_msg_handler(self, msg: BECMessage) -> None:
"""Handle the parameter message.
There can be updates on three different parameters:
azi_angle, contrast and horiz_roi.
Args:
msg (BECMessage): Message object.
Returns:
None
"""
if msg.content["signals"].get("horiz_roi") is not None:
self.horiz_roi = msg.content["signals"]["horiz_roi"]
if msg.content["signals"].get("azi_angles") is not None:
self.azi_angle = msg.content["signals"]["azi_angle"]
if msg.content["signals"].get("contrast") is not None:
self.contrast = msg.content["signals"]["contrast"]
# self._init_parameter_updated = True
# if len(self.metadata) > 0:
# self._update_queue(self.metadata[self.proj_nr], self.proj_nr)
def _init_metadata_and_proj_nr(self, endpoint: str) -> None:
"""Initialize the metadata and proj_nr.
Args:
endpoint (str): Endpoint for redis topic.
Returns:
None
"""
msg = self.producer.get(topic=endpoint)
if msg is None:
self.proj_nr = None
return None
msg_raw = BECMessage.DeviceMessage.loads(msg)
self.proj_nr = msg_raw.content["signals"]["proj_nr"]
        # TODO: hardcoded endpoint, possible to use a more general solution?
msg = self.producer.get(topic=f"px_stream/projection_{self.proj_nr}/metadata")
msg_raw = BECMessage.DeviceMessage.loads(msg)
self._update_queue(msg_raw.content["signals"], self.proj_nr)
def _update_queue(self, metadata: dict, proj_nr: int) -> None:
"""Update the process queue.
Args:
metadata (dict): Metadata for the projection.
proj_nr (int): Projection number.
Returns:
None
"""
self.metadata.update({proj_nr: metadata})
self.queue.put((proj_nr, metadata))
def start_metadata_consumer(self, endpoint: str) -> None:
"""Start the metadata consumer.
Consumer is started with a callback function that updates the metadata.
Args:
endpoint (str): Endpoint for redis topic.
Returns:
None
"""
if self.metadata_consumer and self.metadata_consumer.is_alive():
self.metadata_consumer.shutdown()
self.metadata_consumer = self._connector.consumer(
pattern=endpoint, cb=self._update_metadata_cb, parent=self
)
self.metadata_consumer.start()
@staticmethod
def _update_metadata_cb(msg: MessageObject, parent: SaxsImagingProcessor) -> None:
"""Callback function for the metadata consumer.
Args:
msg (MessageObject): Message object.
parent (SaxsImagingProcessor): Parent class.
Returns:
None
"""
msg_raw = BECMessage.DeviceMessage.loads(msg.value)
parent._metadata_msg_handler(msg_raw, msg.topic.decode())
def _metadata_msg_handler(self, msg: BECMessage, topic) -> None:
"""Handle the metadata message.
If self.metadata is larger than 10, the oldest entry is removed.
Args:
msg (BECMessage): Message object.
topic (str): Topic for the message.
Returns:
None
"""
if len(self.metadata) > 10:
first_key = next(iter(self.metadata))
self.metadata.pop(first_key)
self.proj_nr = int(topic.split("px_stream/projection_")[1].split("/")[0])
self._update_queue(msg.content["signals"], self.proj_nr)
def start_data_consumer(self) -> None:
"""function from the parent class that we don't want to use here"""
pass
def _run_forever(self) -> None:
"""Loop that runs forever when the processor is started.
Upon update of the queue, the data is loaded and processed.
This processing continues as long as the queue is empty,
and proceeds to the next projection when the queue is updated.
Returns:
None
"""
proj_nr, metadata = self.queue.get()
self.num_received_msgs = 0
self.data = None
while self.queue.empty():
start = time.time()
self._get_data(proj_nr, metadata)
start = time.time()
result = self.process(self.data, metadata)
print(f"Processing took {time.time() - start}")
if result is None:
continue
print(f"Length of data is {result[0][0]['z'].shape}")
msg = BECMessage.ProcessedDataMessage(data=result[0][0], metadata=result[1]).dumps()
print("Publishing result")
self._publish_result(msg)
def _get_data(self, proj_nr: int, metadata: dict) -> None:
"""Get data for given proj_nr from redis.
Args:
proj_nr (int): Projection number.
Returns:
list: List of azimuthal integrated data.
"""
start = time.time()
msgs = self.producer.lrange(
f"px_stream/projection_{proj_nr}/data", self.num_received_msgs, -1
)
print(f"Loading of {len(msgs)} took {time.time() - start}")
if not msgs:
return None
frame_shape = BECMessage.DeviceMessage.loads(msgs[0]).content["signals"]["data"].shape[-2:]
if self.data is None:
start = time.time()
self.data = np.empty(
(
metadata["metadata"]["number_of_rows"],
metadata["metadata"]["number_of_columns"],
*frame_shape,
)
)
print(f"Init output took {time.time() - start}")
start = time.time()
for msg in msgs:
self.data[
self.num_received_msgs : self.num_received_msgs + 1, ...
] = BECMessage.DeviceMessage.loads(msg).content["signals"]["data"]
self.num_received_msgs += 1
print(f"Casting data to array took {time.time() - start}")
def process(self, data: np.ndarray, metadata: dict) -> Optional[Tuple[dict, dict]]:
"""Process the scanning SAXS data
Args:
data (list): List of azimuthal integrated data.
metadata (dict): Metadata for the projection.
Returns:
Optional[Tuple[dict, dict]]: Processed data and metadata.
"""
if data is None:
return None
        # TODO: np.asarray is responsible for 95% of the processing time of this function.
azint_data = data[0 : self.num_received_msgs, ...]
norm_sum = metadata["norm_sum"]
q = metadata["q"]
out = []
contrast = self.contrast
horiz_roi = self.horiz_roi
azi_angle = self.azi_angle
if azi_angle is None:
azi_angle = 0
f1amp, f2amp, f2phase = self._colorfulplot(
horiz_roi=horiz_roi,
q=q,
norm_sum=norm_sum,
data=azint_data,
azi_angle=azi_angle,
)
if contrast == 0:
out = f1amp
elif contrast == 1:
out = f2amp
elif contrast == 2:
out = f2phase
stream_output = {
# 0: {"x": np.asarray(x), "y": np.asarray(y), "z": np.asarray(out)},
0: {"z": np.asarray(out)},
# "input": self.config["input_xy"],
}
metadata["grid_scan"] = out.shape
return (stream_output, metadata)
def _colorfulplot(
self,
horiz_roi: list,
q: np.ndarray,
norm_sum: np.ndarray,
data: np.ndarray,
azi_angle: float,
percentile_value: int = 96,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Compute data for sSAXS colorful 2D plot.
Pending: hsv_to_rgb conversion for colorful output
Args:
horiz_roi (list): List with q edges for binning.
q (np.ndarray): q values.
norm_sum (np.ndarray): Normalization sum.
data (np.ndarray): Data to be binned.
azi_angle (float, optional): Azimuthal angle for first segment, shifts f2phase. Defaults to 0.
percentile_value (int, optional): Percentile value for removing outliers above threshold. Defaults to 96, range 0...100.
Returns:
Tuple[np.ndarray, np.ndarray, np.ndarray]: f1amp, f2amp, f2phase
"""
output, output_norm = self._bin_qrange(
horiz_roi=horiz_roi, q=q, norm_sum=norm_sum, data=data
)
output_sym = self._sym_data(data=output, norm_sum=output_norm)
shape = output_sym.shape[0:2]
fft_data = np.fft.rfft(output_sym.reshape((-1, output_sym.shape[-2])), axis=1)
f1amp = np.abs(fft_data[:, 0]) / output_sym.shape[2]
f2amp = 2 * np.abs(fft_data[:, 1]) / output_sym.shape[2]
f2angle = np.angle(fft_data[:, 1]) + np.deg2rad(azi_angle)
f2phase = (f2angle + np.pi) / (2 * np.pi)
f2phase[f2phase > 1] = f2phase[f2phase > 1] - 1
f1amp = f1amp.reshape(shape)
f2amp = f2amp.reshape(shape)
f2angle = f2angle.reshape(shape)
f2phase = f2phase.reshape(shape)
h = f2phase
max_scale = np.percentile(f2amp, percentile_value)
s = f2amp / max_scale
s[s > 1] = 1
max_scale = np.percentile(f1amp, percentile_value)
v = f1amp
v = v / max_scale
v[v > 1] = 1
# hsv = np.stack((h, s, v), axis=2)
# comb_all = colors.hsv_to_rgb(hsv)
return f1amp, f2amp, f2phase # , comb_all
def _bin_qrange(self, horiz_roi, q, norm_sum, data) -> Tuple[np.ndarray, np.ndarray]:
"""Reintegrate data for given q range.
Weighted sum for data using norm_sum as weights
Args:
horiz_roi (list): List with q edges for binning.
q (np.ndarray): q values.
norm_sum (np.ndarray): Normalization sum.
data (np.ndarray): Data to be binned.
Returns:
np.ndarray: Binned data.
np.ndarray: Binned normalization sum.
"""
output = np.zeros((*data.shape[:-1], len(horiz_roi) - 1))
output_norm = np.zeros((data.shape[-2], len(horiz_roi) - 1))
with np.errstate(divide="ignore", invalid="ignore"):
q_mask = np.logical_and(q >= q[horiz_roi[0]], q <= q[horiz_roi[1]])
output_norm[..., 0] = np.nansum(norm_sum[..., q_mask], axis=-1)
output[..., 0] = np.nansum(data[..., q_mask] * norm_sum[..., q_mask], axis=-1)
output[..., 0] = np.divide(
output[..., 0], output_norm[..., 0], out=np.zeros_like(output[..., 0])
)
return output, output_norm
def _sym_data(self, data, norm_sum) -> np.ndarray:
"""Symmetrize data by averaging over the two opposing directions.
Helpful to remove detector gaps for x-ray detectors
Args:
data (np.ndarray): Data to be symmetrized.
norm_sum (np.ndarray): Normalization sum.
Returns:
np.ndarray: Symmetrized data.
"""
n_directions = norm_sum.shape[0] // 2
output = np.divide(
data[..., :n_directions, :] * norm_sum[:n_directions, :]
+ data[..., n_directions:, :] * norm_sum[n_directions:, :],
norm_sum[:n_directions, :] + norm_sum[n_directions:, :],
out=np.zeros_like(data[..., :n_directions, :]),
)
return output
if __name__ == "__main__":
config = {
"output": "px_dap_worker",
}
dap_process = SaxsImagingProcessor.run(config=config, connector_host=["localhost:6379"])

View File

@@ -1,155 +0,0 @@
import os
import h5py
import numpy as np
import time
import json
import argparse
from bec_lib.core import RedisConnector, BECMessage
def load_data(datadir: str, metadata_path: str) -> tuple:
"""Load data from disk
Args:
datapath (str): Path to the data directory with data for projection (h5 files)
metadata_path (str): Path to the metadata file
Returns:
tuple: data, q, norm_sum, metadata
"""
with open(metadata_path) as file:
metadata = json.load(file)
filenames = [fname for fname in os.listdir(datadir) if fname.endswith(".h5")]
filenames.sort()
for ii, fname in enumerate(filenames):
with h5py.File(os.path.join(datadir, fname), "r") as h5file:
if ii == 0:
q = h5file["q"][...].T.squeeze()
norm_sum = h5file["norm_sum"][...]
data = np.zeros((len(filenames), *h5file["I_all"][...].shape))
data[ii, ...] = h5file["I_all"][...]
return data, q, norm_sum, metadata
def _get_projection_keys(producer) -> list:
"""Get all keys for projections with endpoint px_stream/projection_* in redis
Args:
producer (RedisProducer): Redis producer
Returns:
list: List of keys or [] if no keys are found"""
keys = producer.keys("px_stream/projection_*")
if not keys:
return []
return keys
def send_data(
data: np.ndarray,
q: np.ndarray,
norm_sum: np.ndarray,
bec_producer: RedisConnector.producer,
metadata: dict,
proj_nr: int,
) -> None:
"""Send data to redis and delete old data > 5 projections
Args:
data (np.ndarray): Data to send
q (np.ndarray): q values
norm_sum (np.ndarray): Normalization sum
bec_producer (RedisProducer): Redis producer
metadata (dict): Metadata
proj_nr (int): Projection number
Returns:
None
"""
start = time.time()
keys = _get_projection_keys(bec_producer)
pipe = bec_producer.pipeline()
proj_numbers = set(key.decode().split("px_stream/projection_")[1].split("/")[0] for key in keys)
if len(proj_numbers) > 5:
for entry in sorted(proj_numbers)[0:-5]:
for key in bec_producer.keys(f"px_stream/projection_{entry}/*"):
bec_producer.delete(topic=key, pipe=pipe)
print(f"Deleting {key}")
return_dict = {"metadata": metadata, "q": q, "norm_sum": norm_sum}
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
bec_producer.set_and_publish(f"px_stream/projection_{proj_nr}/metadata", msg=msg, pipe=pipe)
return_dict = {"proj_nr": proj_nr}
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
bec_producer.set_and_publish(f"px_stream/proj_nr", msg=msg, pipe=pipe)
pipe.execute()
for line in range(data.shape[0]):
return_dict = {"data": data[line, ...]}
msg = BECMessage.DeviceMessage(signals=return_dict).dumps()
print(f"Sending line {line}")
bec_producer.rpush(topic=f"px_stream/projection_{proj_nr}/data", msgs=msg)
print(f"Time to send {time.time()-start} seconds")
print(f"Rate {data.shape[0]/(time.time()-start)} Hz")
print(f"Data volume {data.nbytes/1e6} MB")
if __name__ == "__main__":
"""Start the stream simulator, defaults to px_stream/projection_* in redis on localhost:6379
Example usage:
    >>> python saxs_imaging_streamsimulator.py -d ~/datadir/ -m ~/metadatafile.json -p 180 -w 30 -r localhost:6379
"""
parser = argparse.ArgumentParser()
parser.add_argument(
"-d",
"--datadir",
type=str,
help="filepath to datadir for projection files (in h5 format)",
required=True,
)
parser.add_argument(
"-m",
"--metadata",
type=str,
help="filepath to metadata json file",
required=True,
)
parser.add_argument(
"-p",
"--proj_nr",
type=int,
help="Projection number matching the data",
required=True,
)
parser.add_argument(
"-w",
"--wait_delay",
type=int,
help="delay between sending data in seconds (int)",
default=30,
)
parser.add_argument(
"-r",
"--redis",
type=str,
help="Redis_host:port",
default="localhost:6379",
)
values = parser.parse_args()
data, q, norm_sum, metadata = load_data(datadir=values.datadir, metadata_path=values.metadata)
bec_producer = RedisConnector([f"{values.redis}"]).producer()
proj_nr = values.proj_nr
delay = values.wait_delay
while True:
send_data(data, q, norm_sum, bec_producer, metadata, proj_nr=proj_nr)
time.sleep(delay)
bec_producer.delete(topic=f"px_stream/projection_{proj_nr}/data:val")

View File

@@ -4,7 +4,7 @@
BEAMLINE_REPO=gitlab.psi.ch:bec/pxiii-bec.git
git clone git@$BEAMLINE_REPO
module add psi-python38/2020.11
module add psi-python311/2024.02
# start redis
docker run --network=host --name redis-bec -d redis
@@ -27,4 +27,3 @@ pip install -e ./pxiii-bec
# start the BEC server
bec-server start --config ./pxiii-bec/deployment/bec-server-config.yaml

View File

@@ -0,0 +1,2 @@
def beam_stop_scan():
print("running beam stop scan")

View File

@@ -16,31 +16,30 @@ parse the --session argument, add the following lines to the script:
if args.session == "my_session":
print("Loading my_session session")
from bec_plugins.bec_client.plugins.my_session import *
from bec_plugins.bec_ipython_client.plugins.my_session import *
else:
print("Loading default session")
from bec_plugins.bec_client.plugins.default_session import *
from bec_plugins.bec_ipython_client.plugins.default_session import *
"""
# pylint: disable=invalid-name, unused-import, import-error, undefined-variable, unused-variable, unused-argument, no-name-in-module
import argparse
from bec_lib.core import bec_logger
from bec_lib import bec_logger
logger = bec_logger.logger
logger.info("Using the PXIII startup script.")
logger.info("Using the PX-III startup script.")
parser = argparse.ArgumentParser()
parser.add_argument("--session", help="Session name", type=str, default="cSAXS")
args = parser.parse_args()
# pylint: disable=import-error
_args = _main_dict["args"]
# SETUP BEAMLINE INFO
from bec_client.plugins.SLS.sls_info import OperatorInfo, SLSInfo
_session_name = "PX-III"
if _args.session.lower() == "alignment":
# load the alignment session
_session_name = "Alignment"
logger.success("Alignment session loaded.")
bec._beamline_mixin._bl_info_register(SLSInfo)
bec._beamline_mixin._bl_info_register(OperatorInfo)
# SETUP PROMPTS
bec._ip.prompts.username = "PXIII"
bec._ip.prompts.username = _session_name
bec._ip.prompts.status = 1

View File

@@ -0,0 +1,23 @@
"""
Pre-startup script for BEC client. This script is executed before the BEC client
is started. It can be used to add additional command line arguments.
"""
from bec_lib import ServiceConfig
def extend_command_line_args(parser):
"""
Extend the command line arguments of the BEC client.
"""
parser.add_argument("--session", help="Session name", type=str, default="PX-III")
return parser
def get_config() -> ServiceConfig:
"""
Create and return the service configuration.
"""
return ServiceConfig(redis={"host": "x06da-bec-001", "port": 6379})

View File

@@ -0,0 +1 @@
from .auto_updates import PlotUpdate

View File

@@ -0,0 +1,39 @@
from bec_widgets.cli import AutoUpdates, ScanInfo
class PlotUpdate(AutoUpdates):
# def simple_line_scan(self, info: ScanInfo) -> None:
# """
# Simple line scan.
# """
# dev_x = info.scan_report_devices[0]
# dev_y = self.get_selected_device(info.monitored_devices, self.figure.selected_device)
# if not dev_y:
# return
# self.figure.clear_all()
# plt = self.figure.plot(dev_x, dev_y)
# plt.set(title=f"PXIII: Scan {info.scan_number}", x_label=dev_x, y_label=dev_y)
    def px_default_line_scan(self, info: ScanInfo):
        """Default PX-III line scan plot: bpm4i against the first scan report device."""
dev_x = info.scan_report_devices[0]
dev_y = "bpm4i"
self.figure.clear_all()
plt = self.figure.plot(dev_x, dev_y)
plt.set(title="Custom line scan with samy")
def handler(self, info: ScanInfo) -> None:
# EXAMPLES:
# if info.scan_name == "line_scan" and info.scan_report_devices:
# self.simple_line_scan(info)
# return
# if info.scan_name == "grid_scan" and info.scan_report_devices:
# self.simple_grid_scan(info)
# return
if info.scan_report_devices and info.scan_report_devices[0] == "samy":
self.px_default_line_scan(info)
return
super().handler(info)

View File

@@ -0,0 +1,11 @@
import os
def setup_epics_ca():
os.environ["EPICS_CA_AUTO_ADDR_LIST"] = "NO"
os.environ["EPICS_CA_ADDR_LIST"] = "129.129.122.255 sls-x12sa-cagw.psi.ch:5836"
os.environ["PYTHONIOENCODING"] = "latin1"
def run():
setup_epics_ca()

pyproject.toml (new file, 78 lines)
View File

@@ -0,0 +1,78 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "pxiii_bec"
version = "0.0.0"
description = "Custom device implementations based on the ophyd hardware abstraction layer"
requires-python = ">=3.10"
classifiers = [
"Development Status :: 3 - Alpha",
"Programming Language :: Python :: 3",
"Topic :: Scientific/Engineering",
]
dependencies = [
"bec_ipython_client",
"bec_lib",
"bec_server",
"ophyd_devices",
"std_daq_client",
"rich",
"pyepics",
]
[project.optional-dependencies]
dev = [
"black",
"isort",
"coverage",
"pylint",
"pytest",
"pytest-random-order",
"pytest-redis",
]
[project.entry-points."bec"]
plugin_bec = "pxiii_bec"
[project.entry-points."bec.deployment.device_server"]
plugin_ds_startup = "pxiii_bec.deployment.device_server.startup:run"
[project.entry-points."bec.file_writer"]
plugin_file_writer = "pxiii_bec.file_writer"
[project.entry-points."bec.scans"]
plugin_scans = "pxiii_bec.scans"
[project.entry-points."bec.ipython_client_startup"]
plugin_ipython_client_pre = "pxiii_bec.bec_ipython_client.startup.pre_startup"
plugin_ipython_client_post = "pxiii_bec.bec_ipython_client.startup"
[project.entry-points."bec.widgets.auto_updates"]
plugin_widgets_update = "pxiii_bec.bec_widgets.auto_updates:PlotUpdate"
[tool.hatch.build.targets.wheel]
include = ["*"]
[tool.isort]
profile = "black"
line_length = 100
multi_line_output = 3
include_trailing_comma = true
[tool.black]
line-length = 100
skip-magic-trailing-comma = true
[tool.pylint.basic]
# Good variable names regexes, separated by a comma. If names match any regex,
# they will always be accepted
good-names-rgxs = [
".*scanID.*",
".*RID.*",
".*pointID.*",
".*ID.*",
".*_2D.*",
".*_1D.*",
]

View File

@@ -1,21 +0,0 @@
[metadata]
name = bec_plugins
description = BEC plugins to modify the behaviour of services within the BEC framework
long_description = file: README.md
long_description_content_type = text/markdown
url = https://gitlab.psi.ch/bec/bec
project_urls =
Bug Tracker = https://gitlab.psi.ch/bec/bec/issues
classifiers =
Programming Language :: Python :: 3
Development Status :: 3 - Alpha
Topic :: Scientific/Engineering
[options]
package_dir =
= .
packages = find:
python_requires = >=3.8
[options.packages.find]
where = .

View File

@@ -1,7 +0,0 @@
from setuptools import setup
if __name__ == "__main__":
setup(
install_requires=[],
extras_require={"dev": ["pytest", "pytest-random-order", "coverage"]},
)

start_bec_client.py (new file, 22 lines)
View File

@@ -0,0 +1,22 @@
from bec_ipython_client import BECIPythonClient
from bec_lib import bec_logger, ServiceConfig
logger = bec_logger.logger
bec_logger.level = bec_logger.LOGLEVEL.SUCCESS
config = ServiceConfig(redis={"host": "x06da-bec-001", "port": 6379})
bec = BECIPythonClient(config=config)
bec.start()
bec.load_high_level_interface("spec_hli")
dev = bec.device_manager.devices
scans = bec.scans
logger.success("Started BECClient")
##########################################
from pxiii_bec.bec_ipython_client.plugins.beam_stop_scan import beam_stop_scan
beam_stop_scan()

View File

@@ -0,0 +1,31 @@
# Getting Started with Testing using pytest
BEC uses the [pytest](https://docs.pytest.org/en/8.0.x/) framework.
It can be installed via
``` bash
pip install pytest
```
in your *python environment*.
Note that pytest is also part of the optional dependencies `[dev]` of the plugin package.
## Introduction
Tests in this package should be stored in the `tests` directory.
We suggest sorting tests for the different submodules, e.g. `scans` or `devices`, into the corresponding folder structure, and following the naming convention `<test_module_name.py>`.
To run all tests, navigate to the plugin directory on the command line and run
``` bash
pytest -v --random-order ./tests
```
Note that the python environment needs to be active.
The argument `-v` runs pytest in verbose mode, which provides more detailed information about the tests being run.
The argument `--random-order` instructs pytest to run the tests in random order, which is the default in the CI pipelines.
## Test examples
Writing tests is quite specific to the function under test.
We recommend writing tests that are as isolated as possible, i.e. testing single functions rather than full classes.
A very useful class for isolated testing is [MagicMock](https://docs.python.org/3/library/unittest.mock.html).
In addition, we recommend taking a look at the [How-to guides from pytest](https://docs.pytest.org/en/8.0.x/how-to/index.html).
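As a minimal sketch of such an isolated test, consider a hypothetical helper `move_and_read` (not part of this package) that drives a device and returns its readback; a `MagicMock` can stand in for the real device:
``` python
from unittest.mock import MagicMock


def move_and_read(dev, target):
    """Hypothetical helper: move a device to `target` and return its readback."""
    dev.move(target).wait()
    return dev.read()


def test_move_and_read():
    # The mock accepts any call and records it, so no real device is required.
    dev = MagicMock()
    dev.read.return_value = {"value": 1.2}

    assert move_and_read(dev, 5.0) == {"value": 1.2}
    dev.move.assert_called_once_with(5.0)
    dev.move.return_value.wait.assert_called_once()
```
Because the mock records every call and attribute access, the test can verify the interaction with the device without starting any BEC service.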
