done major restructuring and futurproving

This commit is contained in:
2025-07-14 18:00:39 +02:00
parent df3bcf0ed9
commit f6d1485570
14 changed files with 1193 additions and 178 deletions

6
Config/config.json Normal file
View File

@@ -0,0 +1,6 @@
{
"Number_of_cycles": 40,
"Amplitude_mm": 10,
"Time_in_beam_s": 5,
"Time_out_of_beam_s": 10
}

5
Config/paths.json Normal file
View File

@@ -0,0 +1,5 @@
{
"meas_scripts_dir": 'C:\Users\berti_r\Python_Projects\metrology\metrology'
"meas_scripts_dir_local": r"C:\Users\berti_r\Python_Projects\StagePerformaceDocu\Scripts"
"config_path": r"C:\Users\berti_r\Python_Projects\StagePerformaceDocu\Config\config.json"
}

View File

@@ -0,0 +1,43 @@
import sys
from pathlib import Path
import threading
import queue
import paho.mqtt.client as mqtt
import time
# Global variable to store received message
received_data = None
def __on_connect(client, userdata, flags, rc):
print("Connected with result code " + str(rc))
client.subscribe("test/topic") # Change this to your topic
def __on_message(client, userdata, msg):
global received_data
received_data = msg.payload.decode()
#TODO: Write data to pickle and in queue
print(f"Message received on {msg.topic}: {received_data}")
def __mqtt_thread():
client = mqtt.Client()
client.on_connect = __on_connect
client.on_message = __on_message
client.connect("broker.hivemq.com", 1883, 60) # Replace with your broker address and port
client.loop_forever()
def start_mqtt_in_thread():
thread = threading.Thread(target=__mqtt_thread, daemon=True)
thread.start()
# Example usage
if __name__ == "__main__":
start_mqtt_in_thread()
# Simulate main thread work
while True:
if received_data:
print(f"Main thread sees data: {received_data}")
received_data = None # Reset after handling
time.sleep(1)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

202
Scripts/ad.py Normal file
View File

@@ -0,0 +1,202 @@
# -*- coding: utf-8 -*-
"""
Camera module for the PCO cameras.
"""
import numpy as np
import time
from epics import PV
# from tomoalign.camera import camera
import camera
class AD(camera.Camera):
"""
Camera class for the GreyPoint camera.
"""
# Direction of increasing row pixel index with respect to lab X-axis.
pixel_x_dir = -1
# Direction of increasing column pixel index with respect to lab Y-axis.
pixel_y_dir = -1
def __init__(self, prefix="X02DA-PG-USB:cam1:"):
"""
Initialize the camera class
Parameters
----------
prefix : str, optional
The PV prefix for the GP camera.
(default = "X02DA-PG-USB:cam1:")
"""
self.prefix = prefix
self.pv_cam_run = PV('{}Acquire'.format(self.prefix))
self.pv_cam_exposure = PV('{}AcquireTime'.format(self.prefix))
self.pv_im_array = PV('{}ArrayData'.format("X02DA-PG-USB:image1:"))
self.pv_im_height = PV('{}ArraySize1_RBV'.format("X02DA-PG-USB:image1:"))
self.pv_im_width = PV('{}ArraySize0_RBV'.format("X02DA-PG-USB:image1:"))
def start(self, wait=True):
"""
Start the camera acquisition process.
Parameters
----------
wait : bool, optional
Whether to wait for the camera to acquire images and the minimum
wait time. (default=True)
"""
if not self.is_running():
_start_time = time.time()
# self.pv_cam_run.put(1, wait=True)
self.pv_cam_run.put(1, wait=False)
if wait:
while not self.is_running():
if(time.time() - _start_time) > 20:
raise Warning("PCO camera did not start " + \
"acquiring images for the last 20 seconds. " + \
"Giving up...")
time.sleep(0.1)
wt = self.get_wait_time()
et = time.time() - _start_time
if et < wt:
time.sleep(wt - et)
def stop(self, wait=True):
"""
Stop the camera acquisition process.
Parameters
----------
wait : bool, optional
Whether to wait for the camera to stop acquiring images and the
minimum wait time. (default=True)
"""
if self.is_running():
_start_time = time.time()
# self.pv_cam_run.put(0, wait=True)
self.pv_cam_run.put(0, wait=False)
if wait:
while self.is_running():
if(time.time() - _start_time) > 20:
raise Warning("PCO camera did not stop " + \
"acquiring images for the last 20 seconds. " + \
"Giving up...")
time.sleep(0.1)
wt = self.get_wait_time()
et = time.time() - _start_time
if et < wt:
time.sleep(wt - et)
def get_exposure_time(self):
"""
Return the camera exposure time.
"""
return self.pv_cam_exposure.get()
def get_image(self, wait_for_new_frame=True):
"""
Run the camera and return an image data array.
Caution
-------
The timing of the image data collection is asynchronous!
I.e., the timing is not deterministically coupled to the call of
:meth:`get_image`!
We simply monitor here the EPICS waveform with the image data and will
wait to receive the next image that is published to the PV from the
continuously acquiring camera.
Parameters
----------
wait_for_new_frame : bool, optional
Wait for a new image data array to arrive after the call to
:meth:`get_image` has been issued. This ensures that the data
retrieved from the EPICS waveform with the image array data is not
outdated. (default = True)
Returns
-------
im : array-like
The image data
"""
self.start()
if wait_for_new_frame:
# check the timestamp of the image EPICS PV and wait for an update
# automatic auto-monitoring of the PV is disabled due to the large
# array size --> need to update the timestamp manually with
# get_timevars().
self.pv_im_array.get_timevars()
ts = self.pv_im_array.timestamp
changed = False
while not changed:
while not self.pv_im_array.timestamp > ts:
# wait for an update of the PV
time.sleep(0.05)
self.pv_im_array.get_timevars()
ts = self.pv_im_array.timestamp
changed = True
return self.get_image_data()
def get_image_data(self):
"""
Return the image data array from the camera.
Note that for the PCO cameras, the camera itself must be aquiring
images for this function to be able to grab a current frame from the
preview stream.
Caution
-------
The method will return the currently available array data from the
EPICS waveform record. If the camera is not running presently, this
data could potentially be very outdated!
The data is returned immediately, the method will not wait for new
data to arrive.
Returns
-------
im : array-like
The image data
See also
--------
get_image : start camera if it is not running and grab the image data
"""
image_size = [int(self.pv_im_height.get()),
int(self.pv_im_width.get())]
image = self.pv_im_array.get().astype('uint16')
image = np.asarray(image.reshape(image_size))
return image
def is_running(self):
"""
Check if the camera is running.
"""
return bool(self.pv_cam_run.get(as_string=False))
def get_wait_time(self):
"""
Return the minimum wait time in seconds required after any
configuration change (motor motion, camera start, etc.) to ensure a
valid image is returned.
"""
return (self.get_exposure_time() / 1000.0) + 0.1

