#!/usr/bin/env python # *-----------------------------------------------------------------------* # | | # | Copyright (c) 2022 by Paul Scherrer Institute (http://www.psi.ch) | # | thanks to Xiaoqiang's great pcaspy channel access server | # | Author Thierry Zamofing (thierry.zamofing@psi.ch) | # *-----------------------------------------------------------------------* # https://pcaspy.readthedocs.io/en/latest/api.html #to be able to import manager, #modify line in /home/zamofing_t/.local/lib/python3.8/site-packages/pcaspy/__init__.py # -> from .driver import Driver, SimpleServer, PVInfo, SimplePV, manager import logging _log = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG,format='%(name)s:%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ') from pcaspy import SimpleServer, Driver, cas, PVInfo, SimplePV, manager import random, os import numpy as np os.environ['EPICS_CA_ADDR_LIST']='localhost' prefix = 'MTEST:' pvdb = { 'RAND' : { 'prec' : 3, # 'scan' : 1, # 'count': 10, }, } pvDef={ 'prec' : 3} pvSmaract={ 'DRIVE' : pvDef, 'FRM_BACK' : pvDef, 'FRM_FORW' : pvDef, 'GET_HOMED' : pvDef, 'HLM' : pvDef, 'LLM' : pvDef, 'MOTRBV' : pvDef, 'NAME' : pvDef, 'STATUS' : pvDef, 'TWF' : pvDef, 'TWR' : pvDef, 'TWV' : pvDef, } pvMotor={ '': {}, '.VAL' : {}, '.RBV' : {}, '.RTYP' : {'type':'string', 'value':'motor' }, '.JVEL' : {}, '.HLS' : {}, '.LLS' : {}, '.TWV' : {}, '.RBV' : {}, '.LVIO' : {}, '.HLM' : {}, '.LLM' : {}, '.PREC' : {}, '.EGU' : {}, } pvdbSwissMx={ 'SAR-EXPMX-FETURA:':{ 'POS_RB': pvDef, 'POS_SP': pvDef, }, 'SAR-EXPMX:':{ 'CAMERA' : pvDef, 'FPICTURE' : { 'scan' : 0, 'type':'short'}, 'HEIGHT' : pvDef, 'WIDTH' : pvDef, }, 'SAR-EXPMX:MOT_BLGT':pvMotor, 'SAR-EXPMX:MOT_CRYO':pvMotor, 'SAR-EXPMX:MOT_CX':pvMotor, 'SAR-EXPMX:MOT_CZ':pvMotor, 'SAR-EXPMX:MOT_FX':pvMotor, 'SAR-EXPMX:MOT_FY':pvMotor, 'SAR-EXPMX:MOT_ROT_Y':pvMotor, 'SARES30-CAMS156-SMX-OAV:': { 'ACQMODE': pvDef, 'CAMERA': pvDef, 'EXPOSURE': pvDef, 'FPICTURE': pvDef, 'HEIGHT': pvDef, 'WIDTH': pvDef, 'CAMERASTATUS': { 'type':'short','value' : 2}, }, 'SARES30-ESBMX1:': pvSmaract, 'SARES30-ESBMX2:': pvSmaract, 'SARES30-ESBMX3:': pvSmaract, 'SARES30-ESBMX4:': pvSmaract, 'SARES30-ESBMX5:': pvSmaract, 'SARES30-ESBMX6:': pvSmaract, 'SARES30-ESBMX7:': pvSmaract, 'SARES30-ESBMX8:': pvSmaract, 'SARES30-ESBMX9:': pvSmaract, 'SARES30-ESBMX10:': pvSmaract, 'SARES30-ESBMX11:': pvSmaract, 'SARES30-ESBMX12:': pvSmaract, 'SARES30-ESBMX13:': pvSmaract, 'SARES30-ESBMX14:': pvSmaract, 'SARES30-ESBMX15:': pvSmaract, 'SARES30-ESBMX16:': pvSmaract, } class MyServer(SimpleServer): #same as SimpleServer but changing reason for unique assignement @staticmethod def createPV(prefix, pvdb): for basename, pvinfo in pvdb.items(): pvinfo=PVInfo(pvinfo) #pvinfo.reason=basename pvinfo.reason=prefix+basename # !!! TZA changes !!! pvinfo.name=prefix+basename pv=SimplePV(pvinfo.name, pvinfo) manager.pvf[pvinfo.name]=pv if pvinfo.port not in manager.pvs: manager.pvs[pvinfo.port]={} #manager.pvs[pvinfo.port][basename]=pv manager.pvs[pvinfo.port][prefix+basename]=pv # !!! TZA changes !!! class myDriver(Driver): def __init__(self): super(myDriver, self).__init__() def read(self, reason): print(reason) #print(self.getParamInfo(reason)) if reason == 'RAND': value = [random.random() for i in range(10)] elif reason.endswith('FPICTURE'): try: cam=self.cam except AttributeError: self.cam=cam=SimCam() cam.sim_gen() self.setParam('SARES30-CAMS156-SMX-OAV:HEIGHT',cam._imgSeq.shape[1]) self.setParam('SARES30-CAMS156-SMX-OAV:WIDTH',cam._imgSeq.shape[2]) value = self.cam.get_image() else: value = self.getParam(reason) return value def setParam(self, reason, value): # tza copied from base class # make a copy of mutable objects, list, numpy.ndarray if isinstance(value, list): value = value[:] elif 'numpy.ndarray' in str(type(value)): value = value.copy() # check whether value update is needed pv = manager.pvs[self.port][reason] #self.pvDB[reason].mask |= pv.info.checkValue(value) ##tza self.pvDB[reason].value = value self.pvDB[reason].time = cas.epicsTimeStamp() if self.pvDB[reason].mask: self.pvDB[reason].flag = True # check whether alarm/severity update is needed alarm, severity = pv.info.checkAlarm(value) self.setParamStatus(reason, alarm, severity) logging.getLogger('pcaspy.Driver.setParam').debug('%s: %s', reason, self.pvDB[reason]) class SimCam: def __init__(self): pass @staticmethod def set_fiducial(pic,val): # fiducial test f=np.array(((0, 0, 0, 0, 0), (0, 1, 1, 1, 0), (0, 1, 0, 0, 0), (0, 1, 1, 0, 0), (0, 1, 0, 0, 0), (0, 0, 0, 0, 0),), pic.dtype) pic[0:6, 0:5]=f*pic.max() def sim_gen(self,sz=(1500,1000),t=100,mode=0): 'generate simulation data' if mode==0: _log.info('generate {} pulsing wases simulation images, mode:{}...'.format(t,mode)) w,h=sz imgSeq=np.ndarray(shape=(t,h,w),dtype=np.uint16) x = np.linspace(-5, 5, w) y = np.linspace(-5, 5, h) # full coordinate arrays xx, yy = np.meshgrid(x, y) for i in range(t): #imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx+.1*i)**2 + np.sin(yy+.01*i)**2)#+xx*t+yy*t) #imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx+.1*i)**2 + np.sin((1+.1*np.sin(.2*i))*yy+.001*i**2)**2)#+xx*t+yy*t) #imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx+2*np.sin(i/t*2*np.pi))**2 + np.sin(yy)**2) px=2*np.sin(i/t*2*np.pi) fx=1 py=2*np.sin(i/t*2*np.pi) fy=1+.3*np.sin(i/t*2*np.pi*2) imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx*fx+px)**2 + np.sin(yy*fy+py)**2) #np.random.bytes(100) wr=w//4 hr=h//4 imgSeq[:,0:hr,0:wr]+=np.random.randint(0,100,(t,hr,wr),dtype=np.uint16) elif mode==1: import glob,PIL.Image path='/home/zamofing_t/Documents/prj/SwissFEL/epics_ioc_modules/ESB_MX/python/SwissMX/simCamImg/*.png' _log.info('generate simulation images:{}...'.format(path)) glb=glob.glob(path) img = PIL.Image.open(glb[0]) sz=img.size # (w,h) imgSeq=np.ndarray(shape=(len(glb),sz[1],sz[0]),dtype=np.uint8) # shape is (n,h,w) for i,fn in enumerate(glb): img=PIL.Image.open(fn) assert(img.mode=='L') # 8 bit grayscale assert(sz==img.size) imgSeq[i,:,:]=np.array(img.getdata()).reshape(sz[::-1]) pic=imgSeq[i] SimCam.set_fiducial(pic, 255) self._imgSeq=imgSeq self._imgIdx=0 _log.info('done-> shape:{} dtype:{}'.format(imgSeq.shape,imgSeq.dtype)) def get_image(self): imgSeq=self._imgSeq idx=self._imgIdx self._imgIdx=(idx+1)%imgSeq.shape[0] # _log.debug('simulated idx:{}'.format(idx)) pic=imgSeq[idx] return pic if __name__ == '__main__': #server = SimpleServer() server = MyServer() #server.setDebugLevel(4) server.createPV(prefix, pvdb) #driver = myDriver() for pfx,pvdb in pvdbSwissMx.items(): #print(pfx,pvdb) print(pfx,end='') for k in pvdb.keys(): print(' ',k,end='') print('') server.createPV(pfx, pvdb) print('Run server...') driver = myDriver() # process CA transactions while True: server.process(0.1)