467 lines
15 KiB
Python
Executable File
467 lines
15 KiB
Python
Executable File
#!/usr/bin/env python
|
|
# *-----------------------------------------------------------------------*
|
|
# | |
|
|
# | Copyright (c) 2022 by Paul Scherrer Institute (http://www.psi.ch) |
|
|
# | Based on Zac's great first implementation |
|
|
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
|
|
# *-----------------------------------------------------------------------*
|
|
|
|
"""
|
|
Hi Zac,
|
|
|
|
at the moment there is no channel for the min or max values. The reason for this is the binning. With this you change these numbers.
|
|
At the end you need 3*2 numbers :
|
|
Physical resolution
|
|
Max resolution for ROI
|
|
Resolution of your image
|
|
And the logic how you can change the numbers ;-)
|
|
|
|
The simplest trick for getting the max resolution is to set binning to 1 and write big numbers in the ROI (REGIONX_END / REGIONY_END) channels. They will be corrected to the max resolution.
|
|
|
|
Sorry for this. When you want all numbers, I can think about the export of the whole "mess". Inside the IOC there is the central checking routine. Sometimes it could be helpful to have this in EPICS.
|
|
|
|
Best regards
|
|
Helge
|
|
"""
|
|
|
|
# In [2]: np.array(range(20))
|
|
# Out[2]:
|
|
# array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
|
|
# 17, 18, 19])
|
|
|
|
# In [3]: np.array(range(20)).reshape(4,5)
|
|
# Out[3]:
|
|
# array([[ 0, 1, 2, 3, 4],
|
|
# [ 5, 6, 7, 8, 9],
|
|
# [10, 11, 12, 13, 14],
|
|
# [15, 16, 17, 18, 19]])
|
|
|
|
# shape is (imgidx,h,w) w is the fast counting index
|
|
|
|
import logging
|
|
_log = logging.getLogger(__name__)
|
|
|
|
import enum, epics, time
|
|
import numpy as np
|
|
|
|
|
|
class Camera(enum.IntEnum):
    """States compared against the value of the ``CAMERA`` PV."""

    # camera is switched off
    OFF = 0
    # camera is acquiring
    RUNNING = 1
|
|
|
|
class CameraStatus(enum.IntEnum):
    """States written to the ``CAMERASTATUS`` PV."""

    OFFLINE = 0
    # default state requested when the camera is stopped
    IDLE = 1
    # state requested when the camera is started
    RUNNING = 2