73
Scripts/camera.py Normal file
View File

@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
"""
Generic camera class for alignment purposes.
Note
----
This class should eventually be merged with a corresponding class used for data
acquisition (tomodaq package). For now, implement the bare necessities to get
started on the alignment procedures.
"""
class Camera:
"""
Generic camera class.
All hardware-specific implementations of camera classes should be
subclassed from this generic class.
"""
# Direction of increasing row pixel index with respect to lab X-axis.
pixel_x_dir = 1
# Direction of increasing column pixel index with respect to lab Y-axis.
pixel_y_dir = -1
def __init__(self):
"""
Initialize the camera class
"""
pass
def start(self, wait=True):
"""
Start the camera acquisition process.
Parameters
----------
wait : bool, optional
Whether to wait for the camera to acquire images and the minimum
wait time. (default=True)
"""
raise NotImplementedError()
def get_exposure_time(self):
"""
Return the camera exposure time.
"""
raise NotImplementedError()
def get_image(self):
"""
Return the image array from the camera.
"""
raise NotImplementedError()
def get_wait_time(self):
"""
Return the minimum timeout after starting the camera to
"""
raise NotImplementedError()
def is_running(self):
"""
Check if the camera is running.
"""
raise NotImplementedError()

85
Scripts/image_analysis.py Normal file
View File

@@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
"""
Module defining various functions for basic image analysis.
Convenience functions for directly grabing a new image and running the fitting
on this image are provided.
"""
import matplotlib.pyplot as plt
import scipy.ndimage as ndi
import skimage.filters
def image_center_of_mass(image=None, binarize=True,
apply_threshold=True, threshold=None, median_filter=True,
plot=True, verbose=True):
"""
Calculate the center of mass (COM) of the intensity distribution in an image.
Parameters
----------
image : 2D array-like or None, optional
The image data for which the COM is to be calculated.
binarize : bool, optional
If set to True, the COM is calculated on the binarized image. The
binarization is calculated with the given `threshold`. (default=False)
apply_threshold : bool, optional
If set to True, the image is masked with a thresholded image first to
remove a constant background before calculating the COM. Setting has no
effect if `binarize` is True. (default = True)
threshold : float or None, optional
If a value is given, it will be used as the threshold value. If set to
None, the threshold is determined automatically based on the Li
method. (default = None)
median_filter : bool, optional
Apply a median filter before calculating the COM. This helps if the
images are noisy. (default = True)
plot : bool, optional
If True, the function will attempt to plot the result (depends on
whether matplotlib is available) (default = True)
verbose : bool, optional
If True, output some information along the way. (default = True)
"""
if image is None:
print('WARNING: An image is needed!')
return
if median_filter:
image = skimage.filters.median(image)
if binarize or apply_threshold:
if threshold is None:
thrsh = skimage.filters.threshold_li(image)
else:
thrsh = threshold
im_mask = image > thrsh
if verbose:
print(f"Image thresholded at {thrsh:.1f}")
if binarize:
image = im_mask
elif apply_threshold:
image = image * im_mask
com_y, com_x = ndi.center_of_mass(image)
if verbose:
print(f"Center of mass (h, v): {com_x:.2f}, {com_y:.2f}")
if plot:
plt.figure()
plt.imshow(image)
plt.axis('equal')
plt.axis('tight')
x_min, x_max = plt.xlim()
y_min, y_max = plt.ylim()
plt.vlines(com_x, y_min, y_max, linewidth=1, color='r',
label=f'horizontal COM: {com_x:.2f}')
plt.hlines(com_y, x_min, x_max, linewidth=1, color='g',
label=f'vertical COM: {com_y:.2f}')
plt.legend()
plt.show()
return com_x, com_y

