towards epicsSwissmxSim.py ...

This commit is contained in:
2022-07-22 18:20:28 +02:00
parent 8d1357e55f
commit d2c3bff1c7
4 changed files with 276 additions and 3 deletions

View File

@@ -118,11 +118,9 @@ class epics_cam(object):
self.pic=pic=imgSeq[idx] self.pic=pic=imgSeq[idx]
return pic return pic
try: try:
pv_pic=self.getPv('FPICTURE')
sz=self._sz sz=self._sz
pic = pv_pic.get(count=sz[0]*sz[1], as_numpy=True).reshape(sz[::-1]) pic = pv_pic.get(count=sz[0]*sz[1], as_numpy=True).reshape(sz[::-1])
epics_cam.set_fiducial(pic,255) epics_cam.set_fiducial(pic,255)
except AttributeError as e: except AttributeError as e:
_log.warning("failed to fetch image") _log.warning("failed to fetch image")
else: else:

263
epicsSwissmxSim.py Executable file
View File

@@ -0,0 +1,263 @@
#!/usr/bin/env python
# *-----------------------------------------------------------------------*
# | |
# | Copyright (c) 2022 by Paul Scherrer Institute (http://www.psi.ch) |
# | thanks to Xiaoqiang's great pcaspy channel access server |
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
# *-----------------------------------------------------------------------*
# https://pcaspy.readthedocs.io/en/latest/api.html
#to be able to import manager,
#modify line in /home/zamofing_t/.local/lib/python3.8/site-packages/pcaspy/__init__.py
# -> from .driver import Driver, SimpleServer, PVInfo, SimplePV, manager
import logging
_log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,format='%(name)s:%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ')
from pcaspy import SimpleServer, Driver, cas, PVInfo, SimplePV, manager
import random, os
import numpy as np
os.environ['EPICS_CA_ADDR_LIST']='localhost'
prefix = 'MTEST:'
pvdb = {
'RAND' : {
'prec' : 3,
# 'scan' : 1,
# 'count': 10,
},
}
pvDef={ 'prec' : 3}
pvSmaract={
'DRIVE' : pvDef,
'FRM_BACK' : pvDef,
'FRM_FORW' : pvDef,
'GET_HOMED' : pvDef,
'HLM' : pvDef,
'LLM' : pvDef,
'MOTRBV' : pvDef,
'NAME' : pvDef,
'STATUS' : pvDef,
'TWF' : pvDef,
'TWR' : pvDef,
'TWV' : pvDef,
}
pvMotor={
'': {},
'.VAL' : {},
'.RBV' : {},
'.RTYP' : {'type':'string', 'value':'motor' },
'.JVEL' : {},
'.HLS' : {},
'.LLS' : {},
'.TWV' : {},
'.RBV' : {},
'.LVIO' : {},
'.HLM' : {},
'.LLM' : {},
'.PREC' : {},
'.EGU' : {},
}
pvdbSwissMx={
'SAR-EXPMX-FETURA:':{
'POS_RB': pvDef,
'POS_SP': pvDef,
},
'SAR-EXPMX:':{
'CAMERA' : pvDef,
'FPICTURE' : { 'scan' : 0, 'type':'short'},
'HEIGHT' : pvDef,
'WIDTH' : pvDef,
},
'SAR-EXPMX:MOT_BLGT':pvMotor,
'SAR-EXPMX:MOT_CRYO':pvMotor,
'SAR-EXPMX:MOT_CX':pvMotor,
'SAR-EXPMX:MOT_CZ':pvMotor,
'SAR-EXPMX:MOT_FX':pvMotor,
'SAR-EXPMX:MOT_FY':pvMotor,
'SAR-EXPMX:MOT_ROT_Y':pvMotor,
'SARES30-CAMS156-SMX-OAV:': {
'ACQMODE': pvDef,
'CAMERA': pvDef,
'EXPOSURE': pvDef,
'FPICTURE': pvDef,
'HEIGHT': pvDef,
'WIDTH': pvDef,
'CAMERASTATUS': { 'type':'short','value' : 2},
},
'SARES30-ESBMX1:': pvSmaract,
'SARES30-ESBMX2:': pvSmaract,
'SARES30-ESBMX3:': pvSmaract,
'SARES30-ESBMX4:': pvSmaract,
'SARES30-ESBMX5:': pvSmaract,
'SARES30-ESBMX6:': pvSmaract,
'SARES30-ESBMX7:': pvSmaract,
'SARES30-ESBMX8:': pvSmaract,
'SARES30-ESBMX9:': pvSmaract,
'SARES30-ESBMX10:': pvSmaract,
'SARES30-ESBMX11:': pvSmaract,
'SARES30-ESBMX12:': pvSmaract,
'SARES30-ESBMX13:': pvSmaract,
'SARES30-ESBMX14:': pvSmaract,
'SARES30-ESBMX15:': pvSmaract,
'SARES30-ESBMX16:': pvSmaract,
}
class MyServer(SimpleServer): #same as SimpleServer but changing reason for unique assignement
@staticmethod
def createPV(prefix, pvdb):
for basename, pvinfo in pvdb.items():
pvinfo=PVInfo(pvinfo)
#pvinfo.reason=basename
pvinfo.reason=prefix+basename # !!! TZA changes !!!
pvinfo.name=prefix+basename
pv=SimplePV(pvinfo.name, pvinfo)
manager.pvf[pvinfo.name]=pv
if pvinfo.port not in manager.pvs:
manager.pvs[pvinfo.port]={}
#manager.pvs[pvinfo.port][basename]=pv
manager.pvs[pvinfo.port][prefix+basename]=pv # !!! TZA changes !!!
class myDriver(Driver):
def __init__(self):
super(myDriver, self).__init__()
def read(self, reason):
print(reason)
#print(self.getParamInfo(reason))
if reason == 'RAND':
value = [random.random() for i in range(10)]
elif reason.endswith('FPICTURE'):
try:
cam=self.cam
except AttributeError:
self.cam=cam=SimCam()
cam.sim_gen()
self.setParam('SARES30-CAMS156-SMX-OAV:HEIGHT',cam._imgSeq.shape[1])
self.setParam('SARES30-CAMS156-SMX-OAV:WIDTH',cam._imgSeq.shape[2])
value = self.cam.get_image()
else:
value = self.getParam(reason)
return value
def setParam(self, reason, value): # tza copied from base class
# make a copy of mutable objects, list, numpy.ndarray
if isinstance(value, list):
value = value[:]
elif 'numpy.ndarray' in str(type(value)):
value = value.copy()
# check whether value update is needed
pv = manager.pvs[self.port][reason]
#self.pvDB[reason].mask |= pv.info.checkValue(value) ##tza
self.pvDB[reason].value = value
self.pvDB[reason].time = cas.epicsTimeStamp()
if self.pvDB[reason].mask:
self.pvDB[reason].flag = True
# check whether alarm/severity update is needed
alarm, severity = pv.info.checkAlarm(value)
self.setParamStatus(reason, alarm, severity)
logging.getLogger('pcaspy.Driver.setParam').debug('%s: %s', reason, self.pvDB[reason])
class SimCam:
def __init__(self):
pass
@staticmethod
def set_fiducial(pic,val):
# fiducial test
f=np.array(((0, 0, 0, 0, 0),
(0, 1, 1, 1, 0),
(0, 1, 0, 0, 0),
(0, 1, 1, 0, 0),
(0, 1, 0, 0, 0),
(0, 0, 0, 0, 0),), pic.dtype)
pic[0:6, 0:5]=f*pic.max()
def sim_gen(self,sz=(1500,1000),t=100,mode=0):
'generate simulation data'
if mode==0:
_log.info('generate {} pulsing wases simulation images, mode:{}...'.format(t,mode))
w,h=sz
imgSeq=np.ndarray(shape=(t,h,w),dtype=np.uint16)
x = np.linspace(-5, 5, w)
y = np.linspace(-5, 5, h)
# full coordinate arrays
xx, yy = np.meshgrid(x, y)
for i in range(t):
#imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx+.1*i)**2 + np.sin(yy+.01*i)**2)#+xx*t+yy*t)
#imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx+.1*i)**2 + np.sin((1+.1*np.sin(.2*i))*yy+.001*i**2)**2)#+xx*t+yy*t)
#imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx+2*np.sin(i/t*2*np.pi))**2 + np.sin(yy)**2)
px=2*np.sin(i/t*2*np.pi)
fx=1
py=2*np.sin(i/t*2*np.pi)
fy=1+.3*np.sin(i/t*2*np.pi*2)
imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx*fx+px)**2 + np.sin(yy*fy+py)**2)
#np.random.bytes(100)
wr=w//4
hr=h//4
imgSeq[:,0:hr,0:wr]+=np.random.randint(0,100,(t,hr,wr),dtype=np.uint16)
elif mode==1:
import glob,PIL.Image
path='/home/zamofing_t/Documents/prj/SwissFEL/epics_ioc_modules/ESB_MX/python/SwissMX/simCamImg/*.png'
_log.info('generate simulation images:{}...'.format(path))
glb=glob.glob(path)
img = PIL.Image.open(glb[0])
sz=img.size # (w,h)
imgSeq=np.ndarray(shape=(len(glb),sz[1],sz[0]),dtype=np.uint8) # shape is (n,h,w)
for i,fn in enumerate(glb):
img=PIL.Image.open(fn)
assert(img.mode=='L') # 8 bit grayscale
assert(sz==img.size)
imgSeq[i,:,:]=np.array(img.getdata()).reshape(sz[::-1])
pic=imgSeq[i]
SimCam.set_fiducial(pic, 255)
self._imgSeq=imgSeq
self._imgIdx=0
_log.info('done-> shape:{} dtype:{}'.format(imgSeq.shape,imgSeq.dtype))
def get_image(self):
imgSeq=self._imgSeq
idx=self._imgIdx
self._imgIdx=(idx+1)%imgSeq.shape[0]
# _log.debug('simulated idx:{}'.format(idx))
pic=imgSeq[idx]
return pic
if __name__ == '__main__':
#server = SimpleServer()
server = MyServer()
#server.setDebugLevel(4)
server.createPV(prefix, pvdb)
#driver = myDriver()
for pfx,pvdb in pvdbSwissMx.items():
#print(pfx,pvdb)
print(pfx,end='')
for k in pvdb.keys():
print(' ',k,end='')
print('')
server.createPV(pfx, pvdb)
print('Run server...')
driver = myDriver()
# process CA transactions
while True:
server.process(0.1)