|
|
|
|
|
|
class epics_cam(object):
    """Access a camera through its EPICS process variables (PVs).

    PV objects are created lazily by :meth:`getPv` and cached in ``self._pv``.

    When constructed with ``prefix=None`` the instance runs in *simulated
    mode*: only ``self._sim`` exists (no ``_pv`` / ``_prefix``) and images are
    served from the sequence generated by :meth:`sim_gen`.  Several methods
    detect simulated mode by catching the ``AttributeError`` that
    :meth:`getPv` raises because ``self._pv`` is missing.
    """

    def __init__(self, prefix='SARES30-CAMS156-SMX-OAV'):
        """Set up the PV cache for ``prefix``, or simulated mode if it is None.

        A trailing ``':'`` is appended to ``prefix`` when it is missing.
        """
        if prefix is None:
            self._sim = {'exp': 3.1315}
            _log.info('simulated mode:{}'.format(self._sim))
            return  # simulated mode
        self._pv = dict()
        if not prefix.endswith(':'):
            prefix += ':'
        self._prefix = prefix

    def getPv(self, name):
        """Return the cached ``epics.PV`` for ``prefix + name``, creating it on first use.

        Raises ``AttributeError`` in simulated mode (there is no PV cache).
        """
        try:
            pv = self._pv[name]
        except KeyError:
            pv = epics.PV(self._prefix + name)
            self._pv[name] = pv
        return pv

    def new_frame_cb(self, **kwargs):
        """Default new-frame callback; only prints a marker.

        kwargs contains:
          pvname: the name of the pv
          value: the latest value
          char_value: string representation of value
          count: the number of data elements
          ftype: the numerical CA type indicating the data type
          type: the python type for the data
          status: the status of the PV (1 for OK)
          precision: number of decimal places of precision for floating point values
          units: string for PV units
          severity: PV severity
          timestamp: timestamp from CA server.
          read_access: read access (True/False)
          write_access: write access (True/False)
          access: string description of read- and write-access
          host: host machine and CA port serving PV
          enum_strs: the list of enumeration strings
          upper_disp_limit: upper display limit
          lower_disp_limit: lower display limit
          upper_alarm_limit: upper alarm limit
          lower_alarm_limit: lower alarm limit
          upper_warning_limit: upper warning limit
          lower_warning_limit: lower warning limit
          upper_ctrl_limit: upper control limit
          lower_ctrl_limit: lower control limit
          chid: integer channel ID
          cb_info: (index, self) tuple containing callback ID
                   and the PV object
        """
        print('camera.new_frame_pv_cb')

    def get_image(self):
        """Fetch one image, store it in ``self.pic`` and return it.

        In simulated mode the next frame of the generated sequence is
        returned (wrapping around at the end).  Otherwise the ``FPICTURE`` PV
        is read and reshaped to ``(height, width)`` using the size cached by
        :meth:`update_size`; ``int16`` data is reinterpreted as ``uint16`` and
        the optional ``self._transformation`` is applied.

        Returns ``None`` (after logging a warning, ``self.pic`` untouched)
        when the image cannot be fetched, e.g. the size is not known yet or
        the PV returned no data.
        """
        try:
            pv_pic = self.getPv('FPICTURE')
        except AttributeError:
            # simulated mode: cycle through the pre-generated frames
            sim = self._sim
            frames = sim['imgSeq']
            idx = sim['imgIdx']
            sim['imgIdx'] = (idx + 1) % frames.shape[0]
            #_log.debug('simulated idx:{}'.format(idx))
            self.pic = pic = frames[idx]
            return pic

        try:
            sz = self._sz
            # AttributeError also covers get() returning None (no data)
            pic = pv_pic.get(count=sz[0]*sz[1], as_numpy=True).reshape(sz[::-1])
        except AttributeError:
            _log.warning("failed to fetch image")
            return None

        if pic.dtype == np.int16:
            # reinterpret the same bytes as unsigned; assigning to .dtype is
            # discouraged by numpy, view() is the supported equivalent
            pic = pic.view(np.uint16)

        trf = getattr(self, '_transformation', None)
        if trf is not None:
            # entries of the 2x2 matrix are used as slice steps
            if trf[1, 0] == 0:
                pic = pic[::trf[0, 0], ::trf[1, 1]]
            else:
                pic = pic[::trf[0, 1], ::trf[1, 0]].T

        self.pic = pic
        return pic

    def stop(self, v=CameraStatus.IDLE):
        """Drop the cached picture PV and, unless ``v`` is None, write ``v`` to ``CAMERASTATUS``."""
        self._pv.pop('pic', None)
        if v is not None:
            pv_cs = self.getPv('CAMERASTATUS')
            pv_cs.put(v, wait=True)

    def run(self, cb=None):
        """Start the camera and create the ``FPICTURE`` PV stored under ``'pic'``.

        If the ``CAMERA`` PV reads ``Camera.OFF``, ``CameraStatus.RUNNING`` is
        written to ``CAMERASTATUS`` and the method polls every 0.5 s until
        ``CAMERA`` is no longer off (there is no timeout).  With ``cb`` given,
        the picture PV is monitored and ``cb`` is registered as its callback.

        In simulated mode nothing is started; a callback is not supported
        there and only produces an error log entry.
        """
        try:
            pv_cam = self.getPv('CAMERA')
        except AttributeError:
            _log.info('simulated mode')
            if cb:
                _log.error('simulated mode with callback not yet supported:{}'.format(cb))
            return

        if pv_cam.value == Camera.OFF:
            pv_cs = self.getPv('CAMERASTATUS')
            pv_cs.put(CameraStatus.RUNNING, wait=True)
            while pv_cam.value == Camera.OFF:
                _log.warning('CAMERASTATUS:OFF, retry...')
                time.sleep(.5)
        self.update_size()

        pic_name = self._prefix + "FPICTURE"
        if cb is None:
            self._pv['pic'] = epics.PV(pic_name)
        else:
            self._pv['pic'] = epics.PV(pic_name, auto_monitor=True, callback=cb)

    def update_size(self):
        """Read ``WIDTH`` and ``HEIGHT``, cache them in ``self._sz`` and return ``(width, height)``."""
        pv_w = self.getPv('WIDTH')
        pv_h = self.getPv('HEIGHT')
        self._sz = sz = (int(pv_w.value), int(pv_h.value))
        return sz

    def set_exposure(self, exp):
        """Write ``exp`` to the ``EXPOSURE`` PV (or store it in simulated mode)."""
        try:
            pv_exp = self.getPv('EXPOSURE')
        except AttributeError:
            _log.info('simulated mode:{}'.format(exp))
            self._sim['exp'] = exp
            return
        pv_exp.put(exp, wait=True)

    def get_exposure(self):
        """Return the value of the ``EXPOSURE`` PV (or the simulated exposure)."""
        try:
            pv_exp = self.getPv('EXPOSURE')
        except AttributeError:
            exp = self._sim['exp']
            _log.info('simulated mode:{}'.format(exp))
            return exp
        return pv_exp.get()

    def set_roi(self, rxs, rxe, rys, rye):
        """Set the region of interest: x start/end and y start/end."""
        pv_rxs = self.getPv('REGIONX_START')
        pv_rxe = self.getPv('REGIONX_END')
        pv_rys = self.getPv('REGIONY_START')
        pv_rye = self.getPv('REGIONY_END')
        self.update_params((pv_rxs, rxs), (pv_rxe, rxe), (pv_rys, rys), (pv_rye, rye))

    def set_binning(self, bx, by):
        """Set the binning in x (``BINX``) and y (``BINY``)."""
        pv_bx = self.getPv('BINX')
        pv_by = self.getPv('BINY')
        self.update_params((pv_bx, bx), (pv_by, by))

    #def update_params(self, **kargs):
    def update_params(self, *args):
        """Update parameters on the camera.

        ``args`` are ``(pv, value)`` pairs.  Each value is written with
        ``wait=True`` (forcing a connect first if the PV is not connected);
        afterwards the camera is cycled and the cached image size refreshed.
        """
        for pv, val in args:
            if not pv.connected:
                _log.info('force connect {}'.format(pv))
                pv.force_connect()  # force to connect pv
            _log.debug("updating {} = {}".format(pv.pvname, val))
            pv.put(val, wait=True)

        pv_cam = self.getPv('CAMERA')
        # NOTE(review): pv_cs is fetched but never written, while CameraStatus
        # values are written to the CAMERA PV below. run()/stop() write
        # CameraStatus values to CAMERASTATUS instead -- confirm which PV is
        # intended here before changing it.
        pv_cs = self.getPv('CAMERASTATUS')
        #pv_set_param=self.getPv("SET_PARAM")
        pv_cam.put(CameraStatus.IDLE, wait=True)
        #pv_set_param.put(1, wait=True)
        pv_cam.put(CameraStatus.RUNNING, wait=True)
        self.update_size()

    def sim_gen(self, sz=(1500, 1000), t=100, mode=0):
        """Generate simulation data and store it in ``self._sim``.

        mode 0: ``t`` synthetic ``uint16`` frames of size ``sz`` = (w, h) with
                a moving wave pattern plus random noise in the top-left
                quarter.
        mode 1: one ``uint8`` frame per PNG file found under a fixed path
                (``sz`` and ``t`` are ignored); every file must be 8 bit
                grayscale and of equal size.  A small marker is stamped into
                the top-left corner of each frame.  Raises ``IndexError`` if
                no file matches.

        The resulting array has shape (n, h, w) and is stored as
        ``self._sim['imgSeq']``; ``self._sim['imgIdx']`` is reset to 0.

        Raises ``ValueError`` for any other ``mode``.
        """
        if mode == 0:
            _log.info('generate {} pulsing wases simulation images, mode:{}...'.format(t, mode))
            w, h = sz
            imgSeq = np.empty((t, h, w), dtype=np.uint16)
            x = np.linspace(-5, 5, w)
            y = np.linspace(-5, 5, h)
            # full coordinate arrays
            xx, yy = np.meshgrid(x, y)

            for i in range(t):
                #imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx+.1*i)**2 + np.sin(yy+.01*i)**2)#+xx*t+yy*t)
                #imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx+.1*i)**2 + np.sin((1+.1*np.sin(.2*i))*yy+.001*i**2)**2)#+xx*t+yy*t)
                #imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx+2*np.sin(i/t*2*np.pi))**2 + np.sin(yy)**2)
                phase = i/t*2*np.pi  # one full period over the sequence
                px = 2*np.sin(phase)
                fx = 1
                py = 2*np.sin(phase)
                fy = 1+.3*np.sin(phase*2)
                imgSeq[i, :, :] = 100*np.sqrt(np.sin(xx*fx+px)**2 + np.sin(yy*fy+py)**2)
                #np.random.bytes(100)
            # add noise to the top-left quarter of every frame
            wr = w//4
            hr = h//4
            imgSeq[:, 0:hr, 0:wr] += np.random.randint(0, 100, (t, hr, wr), dtype=np.uint16)
        elif mode == 1:
            import glob
            import PIL.Image
            path = '/home/zamofing_t/Documents/prj/SwissFEL/epics_ioc_modules/ESB_MX/python/SwissMX/simCamImg/*.png'
            _log.info('generate simulation images:{}...'.format(path))
            glb = glob.glob(path)
            img = PIL.Image.open(glb[0])
            sz = img.size  # (w,h)
            imgSeq = np.ndarray(shape=(len(glb), sz[1], sz[0]), dtype=np.uint8)  # shape is (n,h,w)
            # marker stamped into the top-left corner of every frame
            f = np.array(((0, 0, 0, 0, 0),
                          (0, 1, 1, 1, 0),
                          (0, 1, 0, 0, 0),
                          (0, 1, 1, 0, 0),
                          (0, 1, 0, 0, 0),
                          (0, 0, 0, 0, 0),), imgSeq.dtype)
            for i, fn in enumerate(glb):
                img = PIL.Image.open(fn)
                assert(img.mode == 'L')  # 8 bit grayscale
                assert(sz == img.size)
                imgSeq[i, :, :] = np.array(img.getdata()).reshape(sz[::-1])
                imgSeq[i, 0:6, 0:5] = f*255
        else:
            raise ValueError('unsupported simulation mode: {!r}'.format(mode))

        self._sim['imgSeq'] = imgSeq
        self._sim['imgIdx'] = 0
        _log.info('done-> shape:{} dtype:{}'.format(imgSeq.shape, imgSeq.dtype))

    def set_transformations(self, *args):
        """Obsolete entry point; only logs an error."""
        _log.error('OLD FUNCTION NOT IMPLEMENTED {}'.format(args))