View File

@@ -0,0 +1,232 @@
import time
import os
from time import sleep
import matplotlib.pyplot as plt
from pathlib import Path
import sys
#error chatchign and hard code catch stuff
def check_path(path_str):
try:
path = Path(path_str)
if not path.exists():
raise FileNotFoundError(f"Path does not exist: {path_str}")
print(f"Path exists: {path_str}")
except FileNotFoundError as e:
print(f"Error: {e}")
library_path = r"C:\Users\berti_r\Python_Projects\templates\motion_libs"
check_path(library_path)
sys.path.append(library_path)
import motionFunctionsLib as mfl
from PIL import Image
import numpy as np
from image_analysis import image_center_of_mass
from utils import get_datestr, get_timestr
import ad
workdir = \
os.path.expanduser(rf'C:\Users\berti_r\Python_Projects\metrology\metrology\Data{get_datestr()}_alignment_tests')
if not os.path.exists(workdir):
os.makedirs(workdir)
# connect to PLC using NetId and PLC Port (from AIK in the TwinCat)
plc = mfl.plc('5.17.17.136.1.1', 852)
plc.connect()
axis1 = mfl.axis(plc, 1)
#insert try catch later
def run_repeatability_series(
motor_pv_prefix, ntries=100, distance=None, direction=1,
meas_pos=None, settling_time=0.0, save_images=True, run_analysis=True):
#improv
if os.getenv("EPICS_CA_ADDR_LIST") is not None:
pass
else:
os.environ["EPICS_CA_ADDR_LIST"] = "129.129.181.64"
camera = ad.AD()
pixel_size = 1.1
savedir = os.path.join(workdir,
f'{get_timestr()}_repeatibility_{motor_pv_prefix}')
savefile = os.path.join(savedir,
f'repeatibility_{motor_pv_prefix}.dat')
os.makedirs(savedir)
camera.start()
#enable axis clean up later
axis1.setAcceleration(10000.0)
axis1.setDeceleration(20000.0)
axis1.setVelocity(-3)
axis1.disableAxis()
sleep(1)
axis1.enableAxis()
sleep(1)
for i in range(ntries):
#---------------------------------------------move------------------------------------------
axis1.moveRelativeAndWait(-1)
sleep(0.1)
axis1.moveRelativeAndWait(1)
sleep(1)
# mot.move(meas_pos - np.sign(direction) * distance, wait=True)
# time.sleep(0.1)
# start_pos_rbv = mot.get_position(readback=True)
start_pos_rbv = 4
# mot.move(meas_pos, wait=True)
# time.sleep(settling_time)
# meas_pos_rbv = mot.get_position(readback=True)
meas_pos_rbv = 5
#---------------------------------------------capture------------------------------------------
im = camera.get_image()
com_x, com_y = image_center_of_mass(im, plot=False, verbose=False)
data_str = " {:6d} {:18f} {:18f} {:8.3f} {:8.3f} {:14.3f}\n".format(
i, start_pos_rbv, meas_pos_rbv, com_x, com_y, time.time())
# data_str = " {:6d} {:8.3f} {:8.3f} {:14.3f}\n".format(
# i, com_x, com_y, time.time())
print(data_str, end='')
with open(savefile, 'a') as fh:
fh.write(data_str)
if save_images:
imobj = Image.fromarray(im)
imfile = os.path.join(savedir,
f'im_{i:05d}.tif')
imobj.save(imfile)
if run_analysis:
print("")
analyze_repeatability(savefile, pixel_size=pixel_size, units='um')
axis1.disableAxis()
def analyze_repeatability(input_file, pixel_size, units='um'):
"""
Analyze and plot the result of a repeatability scan.
Parameters
----------
input_file : str
The data file containing the scan data
pixel_size : float
The effective image pixel size in the same units as the motor positions
units : str, optional
The string to use for the units. (default = 'um')
Note
----
The positive motor directions and the positive pixel directions used in the
analysis may not be the same! E.g., in the Y-direction, pixels are counted
from the top towards the bottom, while positive motion direction is towards
the top.
"""
index, start_pos, meas_pos, com_x, com_y, ts = np.loadtxt(
input_file, unpack=True)
com_x = com_x * pixel_size
com_y = com_y * pixel_size
pos_dir = start_pos < meas_pos
neg_dir = start_pos > meas_pos
bidir = np.any(pos_dir) and np.any(neg_dir)
t = ts - ts[0]
def calc_repeatability(com_vals):
mean_val = np.mean(com_vals)
p2v_val = com_vals.max() - com_vals.min()
sigma_val = np.std(com_vals)
rms_val = np.sqrt(np.mean(np.square(com_vals - mean_val)))
return (mean_val, p2v_val, sigma_val, rms_val)
mean_x, p2v_x, sigma_x, rms_x = calc_repeatability(com_x)
mean_y, p2v_y, sigma_y, rms_y = calc_repeatability(com_y)
if bidir:
mean_x_pos, p2v_x_pos, sigma_x_pos, rms_x_pos = calc_repeatability(
com_x[pos_dir])
mean_x_neg, p2v_x_neg, sigma_x_neg, rms_x_neg = calc_repeatability(
com_x[neg_dir])
mean_y_pos, p2v_y_pos, sigma_y_pos, rms_y_pos = calc_repeatability(
com_y[pos_dir])
mean_y_neg, p2v_y_neg, sigma_y_neg, rms_y_neg = calc_repeatability(
com_y[neg_dir])
result_str = "Repeatability results\n"
result_str += "---------------------\n"
result_str += ("{:10s} {:>12s} {:>12s} {:>12s} {:>12s}\n".format(
'Direction', 'Peak2valley', 'RMS', '1-sima', '3-sigma'
))
result_str += ("{:10s} {:12f} {:12f} {:12f} {:12f}\n".format(
'X', p2v_x, rms_x, sigma_x, 3*sigma_x))
if bidir:
result_str += ("{:10s} {:12f} {:12f} {:12f} {:12f}\n".format(
'X-pos', p2v_x_pos, rms_x_pos, sigma_x_pos, 3*sigma_x_pos))
result_str += ("{:10s} {:12f} {:12f} {:12f} {:12f}\n".format(
'X-neg', p2v_x_neg, rms_x_neg, sigma_x_neg, 3*sigma_x_neg))
result_str += ("\n")
result_str += ("{:10s} {:12f} {:12f} {:12f} {:12f}\n".format(
'Y', p2v_y, rms_y, sigma_y, 3*sigma_y))
if bidir:
result_str += ("{:10s} {:12f} {:12f} {:12f} {:12f}\n".format(
'Y-pos', p2v_y_pos, rms_y_pos, sigma_y_pos, 3*sigma_y_pos))
result_str += ("{:10s} {:12f} {:12f} {:12f} {:12f}\n".format(
'Y-neg', p2v_y_neg, rms_y_neg, sigma_y_neg, 3*sigma_y_neg))
print(result_str)
result_file = os.path.splitext(input_file)[0] + "_results.dat"
with open(result_file, 'w') as fh:
fh.write(result_str)
print(f"Results saved in: {result_file:s}")
def plot_repeatability(
ax, index, com, mean, p2v, rms, pos_mask, neg_mask):
"""
Plot the repeatability graph
"""
ax.plot(t, (com - mean), ':k', linewidth=1)
ax.plot(t[pos_mask], (com - mean)[pos_mask], 'or',
label='meas. pos. (+)')
ax.plot(t[neg_mask], (com - mean)[neg_mask], 'ob',
label='meas. pos. (-)')
ax.hlines([com.max()-mean, com.min()-mean], t.min(), t.max(),
linestyle='-.', color='k',
label=f'p-2-v: {p2v:.3f} {units:s}', linewidth=1)
ax.hlines([rms/2.0, -rms/2.0], t.min(), t.max(),
linestyle=':', color='k',
label=f'RMS: {rms:.3f} {units:s}', linewidth=1)
ax.set_xlabel("Measurement time [s]")
ax.set_ylabel(f"Pos. fluctuations [{units:s}]")
ax.legend()
fig, (ax1, ax2) = plt.subplots(2,1,figsize=[6,8])
plot_repeatability(
ax1, index, com_x, mean_x, p2v_x, rms_x, pos_dir, neg_dir)
plot_repeatability(
ax2, index, com_y, mean_y, p2v_y, rms_y, pos_dir, neg_dir)
ax1.set_title("Repeatability in X-direction")
ax2.set_title("Repeatability in Y-direction")
fig.tight_layout()
plt.show()
plot_file = os.path.splitext(input_file)[0] + ".pdf"
plt.savefig(plot_file)
print(f"Plot saved in {plot_file:s}")
#run_repeatability_series(0,1)

