#!/usr/bin/env python
# *-----------------------------------------------------------------------*
# |                                                                       |
# |  Copyright (c) 2022 by Paul Scherrer Institute (http://www.psi.ch)    |
# |  Based on Zac great first implementation                              |
# |  Author Thierry Zamofing (thierry.zamofing@psi.ch)                    |
# *-----------------------------------------------------------------------*
"""EPICS camera access (class ``epics_cam``) with an optional simulated mode.

When run as a script, images are either saved one by one to PNG files and
opened with the default application, or (``--ui``) shown live in a pyqtgraph
``ImageView`` window.

Note from Helge about the resolution channels of the camera IOC:

    Hi Zac,

    at the moment there is no channel for the min or max values. The reason
    for this is the binning. With this you change these numbers.
    At the end you need 3*2 numbers :
      Physical resolution
      Max resolution for ROI
      Resolution of your image

    And the logic how you can change the numbers ;-)
    The simplest trick for getting the max resolution is to set binning to 1
    and write big numbers in the ROI (REGIONX_END / REGIONY_END) channels .
    They will be corrected to the max resolution.
    Sorry for this. When you want all numbers, I can think about the export
    of the whole "mess". Inside the IOC there is the central checking
    routine. Sometimes it could be helpful to have this in EPICS.

    Best regards
    Helge
"""
import logging
_log = logging.getLogger(__name__)

import enum
import time

import numpy as np

try:
    import epics
except ImportError:
    # Without pyepics only the simulated camera (prefix=None) can be used;
    # epics_cam.__init__ raises for a real camera in that case.
    epics = None


class Camera(enum.IntEnum):
    """Values compared against the 'CAMERA' PV."""
    OFF = 0
    RUNNING = 1


class CameraStatus(enum.IntEnum):
    """Values written to the 'CAMERASTATUS' PV."""
    OFFLINE = 0
    IDLE = 1
    RUNNING = 2


