185 lines
5.7 KiB
Python
185 lines
5.7 KiB
Python
import time
|
|
from collections import defaultdict, deque
|
|
from functools import partial
|
|
from logging import getLogger
|
|
from threading import Thread
|
|
|
|
import epics
|
|
import numpy as np
|
|
from cam_server.utils import create_thread_pvs, epics_lock
|
|
|
|
_logger = getLogger(__name__)
|
|
|
|
initialized = False
|
|
intensity_pv, xpos_pv, ypos_pv = None, None, None
|
|
sent_pid = -1
|
|
|
|
dif_vals = defaultdict(int)
|
|
|
|
# this is to avoid exceptions in the 'process' function upon appending to buffers if not all of
|
|
# them were created in the 'initialize' function
|
|
buffers = defaultdict(partial(deque, maxlen=1))
|
|
|
|
|
|
def initialize(params):
|
|
global initialized
|
|
|
|
epics.ca.clear_cache()
|
|
|
|
for label in ("xpos_all", "ypos_all", "xpos_odd", "ypos_odd", "xpos_evn", "ypos_evn"):
|
|
x_pvname = params[f"{label}_x_pvname"]
|
|
y_pvname = params[f"{label}_y_pvname"]
|
|
m_pvname = params[f"{label}_m_pvname"]
|
|
w_pvname = params[f"{label}_w_pvname"]
|
|
|
|
if x_pvname and y_pvname and m_pvname and w_pvname:
|
|
buffer = deque(maxlen=params["queue_length"])
|
|
buffers[label] = buffer
|
|
|
|
thread = Thread(target=update_PVs, args=(label, buffer, x_pvname, y_pvname, m_pvname, w_pvname))
|
|
thread.start()
|
|
|
|
# diff PVs
|
|
xpos_dif_m_pvname = params["xpos_dif_m_pvname"]
|
|
xpos_dif_w_pvname = params["xpos_dif_w_pvname"]
|
|
ypos_dif_m_pvname = params["ypos_dif_m_pvname"]
|
|
ypos_dif_w_pvname = params["ypos_dif_w_pvname"]
|
|
|
|
thread = Thread(target=update_dif_PVs, args=(xpos_dif_m_pvname, xpos_dif_w_pvname, ypos_dif_m_pvname, ypos_dif_w_pvname))
|
|
thread.start()
|
|
|
|
initialized = True
|
|
|
|
|
|
def update_PVs(label, buffer, x_pvname, y_pvname, m_pvname, w_pvname):
|
|
x_pv, y_pv, m_pv, w_pv = create_thread_pvs([x_pvname, y_pvname, m_pvname, w_pvname])
|
|
|
|
x_pv.wait_for_connection()
|
|
y_pv.wait_for_connection()
|
|
m_pv.wait_for_connection()
|
|
w_pv.wait_for_connection()
|
|
if not (x_pv.connected and y_pv.connected and m_pv.connected and w_pv.connected):
|
|
raise (f"Cannot connect to {label} PVs.")
|
|
|
|
x_pv.put(np.arange(buffer.maxlen))
|
|
y_pv.put(np.zeros(buffer.maxlen))
|
|
m_pv.put(0)
|
|
w_pv.put(0)
|
|
|
|
while True:
|
|
time.sleep(3)
|
|
if len(buffer) != buffer.maxlen:
|
|
continue
|
|
|
|
_buffer = np.array(buffer)
|
|
_buffer = _buffer[~np.isnan(_buffer)]
|
|
|
|
# histogram
|
|
y_hist, x_hist = np.histogram(_buffer, bins=101)
|
|
x_hist = (x_hist[1:] + x_hist[:-1]) / 2
|
|
|
|
x_pv.put(x_hist)
|
|
y_pv.put(y_hist)
|
|
|
|
# stats
|
|
mean_val = np.mean(_buffer)
|
|
std_val = np.std(_buffer)
|
|
|
|
m_pv.put(mean_val)
|
|
w_pv.put(std_val)
|
|
|
|
dif_vals[f"{label}_m"] = mean_val
|
|
dif_vals[f"{label}_w"] = std_val
|
|
|
|
|
|
def update_dif_PVs(xpos_dif_m_pvname, xpos_dif_w_pvname, ypos_dif_m_pvname, ypos_dif_w_pvname):
|
|
xpos_dif_m_pv, xpos_dif_w_pv, ypos_dif_m_pv, ypos_dif_w_pv = create_thread_pvs(
|
|
[xpos_dif_m_pvname, xpos_dif_w_pvname, ypos_dif_m_pvname, ypos_dif_w_pvname]
|
|
)
|
|
|
|
xpos_dif_m_pv.wait_for_connection()
|
|
xpos_dif_w_pv.wait_for_connection()
|
|
ypos_dif_m_pv.wait_for_connection()
|
|
ypos_dif_w_pv.wait_for_connection()
|
|
if not (xpos_dif_m_pv.connected and xpos_dif_w_pv.connected and ypos_dif_m_pv.connected and ypos_dif_w_pv.connected):
|
|
raise (f"Cannot connect to dif PVs.")
|
|
|
|
while True:
|
|
time.sleep(3)
|
|
xpos_dif_m_pv.put(dif_vals["xpos_odd_m"] - dif_vals["xpos_evn_m"])
|
|
xpos_dif_w_pv.put(dif_vals["xpos_odd_w"] - dif_vals["xpos_evn_w"])
|
|
ypos_dif_m_pv.put(dif_vals["ypos_odd_m"] - dif_vals["ypos_evn_m"])
|
|
ypos_dif_w_pv.put(dif_vals["ypos_odd_w"] - dif_vals["ypos_evn_w"])
|
|
|
|
|
|
def process(data, pulse_id, timestamp, params):
|
|
try:
|
|
global sent_pid
|
|
global intensity_pv, xpos_pv, ypos_pv
|
|
# Initialize on first run
|
|
if not initialized:
|
|
initialize(params)
|
|
|
|
# Read stream inputs
|
|
up = data[params["up"]] * params["up_calib"]
|
|
down = data[params["down"]] * params["down_calib"]
|
|
right = data[params["right"]] * params["right_calib"]
|
|
left = data[params["left"]] * params["left_calib"]
|
|
|
|
# Calculations
|
|
try:
|
|
intensity = down + up + left + right
|
|
intensity_uJ = intensity * params["uJ_calib"]
|
|
except:
|
|
intensity = np.nan
|
|
intensity_uJ = np.nan
|
|
|
|
if intensity > params["threshold"]:
|
|
xpos = ((right - left) / (right + left)) * params["horiz_calib"]
|
|
ypos = ((up - down) / (up + down)) * params["vert_calib"]
|
|
else:
|
|
xpos = 1.5
|
|
ypos = 1.5
|
|
|
|
# Update buffers
|
|
buffers["xpos_all"].append(xpos)
|
|
buffers["ypos_all"].append(ypos)
|
|
if pulse_id % 2:
|
|
buffers["xpos_odd"].append(xpos)
|
|
buffers["ypos_odd"].append(ypos)
|
|
else:
|
|
buffers["xpos_evn"].append(xpos)
|
|
buffers["ypos_evn"].append(ypos)
|
|
|
|
device, _ = params["up"].split(":", 1)
|
|
intensity_ch_name = device + ":INTENSITY"
|
|
xpos_ch_name = device + ":XPOS"
|
|
ypos_ch_name = device + ":YPOS"
|
|
|
|
# Set bs outputs
|
|
output = {}
|
|
output[intensity_ch_name] = intensity
|
|
output[f"{device}:INTENSITY_UJ"] = intensity_uJ
|
|
output[xpos_ch_name] = xpos
|
|
output[ypos_ch_name] = ypos
|
|
|
|
intensity_pv, xpos_pv, ypos_pv = create_thread_pvs([intensity_ch_name, xpos_ch_name, ypos_ch_name])
|
|
if epics_lock.acquire(False):
|
|
try:
|
|
if pulse_id > sent_pid:
|
|
sent_pid = pulse_id
|
|
if intensity_pv and intensity_pv.connected:
|
|
intensity_pv.put(output[intensity_ch_name])
|
|
|
|
if xpos_pv and xpos_pv.connected:
|
|
xpos_pv.put(output[xpos_ch_name])
|
|
|
|
if ypos_pv and ypos_pv.connected:
|
|
ypos_pv.put(output[ypos_ch_name])
|
|
|
|
finally:
|
|
epics_lock.release()
|
|
|
|
return output
|
|
except Exception as e:
|
|
_logger.exception(e) |