328
Scripts/utils.py Normal file
View File

@@ -0,0 +1,328 @@
# -*- coding: utf-8 -*-
"""
Module defining utility functions for the alignment package.
"""
import datetime
import os
import sys
from epics import caget, Motor
sys.path.insert(0, '/sls/X02DA/applications/tomodaq')
"""
from tomodaq.positioner.rotations.aerotechA3200 import AerotechA3200
from tomodaq.positioner.rotations.aerotechAutomation1 import AerotechAutomation1"""
def get_timestr():
"""
Return the current timestamp as a string.
"""
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def get_datestr():
"""
Return the current date as a string.
"""
return datetime.datetime.now().strftime("%Y%m%d")
def get_beamline_config():
"""
Return a dictionary with the current beamline configuration.
Returns
-------
beamline_config : dict
The current beamline configuration in a dictionary.
"""
es = caget('X02DA-ES1-CAM1:ENDST_SEL', as_string=True)
cam_server = caget('X02DA-ES1-CAM1:SERV_SEL', as_string=True)
camera = caget('X02DA-ES1-CAM1:CAM_SEL', as_string=True)
microscope = caget('X02DA-ES1-MS:MS_SEL', as_string=False)
microscope_name = caget('X02DA-ES1-MS:MS_SEL', as_string=True)
magnification = caget('X02DA-ES1-MS:MAGNF')
pixel_size = caget('X02DA-ES1-CAM1:ACT_PIXL_SIZE')
scintillator = caget('X02DA-ES1-MS1:SCINTIL', as_string=True)
beam_mode = caget('X02DA-OP:BEAM_MODE', as_string=True)
energy = caget('X02DA-OP-ENE:ACTUAL')
conf = {
'end_station': es,
'camera_server': cam_server,
'camera': camera,
'microscope': microscope,
'microscope_name': microscope_name,
'magnification': magnification,
'pixel_size': pixel_size,
'scintillator': scintillator,
'beam_mode': beam_mode,
'beam_energy': energy,
}
return conf
def get_camera(bl_config):
"""
Returns an instance of the correct camera class according to the
current beamline configuration.
"""
if bl_config['camera'].startswith("PCO."):
from tomoalign.camera import pco
if bl_config['camera_server'] == "Server 1":
cam_prefix = "X02DA-CCDCAM1:"
elif bl_config['camera_server'] == "Server 2":
cam_prefix = "X02DA-CCDCAM2:"
else:
raise ValueError("Unknown camera server: " \
"{}".format(bl_config['camera_server']))
camera = pco.PCO(prefix=cam_prefix)
elif bl_config['camera'] == "Gigafrost 1":
from tomoalign.camera import gigafrost
camera = gigafrost.Gigafrost(layout_name='GF1')
elif bl_config['camera'] == "Gigafrost 2":
from tomoalign.camera import gigafrost
camera = gigafrost.Gigafrost(layout_name='GF2')
else:
raise NotImplementedError("Camera not implemented yet: " \
"{}".format(bl_config['camera']))
return camera
def get_microscope(bl_config):
"""
Returns an instance of the correct microscope class according to the
current beamline configuration.
"""
if bl_config['end_station'] == "ES1":
es_prefix = "ES1"
elif bl_config['end_station'] == "ES2":
es_prefix = "ES2"
else:
raise ValueError("Unknown endstation: {}".format(
bl_config['end_station']))
if bl_config['microscope'] == 0:
# This is the standard microscope
from tomoalign.microscope import microscope1
mic_prefix = "X02DA-{}-MS1:".format(es_prefix)
microscope = microscope1.Microscope1(prefix=mic_prefix)
elif bl_config['microscope'] == 1:
# This is the 1:1 WB microscope
from tomoalign.microscope import microscope2
mic_prefix = "X02DA-{}-MS2:".format(es_prefix)
microscope = microscope2.Microscope2(prefix=mic_prefix)
elif bl_config['microscope'] == 3:
# This is the 10x/20x WB microscope
from tomoalign.microscope import microscope4
mic_prefix = "X02DA-{}-MS4:".format(es_prefix)
microscope = microscope4.Microscope4(prefix=mic_prefix)
elif bl_config['microscope'] == 5:
# This is the 4x WB macroscope
from tomoalign.microscope import microscope5
# Currently, the macroscope is only supported on ES1
#mic_prefix = "X02DA-{}-MS5:".format(es_prefix)
mic_prefix = "X02DA-ES1-MS5:"
microscope = microscope5.Microscope5(prefix=mic_prefix)
elif bl_config['microscope'] == 6:
# This is the dual head WB microscope
from tomoalign.microscope import microscope6
# Currently, the macroscope is only supported on ES1
#mic_prefix = "X02DA-{}-MS6:".format(es_prefix)
mic_prefix = "X02DA-ES1-MS6:"
microscope = microscope6.Microscope6(prefix=mic_prefix)
elif bl_config['microscope'] == 7:
# This is the high-NA 10x WB microscope
from tomoalign.microscope import microscope7
# Currently, the macroscope is only supported on ES1
#mic_prefix = "X02DA-{}-MS7:".format(es_prefix)
mic_prefix = "X02DA-ES1-MS7:"
microscope = microscope7.Microscope7(prefix=mic_prefix)
else:
raise NotImplementedError("Microscope not implemented yet: " \
"{}".format(bl_config['microscope']))
return microscope
def get_sample_trx(bl_config):
"""
Returns an instance of the correct epics Motor for the sample TRX
translation according to the current beamline configuration.
"""
if bl_config['end_station'] == "ES1":
es_prefix = "ES1"
elif bl_config['end_station'] == "ES2":
es_prefix = "ES2"
else:
raise ValueError("Unknown endstation: {}".format(
bl_config['end_station']))
sample_trx = Motor('X02DA-{}-SMP1:TRX'.format(es_prefix))
return sample_trx
def get_sample_try(bl_config):
"""
Returns an instance of the correct epics Motor for the sample TRY
translation according to the current beamline configuration.
"""
if bl_config['end_station'] == "ES1":
es_prefix = "ES1"
elif bl_config['end_station'] == "ES2":
es_prefix = "ES2"
else:
raise ValueError("Unknown endstation: {}".format(
bl_config['end_station']))
sample_try = Motor('X02DA-{}-SMP1:TRY'.format(es_prefix))
return sample_try
def get_sample_trz(bl_config):
"""
Returns an instance of the correct epics Motor for the sample TRZ
translation according to the current beamline configuration.
Note
----
End station 1 does not have a TRZ stage. A warning is printed and None is
returned as the motor.
"""
if bl_config['end_station'] == "ES1":
# ES1 does not have a TRZ stage
print("WARNING: End station 1 does not have a TRZ sample motion!")
sample_trz = None
elif bl_config['end_station'] == "ES2":
sample_trz = Motor('X02DA-ES2-SMP1:TRZ')
else:
raise ValueError("Unknown endstation: {}".format(
bl_config['end_station']))
return sample_trz
def get_sample_trxx(bl_config):
"""
Returns an instance of the correct epics Motor for the sample TRXX
translation according to the current beamline configuration.
"""
if bl_config['end_station'] == "ES1":
es_prefix = "ES1"
elif bl_config['end_station'] == "ES2":
es_prefix = "ES2"
else:
raise ValueError("Unknown endstation: {}".format(
bl_config['end_station']))
sample_trxx = Motor('X02DA-{}-SMP1:TRXX'.format(es_prefix))
return sample_trxx
def get_sample_trzz(bl_config):
"""
Returns an instance of the correct epics Motor for the sample TRZZ
translation according to the current beamline configuration.
"""
if bl_config['end_station'] == "ES1":
es_prefix = "ES1"
elif bl_config['end_station'] == "ES2":
es_prefix = "ES2"
else:
raise ValueError("Unknown endstation: {}".format(
bl_config['end_station']))
sample_trzz = Motor('X02DA-{}-SMP1:TRZZ'.format(es_prefix))
return sample_trzz
def get_sample_translation_all(bl_config):
"""
Returns instances of the correct epics Motors for all 5 sample
traslations TRX, TRY, TRZ, TRXX, TRZZ according to the current
beamline configuration.
"""
sample_trx = get_sample_trx(bl_config)
sample_try = get_sample_try(bl_config)
sample_trz = get_sample_trz(bl_config)
sample_trxx = get_sample_trxx(bl_config)
sample_trzz = get_sample_trzz(bl_config)
return sample_trx, sample_try, sample_trz, sample_trxx, sample_trzz
def get_sample_roty(bl_config):
"""
This is not implemented yet as ROTY is not a normal motor record.
This should eventually return the correct rotation positioner instance
(--> tomodaq)
For the moment, the ROTY stages are handled separately.
"""
if bl_config['end_station'] == "ES1":
mot_roty = AerotechAutomation1()
elif bl_config['end_station'] == "ES2":
mot_roty = AerotechA3200()
else:
raise ValueError("Unknown endstation: {}".format(
bl_config['end_station']))
return mot_roty
def get_writer(bl_config):
"""
Returns an writer instance according to the current beamline configuration.
"""
if bl_config['camera'].startswith("PCO."):
userID=os.geteuid()
from pco_rclient import PcoWriter
cam_config = "/sls/X02DA/applications/tomcat-operation-scripts/daq/config_files/pco_writer.json"
if bl_config['camera_server'] == "Server 1":
cam_name = "pco1"
address="tcp://10.10.1.26:8080"
elif bl_config['camera_server'] == "Server 2":
cam_name = "pco2"
address="tcp://10.10.1.202:8080"
else:
raise ValueError("Unknown camera server: " \
"{}".format(bl_config['camera_server']))
writer=PcoWriter(connection_address=address, user_id=int(userID),
cam=cam_name, config_file=cam_config)
else:
raise NotImplementedError("The camera {} does not need a writer " \
"instance".format(bl_config['camera']))
return writer