View File

@@ -324,7 +324,13 @@ class MotorTweak(QWidget, Ui_MotorTweak):
short_name=self.short_name, short_name=self.short_name,
precision=m.PREC, precision=m.PREC,
units=m.units) units=m.units)
self.label.setText(target[self._label_style].format(rbv=m.readback)) try:
self.label.setText(target[self._label_style].format(rbv=m.readback))
except ValueError as e:
_log.error(e)
print(self._label_style)
print(m.readback)
print(m)
def paintEvent(self, e): def paintEvent(self, e):
qp = QPainter() qp = QPainter()

View File

@@ -86,6 +86,8 @@ from PyQt5.QtWidgets import (
from PyQt5.uic import loadUiType from PyQt5.uic import loadUiType
ts.log('Import part 4/7:') ts.log('Import part 4/7:')
import CustomROI as CstROI import CustomROI as CstROI
import pyqtUsrObj as UsrGO
#from CustomROI import BeamMark, Grid, CrystalCircle #ZAC: orig. code #from CustomROI import BeamMark, Grid, CrystalCircle #ZAC: orig. code
#from GenericDialog import GenericDialog #ZAC: orig. code #from GenericDialog import GenericDialog #ZAC: orig. code
#from dialogs.PreferencesDialog import PreferencesDialog #ZAC: orig. code #from dialogs.PreferencesDialog import PreferencesDialog #ZAC: orig. code
@@ -396,6 +398,10 @@ class Main(QMainWindow, Ui_MainWindow):
tr.rotate(30) tr.rotate(30)
bm.setTransform(tr) # assign transform bm.setTransform(tr) # assign transform
self.vb.addItem(self._beammark) self.vb.addItem(self._beammark)
bm=UsrGO.BeamMark([50, 120], [30, 20])
self.vb.addItem(bm)
vi=UsrGO.Grid((120, -100), (200, 150), (30, 20), 2)
self.vb.addItem(vi)
def camera_pause_toggle(self): def camera_pause_toggle(self):
app=QApplication.instance() app=QApplication.instance()