"""Pipeline script: run the standard image processing and publish fit results to EPICS.

For every processed frame the fitted position, standard deviation, amplitude
and the derived integral of both projections are written to the PVs
``<camera_name>:POS-X/Y``, ``:SIG-X/Y``, ``:AMPL-X/Y`` and ``:INTEG-X/Y``.
"""
import math
from logging import getLogger

from cam_server.pipeline.data_processing import processor
from cam_server.utils import create_thread_pvs, epics_lock

_logger = getLogger(__name__)

# Relevant parameters
#   max_frame_rate      max processing frame rate (EPICS)
#   max_output_rate     max displaying frame rate (ScreenPanel)
#   single_optics       default: False
#   PSF                 default: 2.91 um
#   throw_epics_errors  default: True. Original note: in case of permanent pipelines
#                       can recreate EPICS channels after failure, but does stream
# NOTE(review): max_frame_rate and max_output_rate are not read in this file;
# presumably they are consumed elsewhere in the pipeline framework -- confirm.

# Output PVs (re-assigned on every frame from create_thread_pvs)
x_pos_pv, y_pos_pv, x_sig_pv, y_sig_pv = None, None, None, None
x_amp_pv, y_amp_pv, x_int_pv, y_int_pv = None, None, None, None

# Channel names (filled in by initialize())
x_pos_name, y_pos_name, x_sig_name, y_sig_name = None, None, None, None
x_amp_name, y_amp_name, x_int_name, y_int_name = None, None, None, None

sent_pid = -1        # last pulse id accepted for an EPICS update
initialized = False  # True once the channel names have been built
run_once = False     # True after the first accepted frame; silences "PV not connected"
GAUSS_INT_FACTOR = math.sqrt(2 * math.pi)  # integral = amplitude * sigma * sqrt(2*pi)


def initialize(parameters):
    """Build the EPICS channel names from ``parameters["camera_name"]``.

    Sets the module-level ``*_name`` globals and marks the module as
    initialized. Indexing ``parameters`` fails (``KeyError`` for a dict) when
    ``"camera_name"`` is missing.
    """
    global initialized
    global x_pos_name, y_pos_name, x_sig_name, y_sig_name
    global x_amp_name, y_amp_name, x_int_name, y_int_name

    prefix = parameters["camera_name"]
    x_pos_name = prefix + ":POS-X"
    y_pos_name = prefix + ":POS-Y"
    x_sig_name = prefix + ":SIG-X"
    y_sig_name = prefix + ":SIG-Y"
    x_amp_name = prefix + ":AMPL-X"
    y_amp_name = prefix + ":AMPL-Y"
    x_int_name = prefix + ":INTEG-X"
    y_int_name = prefix + ":INTEG-Y"
    initialized = True


def _put_values(pv_values):
    """Write each ``(pv, value)`` pair to EPICS while holding ``epics_lock``.

    Returns the exception raised while writing, or ``None`` when nothing
    failed. When the lock cannot be taken the update is skipped with a warning
    and ``None`` is returned.
    """
    # acquire(False): give up immediately instead of waiting for the lock
    # (assuming epics_lock follows the threading.Lock API -- TODO confirm).
    if not epics_lock.acquire(False):
        _logger.warning("Cannot aquire EPICS lock")
        return None
    try:
        for pv, value in pv_values:
            if pv and pv.connected:
                pv.put(value)
            elif not run_once:
                # Only reported until the first frame has been accepted.
                _logger.warning("PV not connected: %s", pv)
    except Exception as exc:
        # Handed back to the caller so the lock is released before any re-raise.
        return exc
    finally:
        epics_lock.release()
    return None