View File

@@ -1,188 +1,11 @@
{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": [
"g# This is a sample Jupyter Notebook\n",
"\n",
"Below is an example of a code cell. \n",
"Put your cursor into the cell and press Shift+Enter to execute it and select the next one, or click 'Run Cell' button.\n",
"\n",
"Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings.\n",
"\n",
"To learn more about Jupyter Notebooks in PyCharm, see [help](https://www.jetbrains.com/help/pycharm/ipython-notebook-support.html).\n",
"For an overview of PyCharm, go to Help -> Learn IDE features or refer to [our documentation](https://www.jetbrains.com/help/pycharm/getting-started.html)."
],
"id": "8a77807f92f26ee"
},
{
"metadata": {
"ExecuteTime": {
"end_time": "2025-07-10T13:20:59.460075Z",
"start_time": "2025-07-10T13:20:59.332688Z"
}
},
"cell_type": "code",
"source": [
"import sys\n",
"\n",
"# Imports\n",
"\n",
"from IPython.display import display, clear_output\n",
"import pandas as pd\n",
"import matplotlib.pyplot as plt\n",
"import seaborn as sns\n",
"import time\n",
"import threading\n",
"import os\n",
"import glob\n",
"import numpy as np\n",
"from datetime import datetime, timedelta\n",
"\n",
"import ipywidgets as widgets\n",
"from IPython.display import display\n",
"from webcolors import names\n",
"from pathlib import Path\n",
"\n",
"def check_path(path_str):\n",
" try:\n",
" path = Path(path_str)\n",
" if not path.exists():\n",
" raise FileNotFoundError(f\"Path does not exist: {path_str}\")\n",
" print(f\"Path exists: {path_str}\")\n",
" except FileNotFoundError as e:\n",
" print(f\"Error: {e}\")\n",
"\n",
"meas_scripts_dir = r\"C:\\Users\\berti_r\\Python_Projects\\metrology\\metrology\"\n",
"check_path(meas_scripts_dir)\n",
"sys.path.append(meas_scripts_dir)\n",
"\n",
"#local includes\n",
"import metrology_functions as mf\n",
"\n",
"\n",
"# --------------------------------------------------GUI Objects-----------------------------------------------\n",
"start_button = widgets.Button(description=\"Start Measurement\")\n",
"\n",
"nr_of_cycles = widgets.BoundedIntText(\n",
" value=1,\n",
" min=1,\n",
" max=1000,\n",
" step=1,\n",
" description='Nr of cycles:',\n",
" disabled=False\n",
")\n",
"# --------------------------------------------------Output chanels-----------------------------------------------\n",
"output1 = widgets.Output()\n",
"output2 = widgets.Output()\n",
"\n",
"# --------------------------------------------------IRQs-----------------------------------------------\n",
"def start_measurement(b):\n",
" with output1:\n",
" clear_output()\n",
" print(f\"Measurement started with {global_nr_of_cycles} cycles\")\n",
" mf.run_repeatability_series(0,global_nr_of_cycles)\n",
"\n",
"# Callback function to update the global variable\n",
"def set_nr_of_cycles(change):\n",
" global global_nr_of_cycles\n",
" global_nr_of_cycles = change['new']\n",
" with output2:\n",
" clear_output()\n",
" print(f'Nr of cycles: {global_nr_of_cycles}')\n",
"\n",
"\n",
"# --------------------------------------------------Unmask IRQs-----------------------------------------------\n",
"nr_of_cycles.observe(set_nr_of_cycles, names='value')\n",
"start_button.on_click(start_measurement)\n",
"\n",
"# --------------------------------------------------Display-----------------------------------------------\n",
"\n",
"display(nr_of_cycles, output2)\n",
"display(start_button, output1)\n"
],
"id": "fbc121e30a2defb3",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Path exists: C:\\Users\\berti_r\\Python_Projects\\metrology\\metrology\n",
"Constructor for PLC\n",
"Connect to PLC\n",
"is_open()=True\n",
"get_local_address()=None\n",
"read_device_info()=('Plc30 App', <pyads.structs.AdsVersion object at 0x00000197BF61C830>)\n",
"GVL_APP.nAXIS_NUM=1\n",
"Constructor for axis\n"
]
},
{
"data": {
"text/plain": [
"BoundedIntText(value=1, description='Nr of cycles:', max=1000, min=1)"
],
"application/vnd.jupyter.widget-view+json": {
"version_major": 2,
"version_minor": 0,
"model_id": "2afeeb32ecc846d3aeeab5193d357ca0"
}
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"Output()"
],
"application/vnd.jupyter.widget-view+json": {
"version_major": 2,
"version_minor": 0,
"model_id": "2291ed5151aa4d10aa4f780784e61869"
}
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"Button(description='Start Measurement', style=ButtonStyle())"
],
"application/vnd.jupyter.widget-view+json": {
"version_major": 2,
"version_minor": 0,
"model_id": "28a931d72e9644d88ddbbd2e0c91e825"
}
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"Output()"
],
"application/vnd.jupyter.widget-view+json": {
"version_major": 2,
"version_minor": 0,
"model_id": "68db2138d0234caf8ee3065f00de3734"
}
},
"metadata": {},
"output_type": "display_data"
}
],
"execution_count": 1
},
{
"metadata": {},
"cell_type": "markdown",
"source": [
"# System description and Goals\n",
"![assambly](Images_doku/Overview.png)\n",
"![assambly](../Images_doku/Overview.png)\n",
"\n",
"## Performance criteria\n",
"### Hard\n",
@@ -206,6 +29,15 @@
"description of movment patterns and req. of expected use in beamline\n",
"\n",
"## Posible enviromental impacts and measurement methods\n",
"- Vibrations\n",
"- Temperatur changes\n",
"- Backlash\n",
"- physical shape of screwdrive\n",
"- physical shape of rail\n",
"- Motor break induced force\n",
"- Motor properties\n",
"- Humidity\n",
"- enc resolution\n",
"\n"
],
"id": "ac98fcd46a8e41f9",
@@ -218,6 +50,203 @@
}
}
},
{
"metadata": {
"ExecuteTime": {
"end_time": "2025-07-14T15:22:58.513182Z",
"start_time": "2025-07-14T15:22:58.266333Z"
}
},
"cell_type": "code",
"source": [
"import sys\n",
"\n",
"# Imports\n",
"\n",
"from IPython.display import display, clear_output\n",
"import pandas as pd\n",
"import matplotlib.pyplot as plt\n",
"import seaborn as sns\n",
"import time\n",
"import threading\n",
"import os\n",
"import json\n",
"import glob\n",
"import numpy as np\n",
"from datetime import datetime, timedelta\n",
"\n",
"import ipywidgets as widgets\n",
"from IPython.display import display\n",
"from pywin.debugger import close\n",
"from webcolors import names\n",
"from pathlib import Path\n",
"#TODO: Move script from frederica to scripts\n",
"def check_path(path_str):\n",
" try:\n",
" path = Path(path_str)\n",
" if not path.exists():\n",
" raise FileNotFoundError(f\"Path does not exist: {path_str}\")\n",
" print(f\"Path exists: {path_str}\")\n",
" except FileNotFoundError as e:\n",
" print(f\"Error: {e}\")\n",
"\n",
"meas_scripts_dir = r\"C:\\Users\\berti_r\\Python_Projects\\metrology\\metrology\"\n",
"meas_scripts_dir_local = r\"C:\\Users\\berti_r\\Python_Projects\\StagePerformaceDocu\\Scripts\"\n",
"config_path = r\"C:\\Users\\berti_r\\Python_Projects\\StagePerformaceDocu\\Config\\config.json\"\n",
"\n",
"#check_path(meas_scripts_dir)\n",
"check_path(meas_scripts_dir_local)\n",
"check_path(config_path)\n",
"sys.path.append(meas_scripts_dir_local)\n",
"#sys.path.append(meas_scripts_dir)\n",
"#TODO: mirror struct from jason\n",
"\n",
"#local includes\n",
"import metrology_functions as mf\n",
"\n",
"# Load config from JSON file\n",
"def load_config():\n",
" with open(config_path, 'r') as f:\n",
" return json.load(f)\n",
"\n",
"# Save updated config to JSON file\n",
"def save_config(updated_config):\n",
" with open(config_path, 'w') as f:\n",
" json.dump(updated_config, f, indent=4)\n",
"\n",
"# Get number of cycles from config\n",
"def init_nr_of_cycles():\n",
" config = load_config()\n",
" return config.get(\"Number_of_cycles\", 1)\n",
"\n",
"\n",
"\n",
"\n",
"# --------------------------------------------------GUI Objects-----------------------------------------------\n",
"start_button = widgets.Button(description=\"Start Measurement\")\n",
"\n",
"nr_of_cycles = widgets.BoundedIntText(\n",
" value=init_nr_of_cycles(),\n",
" min=1,\n",
" max=1000,\n",
" step=1,\n",
" description='Nr of cycles:',\n",
" disabled=False\n",
")\n",
"# --------------------------------------------------Output chanels-----------------------------------------------\n",
"output1 = widgets.Output()\n",
"output2 = widgets.Output()\n",
"\n",
"# --------------------------------------------------IRQs-----------------------------------------------\n",
"def start_measurement(b):\n",
" with output1:\n",
" clear_output()\n",
" local_nr_of_cycles = init_nr_of_cycles()\n",
" print(f\"Measurement started with {local_nr_of_cycles} cycles\")\n",
" mf.run_repeatability_series(0,local_nr_of_cycles) # TODO: <----- real measurement start (SFM with semaphore and queue?)\n",
" #Todo: Start temperatur Logger\n",
"\n",
"\n",
"\n",
"# Set number of cycles and save to config\n",
"def set_nr_of_cycles(change):\n",
" new_cycles = change['new']\n",
" config = load_config()\n",
" config['Number_of_cycles'] = new_cycles\n",
" save_config(config)\n",
"\n",
" with output2:\n",
" clear_output()\n",
" print(f'Number of cycles set to: {new_cycles}')\n",
"\n",
"\n",
"# --------------------------------------------------Unmask IRQs-----------------------------------------------\n",
"nr_of_cycles.observe(set_nr_of_cycles, names='value')\n",
"start_button.on_click(start_measurement)\n",
"\n",
"# --------------------------------------------------Display-----------------------------------------------\n",
"\n",
"display(nr_of_cycles, output2)\n",
"display(start_button, output1)\n"
],
"id": "fbc121e30a2defb3",
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Path exists: C:\\Users\\berti_r\\Python_Projects\\metrology\\metrology\n",
"Path exists: C:\\Users\\berti_r\\Python_Projects\\StagePerformaceDocu\\Scripts\n",
"Path exists: C:\\Users\\berti_r\\Python_Projects\\StagePerformaceDocu\\Config\\config.json\n",
"Path exists: C:\\Users\\berti_r\\Python_Projects\\templates\\motion_libs\n",
"Constructor for PLC\n",
"Connect to PLC\n",
"is_open()=True\n",
"get_local_address()=None\n",
"read_device_info()=('Plc30 App', <pyads.structs.AdsVersion object at 0x000002A3CAE58C20>)\n",
"GVL_APP.nAXIS_NUM=3\n",
"Constructor for axis\n"
]
},
{
"data": {
"text/plain": [
"BoundedIntText(value=3, description='Nr of cycles:', max=1000, min=1)"
],
"application/vnd.jupyter.widget-view+json": {
"version_major": 2,
"version_minor": 0,
"model_id": "1e72fea404844af69b9884b31d9fe2d1"
}
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"Output()"
],
"application/vnd.jupyter.widget-view+json": {
"version_major": 2,
"version_minor": 0,
"model_id": "7459f5fd35314ce99fce8129bda3e8e9"
}
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"Button(description='Start Measurement', style=ButtonStyle())"
],
"application/vnd.jupyter.widget-view+json": {
"version_major": 2,
"version_minor": 0,
"model_id": "40b270d1d2814cfe8d6d876a78591c0f"
}
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"Output()"
],
"application/vnd.jupyter.widget-view+json": {
"version_major": 2,
"version_minor": 0,
"model_id": "44db6c1f5acf4cb4bf04a8d3fabb1c34"
}
},
"metadata": {},
"output_type": "display_data"
}
],
"execution_count": 1
},
{
"metadata": {},
"cell_type": "code",
@@ -299,6 +328,18 @@
"id": "2d155545665095ec",
"outputs": [],
"execution_count": null
},
{
"metadata": {},
"cell_type": "code",
"source": [
"#Data loger thread\n",
"import paho-mqtt.client as mqtt\n",
"import"
],
"id": "9813d493bd439789",
"outputs": [],
"execution_count": null
}
],
"metadata": {