class epics_cam(object):
    """Camera served by EPICS process variables, or a simulated image source.

    With a PV prefix, PV objects are created lazily and cached in ``_pv``.
    With ``prefix=None`` the instance is in simulated mode: only ``_sim``
    exists and images come from the sequence created by ``sim_gen``.
    """

    def __init__(self, prefix='SARES30-CAMS156-SMX-OAV'):
        """Set up the camera.

        prefix: PV name prefix (a trailing ':' is appended when missing),
                or None for simulated mode.
        Raises ImportError when a real camera is requested but pyepics is
        not installed.
        """
        if prefix is None:
            self._sim = {'exp': 3.1315}
            _log.info('simulated mode:{}'.format(self._sim))
            return  # simulated mode
        if epics is None:
            raise ImportError('pyepics is required for a non-simulated camera')
        self._pv = dict()
        # endswith() also copes with an empty prefix (prefix[-1] would raise)
        if not prefix.endswith(':'):
            prefix += ':'
        self._prefix = prefix

    def _is_sim(self):
        """Return True when the instance was created in simulated mode."""
        return hasattr(self, '_sim')

    def getPv(self, name):
        """Return the cached PV for prefix+name, creating it on first use."""
        try:
            return self._pv[name]
        except KeyError:
            pv = self._pv[name] = epics.PV(self._prefix + name)
            return pv

    def new_frame_cb(self, **kwargs):
        """Placeholder PV callback, only prints a marker.

        kwargs contains:
          pvname: the name of the pv
          value: the latest value
          char_value: string representation of value
          count: the number of data elements
          ftype: the numerical CA type indicating the data type
          type: the python type for the data
          status: the status of the PV (1 for OK)
          precision: number of decimal places of precision for floating point values
          units: string for PV units
          severity: PV severity
          timestamp: timestamp from CA server.
          read_access: read access (True/False)
          write_access: write access (True/False)
          access: string description of read- and write-access
          host: host machine and CA port serving PV
          enum_strs: the list of enumeration strings
          upper_disp_limit: upper display limit
          lower_disp_limit: lower display limit
          upper_alarm_limit: upper alarm limit
          lower_alarm_limit: lower alarm limit
          upper_warning_limit: upper warning limit
          lower_warning_limit: lower warning limit
          upper_ctrl_limit: upper control limit
          lower_ctrl_limit: lower control limit
          chid: integer channel ID
          cb_info: (index, self) tuple containing callback ID and the PV object
        """
        print('camera.new_frame_pv_cb')

    def _apply_transformation(self, pic):
        """Reinterpret int16 data as uint16 and apply ``_transformation`` if set.

        ``_transformation`` is an optional 2x2 array whose entries are used as
        slice steps (flip) and which selects a transpose when its [1,0] entry
        is non-zero.
        """
        if pic.dtype == np.int16:
            # same bytes seen as unsigned, without assigning to pic.dtype
            pic = pic.view(np.uint16)
        trf = getattr(self, '_transformation', None)
        if trf is None:
            return pic
        if trf[1, 0] == 0:
            return pic[::trf[0, 0], ::trf[1, 1]]
        return pic[::trf[0, 1], ::trf[1, 0]].T

    def get_image(self):
        """Fetch one image, store it in ``self.pic`` and return it.

        Simulated mode returns the next frame of the generated sequence
        (wrapping around). For a real camera the 'FPICTURE' PV is read and
        reshaped to (height, width); None is returned when that fails with
        an AttributeError (e.g. size not yet known or no data received).
        """
        if self._is_sim():
            sim = self._sim
            imgSeq = sim['imgSeq']
            idx = sim['imgIdx']
            sim['imgIdx'] = (idx + 1) % imgSeq.shape[0]
            _log.info('simulated idx:{}'.format(idx))
            self.pic = pic = imgSeq[idx]
            return pic
        try:
            sz = self._sz
            raw = self.getPv('FPICTURE').get(count=sz[0]*sz[1], as_numpy=True)
            pic = raw.reshape(sz[::-1])
        except AttributeError:
            _log.warning("failed to fetch image")
            return None
        self.pic = pic = self._apply_transformation(pic)
        return pic

    def stop(self, v=CameraStatus.IDLE):
        """Drop the picture PV and, unless v is None, write v to 'CAMERASTATUS'.

        In simulated mode there are no PVs and nothing is done.
        """
        if self._is_sim():
            _log.info('simulated mode')
            return
        self._pv.pop('pic', None)
        if v is not None:
            self.getPv('CAMERASTATUS').put(v, wait=True)

    def run(self, cb=None):
        """Make sure the camera is running and create the picture PV.

        cb: optional callback; when given, the 'FPICTURE' PV is monitored and
            cb is called on updates. Not supported in simulated mode.
        """
        if self._is_sim():
            _log.info('simulated mode')
            if cb:
                _log.error('simulated mode with callback not yet supported:{}'.format(cb))
            return
        pv_cam = self.getPv('CAMERA')
        if pv_cam.value == Camera.OFF:
            self.getPv('CAMERASTATUS').put(CameraStatus.RUNNING, wait=True)
            # poll until the camera reports it is no longer off
            while pv_cam.value == Camera.OFF:
                _log.warning('CAMERASTATUS:OFF, retry...')
                time.sleep(.5)
        self.update_size()
        pic_name = self._prefix + "FPICTURE"
        if cb is None:
            self._pv['pic'] = epics.PV(pic_name)
        else:
            self._pv['pic'] = epics.PV(pic_name, auto_monitor=True, callback=cb)

    def update_size(self):
        """Read 'WIDTH'/'HEIGHT', store them in ``_sz`` and return (width, height)."""
        pv_w = self.getPv('WIDTH')
        pv_h = self.getPv('HEIGHT')
        self._sz = (int(pv_w.value), int(pv_h.value))
        return self._sz

    def set_exposure(self, exp):
        """Write exp to the 'EXPOSURE' PV (or store it in simulated mode)."""
        if self._is_sim():
            _log.info('simulated mode:{}'.format(exp))
            self._sim['exp'] = exp
            return
        self.getPv('EXPOSURE').put(exp, wait=True)

    def get_exposure(self):
        """Return the value of the 'EXPOSURE' PV (or the simulated value)."""
        if self._is_sim():
            exp = self._sim['exp']
            _log.info('simulated mode:{}'.format(exp))
            return exp
        return self.getPv('EXPOSURE').get()

    def set_roi(self, rxs, rxe, rys, rye):
        """Write the region of interest: x start/end, then y start/end."""
        self.update_params(
            (self.getPv('REGIONX_START'), rxs),
            (self.getPv('REGIONX_END'), rxe),
            (self.getPv('REGIONY_START'), rys),
            (self.getPv('REGIONY_END'), rye),
        )

    def set_binning(self, bx, by):
        """Write the binning in x and y."""
        self.update_params((self.getPv('BINX'), bx), (self.getPv('BINY'), by))

    #def update_params(self, **kargs):
    def update_params(self, *args):
        """update parameters on camera

        args: (pv, value) pairs; each value is written to its PV, then the
              camera is cycled and the image size is re-read.
        """
        for pv, val in args:
            if not pv.connected:
                _log.info('force connect {}'.format(pv))
                pv.force_connect()  # force to connect pv
            _log.debug("updating {} = {}".format(pv.pvname, val))
            pv.put(val, wait=True)
        pv_cam = self.getPv('CAMERA')
        pv_cs = self.getPv('CAMERASTATUS')
        # NOTE(review): CameraStatus values are written to the 'CAMERA' PV
        # while pv_cs stays unused; run()/stop() write them to 'CAMERASTATUS'.
        # Looks like pv_cs may have been intended here - confirm on the IOC
        # before changing, behaviour is kept as it was.
        #pv_set_param=self.getPv("SET_PARAM")
        pv_cam.put(CameraStatus.IDLE, wait=True)
        #pv_set_param.put(1, wait=True)
        pv_cam.put(CameraStatus.RUNNING, wait=True)
        self.update_size()

    def sim_gen(self, sz=(1500, 1000), t=100, mode=0):
        """generate simulation data

        sz:   (width, height) of the frames
        t:    number of frames in the sequence
        mode: only logged at the moment

        The uint16 sequence of shape (t, height, width) is stored in
        ``_sim['imgSeq']`` and the read index ``_sim['imgIdx']`` is reset to 0.
        Only usable in simulated mode (needs ``_sim``).
        """
        _log.info('generate simulation images, mode:{}...'.format(mode))
        w, h = sz
        self._imgSeq = imgSeq = np.empty(shape=(t, h, w), dtype=np.uint16)
        x = np.linspace(-5, 5, w)
        y = np.linspace(-5, 5, h)
        # full coordinate arrays
        xx, yy = np.meshgrid(x, y)
        fx = 1
        for i in range(t):
            #imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx+.1*i)**2 + np.sin(yy+.01*i)**2)#+xx*t+yy*t)
            #imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx+.1*i)**2 + np.sin((1+.1*np.sin(.2*i))*yy+.001*i**2)**2)#+xx*t+yy*t)
            #imgSeq[i,:,:] = 100*np.sqrt(np.sin(xx+2*np.sin(i/t*2*np.pi))**2 + np.sin(yy)**2)
            phase = i/t*2*np.pi
            px = 2*np.sin(phase)
            py = 2*np.sin(phase)
            fy = 1+.3*np.sin(phase*2)
            imgSeq[i, :, :] = 100*np.sqrt(np.sin(xx*fx+px)**2 + np.sin(yy*fy+py)**2)
        #np.random.bytes(100)
        # add noise to the top-left quarter (in each direction) of every frame
        wr = w//4
        hr = h//4
        imgSeq[:, 0:hr, 0:wr] += np.random.randint(0, 100, (t, hr, wr), dtype=np.uint16)
        self._sim['imgSeq'] = imgSeq
        self._sim['imgIdx'] = 0
        _log.info('done')

    def set_transformations(self, *args):
        """Kept for old callers; only logs that it is not implemented."""
        _log.error('OLD FUNCTION NOT IMPLEMENTED {}'.format(args))


