Files
SwissMX/camera.py

483 lines
16 KiB
Python
Executable File

#!/usr/bin/env python
# *-----------------------------------------------------------------------*
# | |
# | Copyright (c) 2022 by Paul Scherrer Institute (http://www.psi.ch) |
# | Based on Zac's great first implementation |
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
# *-----------------------------------------------------------------------*
"""
Hi Zac,
at the moment there is no channel for the min or max values. The reason for this is the binning. With this you change these numbers.
At the end you need 3*2 numbers :
Physical resolution
Max resolution for ROI
Resolution of your image
And the logic how you can change the numbers ;-)
The simplest trick for getting the max resolution is to set binning to 1 and write big numbers in the ROI (REGIONX_END / REGIONY_END) channels. They will be corrected to the max resolution.
Sorry for this. When you want all numbers, I can think about the export of the whole "mess". Inside the IOC there is the central checking routine. Sometimes it could be helpful to have this in EPICS.
Best regards
Helge
"""
# In [2]: np.array(range(20))
# Out[2]:
# array([ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
# 17, 18, 19])
# In [3]: np.array(range(20)).reshape(4,5)
# Out[3]:
# array([[ 0, 1, 2, 3, 4],
# [ 5, 6, 7, 8, 9],
# [10, 11, 12, 13, 14],
# [15, 16, 17, 18, 19]])
# shape is (imgidx,h,w) w is the fast counting index
import logging
_log = logging.getLogger(__name__)
import enum, epics, time
import numpy as np
class Camera(enum.IntEnum):
    """Values compared against the ``CAMERA`` PV (see ``epics_cam.run``)."""

    OFF = 0      # camera is not acquiring
    RUNNING = 1  # camera is acquiring
class CameraStatus(enum.IntEnum):
    """Values written to the ``CAMERASTATUS`` PV (see ``epics_cam.run``/``stop``)."""

    OFFLINE = 0
    IDLE = 1
    RUNNING = 2