|
|
|
|
|
|
if __name__ == "__main__":
    # Command line test tool:
    #   --ui 0 : fetch images one by one, save them to /tmp and open them
    #   --ui 1 : live view in a pyqtgraph ImageItem
    #   --ui 2 : live view in a pyqtgraph ImageView (with custom color map)
    import argparse
    import os
    import platform
    import subprocess
    import sys
    import time

    import PIL.Image

    logging.basicConfig(level=logging.DEBUG,format='%(name)s:%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ')
    logging.getLogger('PIL').setLevel(logging.INFO)

    def default_app_open(file):
        """Open ``file`` with the default application of the platform."""
        system = platform.system()
        if system == 'Darwin':  # macOS
            subprocess.call(('open', file))
        elif system == 'Windows':  # Windows
            os.startfile(file)
        else:  # linux variants
            subprocess.call(('xdg-open', file))

    parser = argparse.ArgumentParser()
    parser.add_argument("--ui", "-u", help="qt test", type=int, default=0)
    parser.add_argument("--sim", "-s", help="simulation mode", type=int, default=None)
    parser.add_argument("--delay", "-d", help="delay in simulation mode", type=float, default=None)
    parser.add_argument("--prefix","-p",help="PV prefix for images: default=%(default)s",type=str,default="SARES30-CAMS156-SMX-OAV",)
    args = parser.parse_args()

    _log.info('Arguments:{}'.format(args.__dict__))
    if args.sim is not None:
        args.prefix = None  # use simulated camera
    elif args.prefix == 'SwissMxSim':
        os.environ['EPICS_CA_ADDR_LIST'] = 'localhost'  # simulated epics camera
    else:
        os.environ['EPICS_CA_ADDR_LIST'] = '129.129.244.255 sf-saresc-cagw.psi.ch:5062 sf-saresc-cagw.psi.ch:5066'

    if not args.ui:
        cam = epics_cam(prefix=args.prefix)
        #cam._transformation=np.array(((-1,0),(0,1)),dtype=np.uint8) #ZAC: orig. code
        if args.prefix is None:
            cam.sim_gen(mode=args.sim)

        #sz=(2448,2048)
        #ctr=(1200,1400)
        #sz=(1200,1000)
        #cam.set_roi(int(ctr[0]-sz[0]/2),int(ctr[0]+sz[0]/2),int(ctr[1]-sz[1]/2),int(ctr[1]+sz[1]/2))
        #cam.set_exposure(3)
        #cam.set_binning(1,1)

        cam.run()
        n = 1
        base = "/tmp/image_{:05d}.png"
        while True:
            img = cam.get_image()
            mx = img.max()
            if img.dtype == np.uint16 and mx < 256:
                # all values fit into 8 bit: store a plain 8 bit image
                _log.info('reformat to uint 8')
                img = np.uint8(img)
            print(img, img.dtype)

            fn = base.format(n)
            PIL.Image.fromarray(img).save(fn)
            _log.info('File {} shape:{} dtype:{} max:{}'.format(fn,img.shape,img.dtype,img.max()))
            default_app_open(fn)
            try:
                input("<enter> for next image, ctrl-c to abort")
            except KeyboardInterrupt:
                # sys.exit instead of the site-provided exit() helper
                sys.exit(0)
            n += 1
    else:
        class UIcamera(epics_cam):
            """Camera that pushes every new frame into the pyqtgraph widget.

            Relies on the script-level names ``args``, ``img`` / ``imv``,
            ``QtCore`` and ``ptime`` defined further down.
            """

            def __init__(self, prefix="ESB-MX-CAM"):
                epics_cam.__init__(self, prefix)

            def new_frame_pv_cb(self, **kwargs):
                """PV callback: reshape the new frame and display it."""
                sz = self._sz
                if kwargs['count'] != sz[0] * sz[1]:
                    # frame size changed: refresh the cached size.
                    # update_size() stores it in self._sz; its return value is
                    # not relied upon here.
                    self.update_size()
                    sz = self._sz
                pic = kwargs['value'].reshape(sz[::-1])
                _log.debug('new_frame_pv_cb count {}'.format(kwargs['count']))
                if pic.dtype == np.int16:
                    # reinterpret the same bytes as unsigned; assigning to
                    # .dtype is discouraged by numpy
                    pic = pic.view(np.uint16)
                trf = getattr(self, '_transformation', None)
                if trf is not None:
                    # entries of the 2x2 matrix are used as slice steps
                    if trf[1, 0] == 0:
                        pic = pic[::trf[0, 0], ::trf[1, 1]]
                    else:
                        pic = pic[::trf[0, 1], ::trf[1, 0]].T
                # fiducial test: stamp a small marker into the top-left corner
                f = np.array(((0, 0, 0, 0, 0),
                              (0, 1, 1, 1, 0),
                              (0, 1, 0, 0, 0),
                              (0, 1, 1, 0, 0),
                              (0, 1, 0, 0, 0),
                              (0, 0, 0, 0, 0),), pic.dtype)
                pic[0:6, 0:5] = f*pic.max()
                if args.ui == 1:
                    img.setImage(pic)
                else:
                    imv.setImage(pic, autoRange=False, autoLevels=False)

            def new_frame_sim_cb(self, arl=False):
                """Show the next simulated frame and re-schedule itself via a Qt timer.

                ``arl`` enables auto range and auto levels (ImageView only).
                """
                sim = self._sim
                imgSeq = sim['imgSeq']
                idx = sim['imgIdx']
                fps = sim['fps']
                udt = sim['updateTime']
                sim['imgIdx'] = (idx+1) % imgSeq.shape[0]
                #_log.info('simulated idx:{}'.format(idx))
                pic = imgSeq[idx]
                if args.ui == 1:
                    img.setImage(pic)
                else:
                    imv.setImage(pic, autoRange=arl, autoLevels=arl)

                if args.delay:
                    QtCore.QTimer.singleShot(int(1000*args.delay), self.new_frame_sim_cb)
                else:
                    QtCore.QTimer.singleShot(0, self.new_frame_sim_cb)
                # exponentially smoothed frame rate
                now = ptime.time()
                fps2 = 1.0 / (now - udt)
                sim['updateTime'] = now
                sim['fps'] = fps * 0.9 + fps2 * 0.1
                print("%d %0.1f fps" % (idx,fps))

        from pyqtgraph.Qt import QtCore, QtGui
        import pyqtgraph as pg
        import pyqtgraph.ptime as ptime
        print(pg.__version__)

        # Interpret image data as row-major instead of col-major
        pg.setConfigOptions(imageAxisOrder='row-major')

        app = QtGui.QApplication([])

        if args.ui == 1:
            win = pg.GraphicsLayoutWidget()
            win.show()  ## show widget alone in its own window
            win.setWindowTitle('pyqtgraph example: ImageItem')
            view = win.addViewBox(invertY=True)

            ## lock the aspect ratio so pixels are always square
            view.setAspectLocked(True)

            ## Create image item https://pyqtgraph.readthedocs.io/en/latest/graphicsItems/imageitem.html
            img = pg.ImageItem(border='g')
            #tr = QtGui.QTransform() # prepare ImageItem transformation:
            #tr.scale(6.0, 3.0) # scale horizontal and vertical axes
            #tr.translate(-1.5, -1.5) # move 3x3 image to locate center at axis origin
            #img.setTransform(tr) # assign transform
            view.addItem(img)

            ## Set initial view bounds
            view.setRange(QtCore.QRectF(0, 0, 600, 600))
        else:
            ## Create window with ImageView widget
            win = QtGui.QMainWindow()
            win.resize(800,800)
            imv = pg.ImageView()
            win.setCentralWidget(imv)
            win.show()
            win.setWindowTitle('pyqtgraph example: ImageView')

        ## Display the data and assign each frame a time value from 1.0 to 3.0
        cam = UIcamera(prefix=args.prefix)

        #cam.set_binning(4,4)
        #cam.run()
        #cam.get_image()
        #imv.setImage(cam.pic, autoRange=True, autoLevels=True)
        #cam.stop(None)
        cam.run(cam.new_frame_pv_cb)

        if args.prefix is None:
            # simulated camera: frames are pushed by a self re-arming Qt timer
            cam.sim_gen(mode=args.sim)
            cam._sim['fps'] = 0
            cam._sim['updateTime'] = ptime.time()
            cam.new_frame_sim_cb(arl=True)

        if args.ui == 2:
            ## Set a custom color map
            colors = [(0, 0, 0),(45, 5, 61),(84, 42, 55),(150, 87, 60),(208, 171, 141),(255, 255, 255)]
            cmap = pg.ColorMap(pos=np.linspace(0.0, 1.0, 6), color=colors)
            imv.setColorMap(cmap)

        ## Start Qt event loop unless running in interactive mode.
        if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
            QtGui.QApplication.instance().exec_()
|