def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata):
    """Process one frame and publish the fit results to EPICS.

    The frame is first passed unchanged to ``processor.process_image``. From
    its result the fit mean, standard deviation and amplitude of both
    projections are read (the ``gr_``-prefixed entries when
    ``parameters["image_good_region"]`` is truthy) and the integral is computed
    as ``amplitude * sigma * sqrt(2*pi)``. With ``single_optics`` set, the
    standard deviations are replaced by ``hypot(sigma, PSF)``, both in the
    returned result and in the values sent to EPICS; the integral is computed
    from the uncorrected sigma.

    Parameters read from ``parameters``:
        camera_name: prefix of the EPICS channel names (first call only).
        image_good_region: required; selects the ``gr_*`` result keys.
        single_optics: default False.
        PSF: default 2.91 (um).
        throw_epics_errors: default True.
        detached_instance: default False; when truthy nothing is written to EPICS.

    Returns:
        The result of ``processor.process_image`` when the instance is not
        detached and ``pulse_id`` is greater than the last accepted one;
        otherwise ``None``.

    Raises:
        Exception: a PV write failed and ``throw_epics_errors`` is truthy. The
            original error is attached as ``__cause__``.
    """
    global sent_pid, run_once
    global x_pos_pv, y_pos_pv, x_sig_pv, y_sig_pv
    global x_amp_pv, y_amp_pv, x_int_pv, y_int_pv

    if not initialized:
        initialize(parameters)  # also sets the module-level `initialized` flag

    throw_epics_errors = parameters.get("throw_epics_errors", True)
    single_optics = parameters.get("single_optics", False)
    PSF = parameters.get("PSF", 2.91)  # um
    good_region = parameters["image_good_region"]
    detached = parameters.get("detached_instance", False)

    ret = processor.process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata)

    # Good-region results are stored under the same names with a "gr_" prefix.
    key_prefix = "gr_" if good_region else ""
    x_pos_key, y_pos_key = key_prefix + "x_fit_mean", key_prefix + "y_fit_mean"
    x_sig_key = key_prefix + "x_fit_standard_deviation"
    y_sig_key = key_prefix + "y_fit_standard_deviation"
    x_amp_key, y_amp_key = key_prefix + "x_fit_amplitude", key_prefix + "y_fit_amplitude"

    x_pos, y_pos = ret[x_pos_key], ret[y_pos_key]
    x_sig, y_sig = ret[x_sig_key], ret[y_sig_key]
    x_amp, y_amp = ret[x_amp_key], ret[y_amp_key]
    x_int, y_int = x_amp * x_sig * GAUSS_INT_FACTOR, y_amp * y_sig * GAUSS_INT_FACTOR

    if single_optics:
        # Combine sigma and PSF in quadrature; the corrected value replaces the
        # fitted one in the returned result as well.
        x_sig, y_sig = math.hypot(x_sig, PSF), math.hypot(y_sig, PSF)
        ret[x_sig_key], ret[y_sig_key] = x_sig, y_sig

    # Requested on every frame, detached or not, exactly as before.
    x_pos_pv, y_pos_pv, x_sig_pv, y_sig_pv, x_amp_pv, y_amp_pv, x_int_pv, y_int_pv = create_thread_pvs(
        [x_pos_name, y_pos_name, x_sig_name, y_sig_name, x_amp_name, y_amp_name, x_int_name, y_int_name])

    if not detached:  # Only write to EPICS in the "main" instance
        if pulse_id > sent_pid:
            sent_pid = pulse_id
            failure = _put_values([
                (x_pos_pv, x_pos), (y_pos_pv, y_pos),
                (x_sig_pv, x_sig), (y_sig_pv, y_sig),
                (x_amp_pv, x_amp), (y_amp_pv, y_amp),
                (x_int_pv, x_int), (y_int_pv, y_int),
            ])
            if failure is not None:
                epics_error = "Error writing PVs: %s" % (str(failure),)
                _logger.warning(epics_error)
                if throw_epics_errors:
                    # Same exception type and message as before; chaining keeps
                    # the original traceback available for debugging.
                    raise Exception(epics_error) from failure
            run_once = True
            return ret
        else:
            _logger.warning("Invalid PID: %s last:%s", pulse_id, sent_pid)
            sent_pid = 0  # It should be single threaded, assumes the camera instance restarted

    # NOTE(review): as in the original control flow, detached instances and
    # frames with an out-of-order pulse id fall through here and return None.
    # Confirm that a detached instance is really meant to return no result.
    return None