if __name__ == "__main__":
    import argparse
    import os
    import platform
    import subprocess
    import sys
    import time
    import PIL.Image

    logging.basicConfig(level=logging.DEBUG,format='%(levelname)s:%(module)s:%(lineno)d:%(funcName)s:%(message)s ')

    def default_app_open(file):
        """Open file with the default application of the platform."""
        system = platform.system()
        if system == 'Darwin':  # macOS
            subprocess.call(('open', file))
        elif system == 'Windows':  # Windows
            os.startfile(file)
        else:  # linux variants
            subprocess.call(('xdg-open', file))

    parser = argparse.ArgumentParser()
    parser.add_argument("--ui", "-u", help="qt test", type=int, default=0)
    parser.add_argument("--sim", "-s", help="simulation mode", type=int, default=None)
    parser.add_argument("--prefix","-p",help="PV prefix for images: default=%(default)s",type=str,default="SARES30-CAMS156-SMX-OAV",)
    args = parser.parse_args()

    _log.info('Arguments:{}'.format(args.__dict__))
    if args.sim is not None:
        args.prefix = None  # use simulated camera
    elif args.prefix == 'SwissMxSim':
        os.environ['EPICS_CA_ADDR_LIST'] = 'localhost'  # simulated epics camera
    else:
        os.environ['EPICS_CA_ADDR_LIST'] = '129.129.244.255 sf-saresc-cagw.psi.ch:5062 sf-saresc-cagw.psi.ch:5066'

    if not args.ui:
        cam = epics_cam(prefix=args.prefix)
        #cam._transformation=np.array(((-1,0),(0,1)),dtype=np.uint8) #ZAC: orig. code
        if args.prefix is None:
            cam.sim_gen(mode=args.sim)

        #sz=(2448,2048)
        #ctr=(1200,1400)
        #sz=(1200,1000)
        #cam.set_roi(int(ctr[0]-sz[0]/2),int(ctr[0]+sz[0]/2),int(ctr[1]-sz[1]/2),int(ctr[1]+sz[1]/2))
        #cam.set_exposure(3)
        #cam.set_binning(1,1)
        cam.run()
        n = 1
        base = "/tmp/image_{:05d}.png"
        while True:
            img = cam.get_image()
            if img is None:
                # get_image() already logged the failure; wait and try again
                time.sleep(.5)
                continue
            mx = img.max()
            if img.dtype == np.uint16 and mx < 256:
                _log.info('reformat to uint 8')
                img = np.uint8(img)
            print(img, img.dtype)
            fn = base.format(n)
            PIL.Image.fromarray(img).save(fn)
            _log.info('File {} shape:{} dtype:{} max:{}'.format(fn,img.shape,img.dtype,img.max()))
            default_app_open(fn)
            try:
                input(" for next image, ctrl-c to abort")
            except KeyboardInterrupt:
                sys.exit(0)
            n += 1
    else:
        from pyqtgraph.Qt import QtCore, QtGui
        import pyqtgraph as pg
        import pyqtgraph.ptime as ptime

        class UIcamera(epics_cam):
            """epics_cam that pushes every new frame into the ImageView ``imv``."""

            def __init__(self, prefix="ESB-MX-CAM"):
                epics_cam.__init__(self, prefix)

            def new_frame_pv_cb(self, **kwargs):
                """PV callback: reshape the received data and display it."""
                sz = self._sz
                if kwargs['count'] != sz[0] * sz[1]:
                    # frame size changed on the camera: re-read width/height
                    sz = self.update_size()
                try:
                    pic = kwargs['value'].reshape(sz[::-1])
                except ValueError:
                    _log.warning('frame of {} elements does not match size {}'.format(kwargs['count'], sz))
                    return
                _log.debug('new_frame_pv_cb count {}'.format(kwargs['count']))
                pic = self._apply_transformation(pic)
                #feducial test
                f = np.array(((0,0,0,0,0),
                              (0,1,1,1,0),
                              (0,1,0,0,0),
                              (0,1,1,0,0),
                              (0,1,0,0,0),
                              (0,0,0,0,0),), pic.dtype)
                pic[0:6, 0:5] = f*pic.max()
                imv.setImage(pic, autoRange=False, autoLevels=False)

            def new_frame_sim_cb(self, arl=False):
                """Display the next simulated frame and re-arm a 1 ms timer.

                arl: passed as autoRange/autoLevels to the ImageView.
                """
                sim = self._sim
                imgSeq = sim['imgSeq']
                idx = sim['imgIdx']
                fps = sim['fps']
                udt = sim['updateTime']
                sim['imgIdx'] = (idx+1) % imgSeq.shape[0]
                #_log.info('simulated idx:{}'.format(idx))
                pic = imgSeq[idx]
                imv.setImage(pic, autoRange=arl, autoLevels=arl)

                QtCore.QTimer.singleShot(1, self.new_frame_sim_cb)
                now = ptime.time()
                dt = now - udt
                sim['updateTime'] = now
                if dt > 0:  # two frames with the same timestamp: keep the old fps
                    sim['fps'] = fps * 0.9 + (1.0 / dt) * 0.1
                print("%d %0.1f fps" % (idx,fps))

        print(pg.__version__)
        # Interpret image data as row-major instead of col-major
        pg.setConfigOptions(imageAxisOrder='row-major')

        app = QtGui.QApplication([])

        ## Create window with ImageView widget
        win = QtGui.QMainWindow()
        win.resize(800,800)
        imv = pg.ImageView()
        win.setCentralWidget(imv)
        win.show()
        win.setWindowTitle('pyqtgraph example: ImageView')

        ## Display the data and assign each frame a time value from 1.0 to 3.0
        cam = UIcamera(prefix=args.prefix)
        #cam.set_binning(4,4)

        #cam.run()
        #cam.get_image()
        #imv.setImage(cam.pic, autoRange=True, autoLevels=True)
        #cam.stop(None)

        cam.run(cam.new_frame_pv_cb)
        if args.prefix is None:
            cam.sim_gen(mode=args.sim)
            cam._sim['fps'] = 0
            cam._sim['updateTime'] = ptime.time()
            cam.new_frame_sim_cb(arl=True)

        ## Set a custom color map
        colors = [(0, 0, 0),(45, 5, 61),(84, 42, 55),(150, 87, 60),(208, 171, 141),(255, 255, 255)]
        cmap = pg.ColorMap(pos=np.linspace(0.0, 1.0, 6), color=colors)
        imv.setColorMap(cmap)

        ## Start Qt event loop unless running in interactive mode.
        if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
            QtGui.QApplication.instance().exec_()