diff --git a/ophyd_devices/__init__.py b/ophyd_devices/__init__.py index 54723f6..729f684 100644 --- a/ophyd_devices/__init__.py +++ b/ophyd_devices/__init__.py @@ -8,7 +8,7 @@ from .sim.sim_monitor import SimMonitor, SimMonitorAsync SynAxisMonitor = SimMonitor SynGaussBEC = SimMonitor -from .sim.sim_positioner import SimPositioner +from .sim.sim_positioner import SimLinearTrajectoryPositioner, SimPositioner SynAxisOPAAS = SimPositioner from .sim.sim_flyer import SimFlyer diff --git a/ophyd_devices/sim/sim_positioner.py b/ophyd_devices/sim/sim_positioner.py index 15798ab..eb2f03c 100644 --- a/ophyd_devices/sim/sim_positioner.py +++ b/ophyd_devices/sim/sim_positioner.py @@ -12,6 +12,7 @@ from ophyd_devices.sim.sim_data import SimulatedPositioner from ophyd_devices.sim.sim_exception import DeviceStop from ophyd_devices.sim.sim_signals import ReadOnlySignal, SetableSignal from ophyd_devices.sim.sim_test_devices import DummyController +from ophyd_devices.sim.sim_utils import LinearTrajectory, stop_trajectory logger = bec_logger.logger @@ -219,6 +220,40 @@ class SimPositioner(Device, PositionerBase): return "mm" +class SimLinearTrajectoryPositioner(SimPositioner): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def _move_and_finish(self, start_pos, end_pos, st): + success = True + acc_time = ( + self.acceleration.get() + ) # acceleration in Ophyd refers to acceleration time in seconds + vel = self.velocity.get() + acc = abs(vel / acc_time) + traj = LinearTrajectory(start_pos, end_pos, vel, acc) + + try: + while not traj.ended: + ttime.sleep(1 / self.update_frequency) + self._update_state(traj.position()) + if self._stopped: + break + if self._stopped: + # simulate deceleration + traj = stop_trajectory(traj) + while not traj.ended: + ttime.sleep(1 / self.update_frequency) + self._update_state(traj.position()) + self._update_state(traj.position()) + except DeviceStop: + success = False + finally: + self._set_sim_state(self.motor_is_moving.name, 0) + self._done_moving(success=success) + st.set_finished() + + class SimPositionerWithCommFailure(SimPositioner): fails = Cpt(SetableSignal, value=0) diff --git a/ophyd_devices/sim/sim_utils.py b/ophyd_devices/sim/sim_utils.py index 6420ccf..38aca43 100644 --- a/ophyd_devices/sim/sim_utils.py +++ b/ophyd_devices/sim/sim_utils.py @@ -1,8 +1,11 @@ +import math import os +import time from pathlib import Path import h5py import hdf5plugin +import numpy as np class H5Writer: @@ -36,3 +39,121 @@ class H5Writer: """Write data to h5 file""" with h5py.File(self.file_path, "w") as h5_file: h5_file.create_dataset(self.h5_entry, data=self.data_container, **hdf5plugin.LZ4()) + + +class LinearTrajectory: + def __init__( + self, initial_position, final_position, max_velocity, acceleration, initial_time=None + ): + self.initial_position = initial_position + self.final_position = final_position + self.max_velocity = abs(max_velocity) + self.acceleration = abs(acceleration) + self.initial_time = initial_time if initial_time is not None else time.time() + self._velocity_profile = [] + self.ended = False + + self.distance = self.final_position - self.initial_position + self.direction = np.sign(self.distance) + self.distance = abs(self.distance) + + # Calculate time to reach max velocity and the distance covered during this time + self.time_to_max_velocity = self.max_velocity / self.acceleration + self.distance_to_max_velocity = 0.5 * self.acceleration * self.time_to_max_velocity**2 + + if self.distance < 2 * self.distance_to_max_velocity: + # If the distance is too short to reach max velocity (triangular profile) + self.time_accel = np.sqrt(self.distance / self.acceleration) + self.time_decel = self.time_accel + self.time_const_vel = 0 + self.total_time = 2 * self.time_accel + else: + # If the distance allows reaching max velocity (trapezoidal profile) + self.time_accel = self.time_to_max_velocity + self.time_decel = self.time_to_max_velocity + self.time_const_vel = ( + self.distance - 2 * self.distance_to_max_velocity + ) / self.max_velocity + self.total_time = self.time_accel + self.time_const_vel + self.time_decel + + def _get_velocity_at_time(self, dt): + if dt <= self.time_accel: + # Acceleration phase + return self.direction * self.acceleration * dt + elif dt <= self.time_accel + self.time_const_vel: + # Constant velocity phase + return self.direction * self.max_velocity + elif dt <= self.total_time: + # Deceleration phase + return self.direction * self.acceleration * (self.total_time - dt) + else: + return 0 + + def _get_pos_at_time(self, dt): + if dt <= self.time_accel: + # Acceleration phase + return self.initial_position + self.direction * 0.5 * self.acceleration * dt**2 + elif dt <= self.time_accel + self.time_const_vel: + # Constant velocity phase + return self.initial_position + self.direction * ( + self.distance_to_max_velocity + self.max_velocity * (dt - self.time_accel) + ) + elif dt <= self.total_time: + # Deceleration phase + return ( + self.final_position + - self.direction * 0.5 * self.acceleration * (self.total_time - dt) ** 2 + ) + else: + return self.final_position + + def position(self, t=None): + if t is None: + t = time.time() + dt = t - self.initial_time + + if dt < 0: + raise ValueError("Time cannot be before initial time.") + + current_position = self._get_pos_at_time(dt) + current_velocity = self._get_velocity_at_time(dt) + + self._velocity_profile.append([t, current_velocity]) + + if dt > self.total_time: + self.ended = True + + return current_position + + @property + def velocity_profile(self): + return np.array(self._velocity_profile) + + +def stop_trajectory(trajectory, stop_time=None): + """Return a trajectory that starts to decelerate at stop_time, + with same characteristics as given trajectory""" + if stop_time is None: + stop_time = time.time() + # Calculate current position and velocity at the stop time + dt = stop_time - trajectory.initial_time + current_position = trajectory._get_pos_at_time(dt) + current_velocity = trajectory._get_velocity_at_time(dt) + + dec_time = abs(current_velocity / trajectory.acceleration) + decel_distance = trajectory.direction * ( + current_velocity * current_velocity / (2 * trajectory.acceleration) + ) + + # Create a new trajectory from current position-decel_distance to a stop at current position+decel_distance + new_trajectory = LinearTrajectory( + current_position - decel_distance, + current_position + decel_distance, + abs(current_velocity), + trajectory.acceleration, + stop_time - dec_time, + ) + # Keep velocity profile data, so it is possible to check the move + new_trajectory._velocity_profile = trajectory._velocity_profile[:] + + return new_trajectory