class epics_cam(object):
    """Access a camera through EPICS PVs, or replay simulated frames.

    With a PV ``prefix`` the instance creates ``epics.PV`` objects on demand
    and caches them in ``self._pv``.  With ``prefix=None`` the instance runs
    in simulated mode: only ``self._sim`` exists, and the methods that support
    simulation detect this through the ``AttributeError`` that :meth:`getPv`
    raises when ``self._pv`` is missing.
    """

    def __init__(self, prefix='SARES30-CAMS156-SMX-OAV'):
        """Set up the PV cache, or the simulation state if *prefix* is None.

        A trailing ``':'`` is appended to *prefix* when it is missing.
        """
        if prefix is None:
            self._sim = {'exp': 3.1315}
            _log.info('simulated mode:{}'.format(self._sim))
            # simulated mode: _pv/_prefix are deliberately NOT created, the
            # other methods rely on the resulting AttributeError.
            return
        self._pv = dict()
        # endswith() also copes with an empty prefix (prefix[-1] would raise)
        if not prefix.endswith(':'):
            prefix += ':'
        self._prefix = prefix

    def getPv(self, name):
        """Return the cached ``epics.PV`` for ``prefix+name``, creating it if needed.

        Raises ``AttributeError`` in simulated mode (no ``self._pv``).
        """
        try:
            pv = self._pv[name]
        except KeyError:
            pv = epics.PV(self._prefix + name)
            self._pv[name] = pv
        return pv

    def new_frame_cb(self, **kwargs):
        """Default PV callback; only prints a marker.

        kwargs contains:
        pvname: the name of the pv
        value: the latest value
        char_value: string representation of value
        count: the number of data elements
        ftype: the numerical CA type indicating the data type
        type: the python type for the data
        status: the status of the PV (1 for OK)
        precision: number of decimal places of precision for floating point values
        units: string for PV units
        severity: PV severity
        timestamp: timestamp from CA server.
        read_access: read access (True/False)
        write_access: write access (True/False)
        access: string description of read- and write-access
        host: host machine and CA port serving PV
        enum_strs: the list of enumeration strings
        upper_disp_limit: upper display limit
        lower_disp_limit: lower display limit
        upper_alarm_limit: upper alarm limit
        lower_alarm_limit: lower alarm limit
        upper_warning_limit: upper warning limit
        lower_warning_limit: lower warning limit
        upper_ctrl_limit: upper control limit
        lower_ctrl_limit: lower control limit
        chid: integer channel ID
        cb_info: (index, self) tuple containing callback ID
        and the PV object
        """
        print('camera.new_frame_pv_cb')

    def get_image(self):
        """Fetch one frame synchronously and return it as a 2-D array.

        In simulated mode the next frame of ``self._sim['imgSeq']`` is
        returned (``sim_gen`` must have been called before).  Otherwise the
        ``FPICTURE`` PV is read, reshaped to ``(height, width)`` using the
        size cached by :meth:`update_size`, marked with the fiducial pattern
        and, if ``self._transformation`` is set, flipped/transposed.

        The frame is also stored in ``self.pic``.  Returns ``None`` (after a
        warning) when the frame could not be fetched.
        """
        _log.warning('this can be very slow. Use callbacks if possible.')
        try:
            pv_pic = self.getPv('FPICTURE')
        except AttributeError:
            # simulated mode: cycle through the generated sequence
            imgSeq = self._sim['imgSeq']
            idx = self._sim['imgIdx']
            self._sim['imgIdx'] = (idx + 1) % imgSeq.shape[0]
            self.pic = pic = imgSeq[idx]
            return pic

        try:
            sz = self._sz
            # pv.get() returning None (or a missing _sz) ends up as an
            # AttributeError, which is handled below.
            pic = pv_pic.get(count=sz[0] * sz[1], as_numpy=True).reshape(sz[::-1])
            epics_cam.set_fiducial(pic, 255)
        except AttributeError:
            _log.warning("failed to fetch image")
            return None

        if pic.dtype == np.int16:
            # reinterpret the same buffer as unsigned (no copy, no value scaling)
            pic = pic.view(np.uint16)

        trf = getattr(self, '_transformation', None)
        if trf is not None:
            # trf entries are used as slice steps (e.g. -1 reverses an axis)
            if trf[1, 0] == 0:
                pic = pic[::trf[0, 0], ::trf[1, 1]]
            else:
                pic = pic[::trf[0, 1], ::trf[1, 0]].T
        self.pic = pic
        return pic

    def stop(self, v=CameraStatus.IDLE):
        """Drop the cached 'pic' PV and, unless *v* is None, write *v* to CAMERASTATUS."""
        self._pv.pop('pic', None)
        if v is not None:
            pv_cs = self.getPv('CAMERASTATUS')
            pv_cs.put(v, wait=True)

    def run(self, cb=None):
        """Start the camera if it is off and create the 'pic' PV on FPICTURE.

        If *cb* is given, the PV is created with ``auto_monitor=True`` and
        *cb* as its callback.  In simulated mode nothing is started; a
        callback is reported as unsupported.
        """
        try:
            pv_cam = self.getPv('CAMERA')
        except AttributeError:
            _log.info('simulated mode')
            if cb:
                _log.error('simulated mode with callback not yet supported:{}'.format(cb))
            return

        if pv_cam.value == Camera.OFF:
            pv_cs = self.getPv('CAMERASTATUS')
            pv_cs.put(CameraStatus.RUNNING, wait=True)
            # poll until the CAMERA PV no longer reports OFF
            while pv_cam.value == Camera.OFF:
                _log.warning('CAMERASTATUS:OFF, retry...')
                time.sleep(.5)
        self.update_size()

        pic_pv_name = self._prefix + "FPICTURE"
        if cb is None:
            self._pv['pic'] = epics.PV(pic_pv_name)
        else:
            self._pv['pic'] = epics.PV(pic_pv_name, auto_monitor=True, callback=cb)

    def update_size(self):
        """Read the WIDTH and HEIGHT PVs and cache them as ``self._sz = (w, h)``."""
        pv_w = self.getPv('WIDTH')
        pv_h = self.getPv('HEIGHT')
        self._sz = (int(pv_w.value), int(pv_h.value))

    def set_exposure(self, exp):
        """Write *exp* to the EXPOSURE PV (or store it in simulated mode)."""
        try:
            pv_exp = self.getPv('EXPOSURE')
        except AttributeError:
            _log.info('simulated mode:{}'.format(exp))
            self._sim['exp'] = exp
            return
        pv_exp.put(exp, wait=True)

    def get_exposure(self):
        """Return the EXPOSURE PV value (or the stored value in simulated mode)."""
        try:
            pv_exp = self.getPv('EXPOSURE')
        except AttributeError:
            exp = self._sim['exp']
            _log.info('simulated mode:{}'.format(exp))
            return exp
        return pv_exp.get()

    def set_roi(self, rxs, rxe, rys, rye):
        """Write the region of interest (x start/end, y start/end) via update_params."""
        pv_rxs = self.getPv('REGIONX_START')
        pv_rxe = self.getPv('REGIONX_END')
        pv_rys = self.getPv('REGIONY_START')
        pv_rye = self.getPv('REGIONY_END')
        self.update_params((pv_rxs, rxs), (pv_rxe, rxe), (pv_rys, rys), (pv_rye, rye))

    def set_binning(self, bx, by):
        """Write the BINX/BINY values via update_params."""
        pv_bx = self.getPv('BINX')
        pv_by = self.getPv('BINY')
        self.update_params((pv_bx, bx), (pv_by, by))

    def update_params(self, *args):
        """Write ``(pv, value)`` pairs, then cycle the camera IDLE -> RUNNING.

        Each positional argument is a ``(pv, value)`` tuple.  Unconnected PVs
        are force-connected before the write.  Afterwards the cached frame
        size is refreshed with :meth:`update_size`.
        """
        for pv, val in args:
            if not pv.connected:
                _log.info('force connect {}'.format(pv))
                pv.force_connect()
            _log.debug("updating {} = {}".format(pv.pvname, val))
            pv.put(val, wait=True)
        # CameraStatus values belong on CAMERASTATUS, as in run()/stop().  The
        # previous code fetched this PV but wrote the values to CAMERA.
        # NOTE(review): confirm against the IOC that CAMERASTATUS is the
        # intended target.  A SET_PARAM write used to sit between the two puts.
        pv_cs = self.getPv('CAMERASTATUS')
        pv_cs.put(CameraStatus.IDLE, wait=True)
        pv_cs.put(CameraStatus.RUNNING, wait=True)
        self.update_size()

    @staticmethod
    def set_fiducial(pic, val):
        """Stamp a small 'F' pattern into the top-left 6x5 pixels of *pic* (in place).

        The lit pixels get ``pic.max()``; *val* is currently not used.
        """
        f = np.array(((0, 0, 0, 0, 0),
                      (0, 1, 1, 1, 0),
                      (0, 1, 0, 0, 0),
                      (0, 1, 1, 0, 0),
                      (0, 1, 0, 0, 0),
                      (0, 0, 0, 0, 0),), pic.dtype)
        pic[0:6, 0:5] = f * pic.max()

    def sim_gen(self, sz=(1500, 1000), t=100, mode=0, path=None):
        """Generate the simulated image sequence and store it in ``self._sim``.

        mode 0: *t* synthetic uint16 frames of size ``sz=(w, h)`` with a
                moving wave pattern and random noise in the top-left quarter.
        mode 1: all 8-bit grayscale images matching the glob pattern *path*
                (default: the original hard-coded simCamImg directory), in
                sorted file-name order, each stamped with the fiducial.

        Sets ``self._sim['imgSeq']`` (shape ``(n, h, w)``) and
        ``self._sim['imgIdx'] = 0``.

        Raises ``FileNotFoundError`` if no image matches *path* and
        ``ValueError`` for a wrong image format/size or an unknown *mode*.
        """
        if mode == 0:
            _log.info('generate {} pulsing wases simulation images, mode:{}...'.format(t, mode))
            w, h = sz
            imgSeq = np.empty(shape=(t, h, w), dtype=np.uint16)
            x = np.linspace(-5, 5, w)
            y = np.linspace(-5, 5, h)
            # full coordinate arrays
            xx, yy = np.meshgrid(x, y)
            for i in range(t):
                phase = i / t * 2 * np.pi
                px = 2 * np.sin(phase)
                fx = 1
                py = 2 * np.sin(phase)
                fy = 1 + .3 * np.sin(phase * 2)
                imgSeq[i, :, :] = 100 * np.sqrt(np.sin(xx * fx + px) ** 2 + np.sin(yy * fy + py) ** 2)
            # noise patch in the top-left quarter of every frame
            wr = w // 4
            hr = h // 4
            imgSeq[:, 0:hr, 0:wr] += np.random.randint(0, 100, (t, hr, wr), dtype=np.uint16)
        elif mode == 1:
            import glob
            import PIL.Image
            if path is None:
                path = '/home/zamofing_t/Documents/prj/SwissFEL/epics_ioc_modules/ESB_MX/python/SwissMX/simCamImg/*.png'
            _log.info('generate simulation images:{}...'.format(path))
            # sorted: glob order is otherwise arbitrary
            glb = sorted(glob.glob(path))
            if not glb:
                raise FileNotFoundError('no simulation images found: {}'.format(path))
            sz = PIL.Image.open(glb[0]).size  # (w,h)
            imgSeq = np.empty(shape=(len(glb), sz[1], sz[0]), dtype=np.uint8)  # shape is (n,h,w)
            for i, fn in enumerate(glb):
                img = PIL.Image.open(fn)
                if img.mode != 'L':  # 8 bit grayscale
                    raise ValueError('{}: expected 8 bit grayscale, got mode {}'.format(fn, img.mode))
                if img.size != sz:
                    raise ValueError('{}: size {} differs from {}'.format(fn, img.size, sz))
                imgSeq[i, :, :] = np.asarray(img)
                epics_cam.set_fiducial(imgSeq[i], 255)
        else:
            raise ValueError('unsupported simulation mode: {}'.format(mode))
        self._sim['imgSeq'] = imgSeq
        self._sim['imgIdx'] = 0
        _log.info('done-> shape:{} dtype:{}'.format(imgSeq.shape, imgSeq.dtype))

    def set_transformations(self, *args):
        """Obsolete entry point; only logs that it is not implemented."""
        _log.error('OLD FUNCTION NOT IMPLEMENTED {}'.format(args))
