"""Image-processing script that attaches an EPICS channel value to each frame.

The module keeps one ``epics.PV`` object per (thread, channel name) pair so
that each processing thread talks to Channel Access through a PV it created
itself.
"""
from logging import getLogger
import json
import os
import signal
import sys
import threading

import epics

_logger = getLogger(__name__)

CHANNEL_NAME = "SARFE10-PSSS059:SPECTRUM_CENTER"

# Module-wide PV set by initialize(); not used by process_image().
pv = None
initialized = False

# Value reported by process_image() when the channel cannot be read.
_DISCONNECTED_VALUE = -100.0


def create_pv(name, **args):
    """Create an ``epics.PV`` for *name*, setting up a CA context if needed.

    If the calling thread has no current Channel Access context, one is
    created first: libca is initialized when no initial context exists yet,
    otherwise a fresh context is created for this thread.  Failures while
    setting up the context are logged and do not prevent the PV from being
    constructed (best effort).

    Args:
        name: Channel name passed to ``epics.PV``.
        **args: Extra keyword arguments forwarded to ``epics.PV``.

    Returns:
        The newly constructed ``epics.PV`` (not necessarily connected yet).
    """
    if epics.ca.current_context() is None:
        try:
            if epics.ca.initial_context is None:
                _logger.info(
                    "Creating initial EPICS context for pid:%s thread: %s",
                    os.getpid(),
                    threading.get_ident(),
                )
                epics.ca.initialize_libca()
            else:
                # TODO: using epics.ca.use_initial_context() generates a
                # segmentation fault, so a new context is created instead.
                epics.ca.create_context()
        except Exception as ex:
            # Best effort: report the problem and still try to build the PV.
            _logger.warning("Error creating PV context: %s", ex)
    return epics.PV(name, **args)


def initialize(parameters):
    """Create the module-wide PV for ``CHANNEL_NAME`` and wait for it.

    Args:
        parameters: Unused; kept for interface compatibility.
    """
    global pv
    _logger.info("Init EPICS")
    epics.ca.clear_cache()
    pv = create_pv(CHANNEL_NAME)
    pv.wait_for_connection()


# Maps thread ident -> {channel name -> epics.PV}; created lazily.
_thread_pvs = None
_thread_pvs_lock = threading.RLock()


def create_thread_pv(pv_name, wait=True):
    """Return the calling thread's PV for *pv_name*, creating it on first use.

    Args:
        pv_name: Channel name.
        wait: When true, wait for a newly created PV to connect before
            returning.  A PV already present in the cache is returned as is.

    Returns:
        The ``epics.PV`` cached for the current thread and *pv_name*.
    """
    global _thread_pvs
    thread_id = threading.get_ident()
    with _thread_pvs_lock:
        if _thread_pvs is None:
            _thread_pvs = {}
            epics.ca.clear_cache()
        per_thread = _thread_pvs.setdefault(thread_id, {})
        if pv_name in per_thread:
            return per_thread[pv_name]
        thread_pv = create_pv(pv_name)
        per_thread[pv_name] = thread_pv
    # Wait outside the lock so a slow connection does not block other threads.
    if wait:
        thread_pv.wait_for_connection()
    return thread_pv


def create_thread_pvs(pv_names):
    """Return the calling thread's PVs for all *pv_names*.

    All PVs are created first and only then waited on, so the connections
    can be established concurrently.

    Args:
        pv_names: Iterable of channel names.

    Returns:
        List of ``epics.PV`` objects, in the order of *pv_names*.
    """
    thread_pvs = [create_thread_pv(name, False) for name in pv_names]
    for thread_pv in thread_pvs:
        thread_pv.wait_for_connection()
    return thread_pvs


def get_thread_pv(name):
    """Return the calling thread's cached PV for *name*, or ``None``.

    Unlike ``create_thread_pv`` this never creates a PV.
    """
    with _thread_pvs_lock:
        if _thread_pvs is None:
            return None
        return _thread_pvs.get(threading.get_ident(), {}).get(name)


def process_image(image, pulse_id, timestamp, x_axis, y_axis, parameters, bsdata=None):
    """Build the result dictionary for one frame, including the channel value.

    Args:
        image: Frame data; ``image.shape[0]`` / ``image.shape[1]`` are
            reported as height / width.
        pulse_id: Unused.
        timestamp: Copied into the result.
        x_axis: Copied into the result.
        y_axis: Copied into the result.
        parameters: Serialized with ``json.dumps`` into
            ``"processing_parameters"``.
        bsdata: Unused.

    Returns:
        dict with keys ``x_axis``, ``y_axis``, ``image``, ``width``,
        ``height``, ``timestamp``, ``value`` and ``processing_parameters``.
        ``value`` is the channel value as a float, or ``-100.0`` when the
        channel is not connected or has no value.
    """
    # NOTE(review): this sends SIGTERM to the whole process group on every
    # call - presumably a deliberate crash/termination test; confirm before
    # using this script outside of such a test.
    os.killpg(os.getpgid(os.getpid()), signal.SIGTERM)

    [channel] = create_thread_pvs([CHANNEL_NAME])

    return_value = {
        "x_axis": x_axis,
        "y_axis": y_axis,
        "image": image,
        "width": image.shape[1],
        "height": image.shape[0],
        "timestamp": timestamp,
    }

    # A connected PV can still report None; treat that like a disconnected
    # channel instead of failing in float().
    value = channel.value if channel and channel.connected else None
    return_value["value"] = float(value) if value is not None else _DISCONNECTED_VALUE

    return_value["processing_parameters"] = json.dumps(parameters)
    return return_value