if __name__ == "__main__":
    # Stand-alone test program.
    #   --ui 0 : fetch single frames, save them as PNG and open them
    #   --ui 1 : live view in a pyqtgraph ImageItem
    #   --ui 2 : live view in a pyqtgraph ImageView with a custom colour map
    #   --sim N: use the simulated camera (sim_gen mode N) instead of EPICS
    import argparse
    import os
    import platform
    import subprocess
    import sys
    import time
    import PIL.Image

    logging.basicConfig(level=logging.DEBUG, format='%(name)s:%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ')
    logging.getLogger('PIL').setLevel(logging.INFO)

    def default_app_open(file):
        """Open *file* with the default application of the platform."""
        system = platform.system()
        if system == 'Darwin':  # macOS
            subprocess.call(('open', file))
        elif system == 'Windows':  # Windows
            os.startfile(file)
        else:  # linux variants
            subprocess.call(('xdg-open', file))

    parser = argparse.ArgumentParser()
    parser.add_argument("--ui", "-u", help="qt test", type=int, default=0)
    parser.add_argument("--sim", "-s", help="simulation mode", type=int, default=None)
    parser.add_argument("--delay", "-d", help="delay in simulation mode", type=float, default=None)
    parser.add_argument("--prefix", "-p", help="PV prefix for images: default=%(default)s", type=str, default="SARES30-CAMS156-SMX-OAV",)
    args = parser.parse_args()
    _log.info('Arguments:{}'.format(args.__dict__))

    if args.sim is not None:
        args.prefix = None  # use simulated camera
    elif args.prefix == 'SwissMxSim':
        os.environ['EPICS_CA_ADDR_LIST'] = 'localhost'  # simulated epics camera
    else:
        os.environ['EPICS_CA_ADDR_LIST'] = '129.129.244.255 sf-saresc-cagw.psi.ch:5062 sf-saresc-cagw.psi.ch:5066'

    if not args.ui:
        cam = epics_cam(prefix=args.prefix)
        #cam._transformation=np.array(((-1,0),(0,1)),dtype=np.uint8) #ZAC: orig. code
        if args.prefix is None:
            cam.sim_gen(mode=args.sim)
        # usage examples:
        #sz=(2448,2048)
        #ctr=(1200,1400)
        #sz=(1200,1000)
        #cam.set_roi(int(ctr[0]-sz[0]/2),int(ctr[0]+sz[0]/2),int(ctr[1]-sz[1]/2),int(ctr[1]+sz[1]/2))
        #cam.set_exposure(3)
        #cam.set_binning(1,1)
        cam.run()
        n = 1
        base = "/tmp/image_{:05d}.png"
        while True:
            img = cam.get_image()
            mx = img.max()
            if img.dtype == np.uint16 and mx < 256:
                _log.info('reformat to uint 8')
                img = np.uint8(img)
            print(img, img.dtype)
            fn = base.format(n)
            PIL.Image.fromarray(img).save(fn)
            _log.info('File {} shape:{} dtype:{} max:{}'.format(fn, img.shape, img.dtype, img.max()))
            default_app_open(fn)
            try:
                input("<enter> for next image, ctrl-c to abort")
            except KeyboardInterrupt:
                # sys.exit instead of the site-provided exit() helper
                sys.exit(0)
            n += 1
    else:
        from pyqtgraph.Qt import QtCore, QtGui
        import pyqtgraph as pg
        import pyqtgraph.ptime as ptime

        def closeEvent(cam, evt):
            """Window close handler: stop the monitor on the 'pic' PV."""
            print(args)
            try:
                pv = cam._pv['pic']
                pv.clear_auto_monitor()  # disconnect PV monitor callback -> program exit faster.
            except (AttributeError, KeyError):
                # AttributeError: simulated camera (no _pv); KeyError: no 'pic' PV
                _log.warning('disconnect PV callback failed.')

        class UIcamera(epics_cam):
            """epics_cam that pushes every new frame into the pyqtgraph widget."""

            def __init__(self, prefix="ESB-MX-CAM"):
                epics_cam.__init__(self, prefix)

            def new_frame_pv_cb(self, **kwargs):
                """PV monitor callback: reshape the frame and display it."""
                sz = self._sz
                if kwargs['count'] != sz[0] * sz[1]:
                    # Frame size changed: refresh the cached size.
                    # update_size() returns nothing, so re-read self._sz.
                    _log.debug('new_frame_pv_cb count {}'.format(kwargs['count']))
                    self.update_size()
                    sz = self._sz
                pic = kwargs['value'].reshape(sz[::-1])
                if pic.dtype == np.int16:
                    pic.dtype = np.uint16
                try:
                    trf = self._transformation
                except AttributeError:
                    pass
                else:
                    if trf[1, 0] == 0:
                        pic = pic[::trf[0, 0], ::trf[1, 1]]
                    else:
                        pic = pic[::trf[0, 1], ::trf[1, 0]].T
                # fiducial test (same pattern as used by get_image)
                epics_cam.set_fiducial(pic, 255)
                if args.ui == 1:
                    img.setImage(pic)
                else:
                    imv.setImage(pic, autoRange=False, autoLevels=False)

            def new_frame_sim_cb(self, arl=False):
                """Show the next simulated frame and re-arm itself via a Qt timer."""
                imgSeq = self._sim['imgSeq']
                idx = self._sim['imgIdx']
                fps = self._sim['fps']
                udt = self._sim['updateTime']
                self._sim['imgIdx'] = (idx + 1) % imgSeq.shape[0]
                pic = imgSeq[idx]
                if args.ui == 1:
                    img.setImage(pic)
                else:
                    imv.setImage(pic, autoRange=arl, autoLevels=arl)
                delay_ms = int(1000 * args.delay) if args.delay else 0
                QtCore.QTimer.singleShot(delay_ms, self.new_frame_sim_cb)
                now = ptime.time()
                dt = now - udt
                # guard against a zero interval (coarse timer resolution)
                fps2 = 1.0 / dt if dt > 0 else fps
                self._sim['updateTime'] = now
                self._sim['fps'] = fps * 0.9 + fps2 * 0.1
                print("%d %0.1f fps" % (idx, fps))

        print(pg.__version__)
        # Interpret image data as row-major instead of col-major
        pg.setConfigOptions(imageAxisOrder='row-major')
        app = QtGui.QApplication([])

        if args.ui == 1:
            win = pg.GraphicsLayoutWidget()
            win.show()  ## show widget alone in its own window
            win.setWindowTitle('pyqtgraph example: ImageItem')
            view = win.addViewBox(invertY=True)
            ## lock the aspect ratio so pixels are always square
            view.setAspectLocked(True)
            ## Create image item https://pyqtgraph.readthedocs.io/en/latest/graphicsItems/imageitem.html
            img = pg.ImageItem(border='g')
            view.addItem(img)
            ## Set initial view bounds
            view.setRange(QtCore.QRectF(0, 0, 600, 600))
        else:
            ## Create window with ImageView widget
            win = QtGui.QMainWindow()
            win.resize(800, 800)
            imv = pg.ImageView()
            win.setCentralWidget(imv)
            win.show()
            win.setWindowTitle('pyqtgraph example: ImageView')

        cam = UIcamera(prefix=args.prefix)
        win.closeEvent = lambda x: closeEvent(cam, x)
        cam.run(cam.new_frame_pv_cb)
        if args.prefix is None:
            cam.sim_gen(mode=args.sim)
            cam._sim['fps'] = 0
            cam._sim['updateTime'] = ptime.time()
            cam.new_frame_sim_cb(arl=True)

        if args.ui == 2:
            ## Set a custom color map
            colors = [(0, 0, 0), (45, 5, 61), (84, 42, 55), (150, 87, 60), (208, 171, 141), (255, 255, 255)]
            cmap = pg.ColorMap(pos=np.linspace(0.0, 1.0, 6), color=colors)
            imv.setColorMap(cmap)

        ## Start Qt event loop unless running in interactive mode.
        if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
            QtGui.QApplication.instance().exec_()