Files
dev/script/_Lib/epics/_ca_kek.py
2018-04-17 12:05:48 +02:00

546 lines
15 KiB
Python

#!/usr/bin/env python
## @package ca: EPICS-CA interface module for Python.
"""CA module: EPICS-CA interface module for Python.
This module provides an interface between EPICS-CA and Python.
It uses the C module _ca. The _ca module basically maps the C API of the EPICS ca library into Python. The interface between ca.py and the _ca module is subject to change; you should not depend on it. The API in ca.py will be preserved in future releases as much as possible.
Author: Noboru Yamamoto, KEK, JAPAN. -2007.
$Revision: 1.1 $
"""
from __future__ import print_function
__version__ = "$Revision: 1.1 $"
# $Source: /cvs/G/EPICS/extensions/src/PythonCA/src/_ca_kek.py,v $
#
try:
import signal
except:
print("signal module is not avaialble")
import time,thread,gc,sys,atexit
from exceptions import ValueError
# autoGIL is not compatible with Tkinter and wx, so that code was removed.
# Instead, force the thread module to call PyEval_InitThread by actually
# starting (and finishing) one thread before the _ca module is loaded.
__foo_lock = thread.allocate_lock()


def __foo():
    """
    Body of the throw-away thread started at import time.

    It exists only to make sure the thread module is initialized before
    the _ca module is loaded: it releases the module-level lock that the
    importing thread is waiting on and then ends its own thread.
    """
    __foo_lock.release()
    thread.exit_thread()


# See Python/Include/ceval.h
__foo_lock.acquire()  # take the lock so that the second acquire() below blocks
thread.start_new_thread(__foo, ())  # __foo releases the lock
__foo_lock.acquire()  # returns once __foo has run: threading is activated
import _ca
# version and release identifiers exported by the C extension (see _ca314.cpp)
version=_ca.version
revision=_ca.release
# Some constants for the EPICS Channel Access library.
# NOTE(review): the DBR_*/DBE_*/ECA_*/ch_state names used further down are
# presumably provided by these two star imports -- confirm against cadefs/caError.
from cadefs import *
from caError import *
# Export the pend_xxx routines for global (module-level) operation.
# Several of the names below are synonyms bound to the same _ca function.
pendio =_ca.pendio
pend_io=_ca.pendio
pend_event=_ca.pend_event
poll=_ca.poll
poll_event=_ca.poll
flush_io=_ca.flush
flush=_ca.flush
test_io=_ca.test_io # test_io returns 42 for IODONE, 43 for IOINPROGRESS
add_fd_registration=_ca.add_fd_registration
# Error object
error=_ca.error
shutdown=_ca.__ca_task_exit
# Private dictionary (channel name -> channel object) shared by the
# Get/Put/Monitor convenience functions, and the lock used when it is modified.
__ca_dict={}
__ca_dict_lock=thread.allocate_lock()
# Debug flag: when true, channel.__clear_event prints the event id it clears.
_channel__debug=False
class channel:
    """
    A channel object for EPICS Channel Access.

    It does not have a direct connection to the channel object in the
    C library for EPICS Channel Access: every operation is forwarded to
    the _ca extension module together with the identifier kept in
    ``self.chid``.  To create a channel just supply the name of the
    channel to connect to.
    """

    # Request types accepted by get().
    dbr_types = (
        DBR_NATIVE,  # default type
        DBR_STRING, DBR_CHAR, DBR_FLOAT,
        DBR_SHORT,  # same as DBR_INT
        DBR_ENUM, DBR_LONG, DBR_DOUBLE,
        DBR_TIME_STRING, DBR_TIME_CHAR, DBR_TIME_FLOAT,
        DBR_TIME_SHORT,  # same as DBR_TIME_INT
        DBR_TIME_ENUM, DBR_TIME_LONG, DBR_TIME_DOUBLE,
        DBR_CTRL_CHAR, DBR_CTRL_LONG,
        DBR_CTRL_ENUM, DBR_CTRL_DOUBLE,
    )

    def __init__(self, name, cb=None, noflush=None):
        """
        Start a search for the channel called `name`.

        name    -- channel name; an empty string raises ValueError.
        cb      -- callback handed to _ca.search; defaults to
                   self.update_info.
        noflush -- when true, the flush() that normally follows the
                   search request is skipped.
        """
        # Set the attributes used by clear()/__del__ first, so that an object
        # whose construction fails below can still be finalized cleanly.
        self.chid = None
        self.evid = []
        self.__callbacks = {}
        if not cb:
            cb = self.update_info
        if name == "":
            raise ValueError(name)
        self.name = name
        self.field_type = None
        self.element_count = None
        self.puser = None
        self.conn_state = -1
        self.hostname = None
        self.raccess = None
        self.waccess = None
        self.sevr = None
        self.ts = None
        self.status = None
        self.autoEvid = None
        self.cbstate = None
        self.updated = False
        self.val = None
        self.chid = _ca.search(name, cb)
        if not noflush:
            self.flush()

    def clear(self):
        """Cancel all monitors, release the channel in _ca and forget chid."""
        if self.chid:
            self.clear_monitor()
            self.flush()
            _ca.clear(self.chid)
            self.flush()
            self.chid = None

    def __del__(self):
        """Release the channel when the object is garbage collected."""
        self.clear()

    def wait_conn(self, wait=20, dt=0.05):
        """
        Wait until the channel is connected.

        The connection state is re-read after every pend_event(dt) call.
        Once more than `wait` rounds have passed without a connection,
        ECA_BADCHID is raised.  Returns None on success.
        """
        n = 0
        self.pend_event(dt)
        self.update_info()
        self.poll()
        while not self.isConnected():
            self.update_info()
            self.pend_event(dt)
            n = n + 1
            if n > wait:
                # NOTE(review): the original had an unreachable "return -1"
                # right after this raise; it has been dropped.
                raise ECA_BADCHID("%s %d" % (self.name, n))

    def get(self, cb=None, Type=DBR_NATIVE, count=0, type=DBR_NATIVE, type_=DBR_NATIVE):
        """
        Issue a get request for this channel.

        cb    -- callback receiving the result; defaults to self.update_val.
        Type, type, type_ -- alternative spellings of the requested DBR
                 type.  `Type` wins when it is not DBR_NATIVE, then `type`,
                 then `type_`.
        count -- element count passed on to _ca.get.

        Raises ECA_BADCHID when the channel is not connected (or its state
        cannot be read) and TypeError when the largest of the three type
        arguments is not listed in dbr_types.  The value arrives through
        the callback; cbstate and updated are reset before the request.
        """
        try:
            connected = self.isConnected()
        except Exception:
            raise ECA_BADCHID(self.name)
        if not connected:
            raise ECA_BADCHID(self.name)
        if Type == DBR_NATIVE:
            if not (type == DBR_NATIVE):
                Type = type
            elif not (type_ == DBR_NATIVE):
                Type = type_
        rType = max(Type, type, type_)
        if rType not in self.dbr_types:
            raise TypeError(rType)
        if not cb:
            cb = self.update_val
        self.cbstate = None
        self.updated = False
        _ca.get(self.chid, cb, Type, count)

    def put(self, *val, **kw):
        """
        channel.put(value) will put the value(s) to the channel.
        You may need to call channel.flush() afterwards.

        An optional callback can be given as keyword argument `cb`.
        Calling put() without any value only prints a message.
        """
        if val == ():
            print("No value(s) to put")
        else:
            cb = kw['cb'] if 'cb' in kw else None
            _ca.put(self.chid, val, self.val, cb, DBR_NATIVE)

    def put_and_notify(self, *val, **kw):
        """
        Put value(s) to the channel, optionally with a completion callback.

        The callback is taken from keyword argument `cb`; without one the
        data are just put.  Calling it without any value only prints a
        message.
        """
        # ca_put_array_callback does not return value.
        cb = kw['cb'] if 'cb' in kw else None
        if val == ():
            print("No value(s) to put")
        else:
            _ca.put(self.chid, val, self.val, cb, DBR_NATIVE)

    def monitor(self, callback=None, count=0, evmask=(DBE_VALUE|DBE_ALARM)):
        """
        Register `callback` as a monitor on this channel.

        Returns the event id produced by _ca.monitor; keep it to cancel
        the monitor later with clear_monitor(evid).

        Raises PyCa_NoCallback when no callback is given, ECA_BADCHID when
        the cached conn_state is not 2, and ECA_BADTYPE when the field
        type reported by the channel equals DBR_NATIVE.
        """
        if not callback:
            raise PyCa_NoCallback
        # NOTE(review): 2 is presumably ch_state.cs_conn -- confirm in cadefs.
        if self.conn_state != 2:
            raise ECA_BADCHID(self.name)
        self.update_info()
        if self.field_type == DBR_NATIVE:
            raise ECA_BADTYPE(self.name)
        self.evid.append(_ca.monitor(self.chid, callback, count, evmask))
        self.__callbacks[self.evid[-1]] = callback
        return self.evid[-1]

    def __clear_event(self, evid):
        """Cancel one monitor in _ca and drop the reference to its callback."""
        if _channel__debug:
            print("clearing evid:", evid)
        _ca.clear_monitor(evid)
        del self.__callbacks[evid]

    def clear_monitor(self, evid=None):
        """
        Cancel one monitor, or all monitors of this channel.

        evid -- event id returned by monitor().  When it is omitted (or
                false) every registered monitor is cancelled.  An evid
                that is not registered on this channel is ignored.
        """
        if evid:
            if evid in self.evid:
                self.__clear_event(evid)
                # Remove the id exactly once; removing it a second time
                # (as the old code did) raised ValueError.
                self.evid.remove(evid)
                if evid is self.autoEvid:
                    # The auto-update monitor is gone; let autoUpdate()
                    # create a new one next time.
                    self.autoEvid = None
        else:
            for registered in self.evid:
                self.__clear_event(registered)
            self.evid = []
            self.autoEvid = None

    def autoUpdate(self):
        """Install (once) a monitor that keeps val/sevr/status up to date."""
        if self.autoEvid is None:
            self.monitor(self.update_val)
            self.autoEvid = self.evid[-1]
            self.flush()

    def clearAutoUpdate(self):
        """Cancel the monitor installed by autoUpdate(), if there is one."""
        if self.autoEvid is not None:
            self.clear_monitor(self.autoEvid)
            self.autoEvid = None
            self.flush()

    def pendio(self, tmo=0.001):
        """Call _ca.pendio with timeout `tmo` and return its result."""
        return _ca.pendio(float(tmo))

    def pend_io(self, tmo=0.001):
        """Synonym of pendio()."""
        return _ca.pendio(float(tmo))

    def pend_event(self, tmo=0.001):
        """Call _ca.pend_event with timeout `tmo` and return its result."""
        return _ca.pend_event(float(tmo))

    def poll(self):
        """Call _ca.poll(); nothing is returned."""
        _ca.poll()

    def flush(self, wait=0.001):
        """Call _ca.flush(wait) and return its result."""
        return _ca.flush(wait)

    def update_val(self, valstat=None):
        """
        Default get/monitor callback: store the received data.

        valstat is indexed as (value, severity, status[, timestamp[, ctrl]]).
        The first three items are required; the last two are stored in
        self.ts and self.ctrl only when present.  self.updated is set even
        when storing fails.  Raises caError when called without data.
        """
        if valstat is None:
            raise caError("No value")
        try:
            self.val = valstat[0]
            self.sevr = valstat[1]
            self.status = valstat[2]
            self.cbstate = 1
            # The remaining fields are optional; a missing one is not an error.
            try:
                self.ts = valstat[3]
            except Exception:
                pass
            try:
                self.ctrl = valstat[4]
            except Exception:
                pass
        finally:
            self.updated = True

    def clear_cbstate(self):
        """Reset the callback state flag."""
        self.cbstate = None

    def state(self):
        """Refresh the channel info; return conn_state relative to cs_conn (0 = connected)."""
        self.get_info()
        return self.conn_state - ch_state.cs_conn

    def isNeverConnected(self):
        """Refresh the channel info; true when the state is cs_never_conn."""
        self.get_info()
        return self.conn_state == ch_state.cs_never_conn

    def isConnected(self):
        """Refresh the channel info; true when the state is cs_conn."""
        self.get_info()
        return self.conn_state == ch_state.cs_conn

    def isPreviouslyConnected(self):
        """Refresh the channel info; true when the state is cs_prev_conn."""
        self.get_info()
        return self.conn_state == ch_state.cs_prev_conn

    def isDisonnected(self):
        """Same test as isPreviouslyConnected(); the misspelled name is kept for existing callers."""
        self.get_info()
        return self.conn_state == ch_state.cs_prev_conn

    # Correctly spelled synonym of isDisonnected().
    isDisconnected = isDisonnected

    def isClosed(self):
        """Refresh the channel info; true when the state is cs_closed."""
        self.get_info()
        return self.conn_state == ch_state.cs_closed

    def get_info(self):
        """
        Update the channel status information and return it as a tuple:
        (field_type, element_count, puser, conn_state, hostname,
         raccess, waccess)
        The same values are stored in the attributes of those names.
        """
        info = (self.field_type, self.element_count, self.puser,
                self.conn_state, self.hostname, self.raccess,
                self.waccess) = _ca.ch_info(self.chid)
        return info

    def update_info(self):
        """
        Just update the channel status information. No return value.
        """
        self.get_info()

    def fileno(self):
        """Return the socket number used for the connection. The socket id
        is shared by channels which are connected to the same IOC.
        It became obsolete in the EPICS 3.14 version of Python-CA.
        You need to use the fd_register function instead, but you may not
        need it anyway in a multi-thread environment.
        """
        return _ca.fileno(self.chid)
# Convenience functions
# You need to call the Clear() function before stopping Python; otherwise it causes a core dump. 2009/2/11 NY
def __Ch(name, tmo=0.01):
    """
    Return the channel object for `name`, creating and caching it on first use.

    name -- channel name; anything that is not a string raises ECA_BADTYPE.
    tmo  -- accepted so that callers can keep passing it; the lookup itself
            does not use it.

    A channel that is not yet in the private dictionary is created, waited
    on with wait_conn() and then stored.  ECA_BADCHID is raised when that
    fails.  A channel whose state() is not 0 gets one more wait_conn(10)
    before it is returned.
    """
    if not isinstance(name, str):
        raise ECA_BADTYPE(name)
    ch = __ca_dict.get(name)
    if ch is None:
        try:
            ch = channel(name)
            ch.wait_conn()
        except Exception:
            raise ECA_BADCHID(name)
        with __ca_dict_lock:
            __ca_dict[name] = ch
    if ch.state() != 0:
        ch.wait_conn(10)
    return ch
def Info(name="", tmo=0.01):
    """
    Look up channel `name` and return its information tuple:
    (field_type, element_count, puser argument,
     connection_status, hostname:port,
     read access mode, write access mode)
    """
    return __Ch(name, tmo=tmo).get_info()
def ClearAll():
    """Clear every channel held in the private channel dictionary."""
    # Clear() deletes entries from __ca_dict, so walk over a snapshot of the
    # names rather than over the dictionary that is being modified.
    for name in list(__ca_dict):
        Clear(name)


# __ca_dict should be cleared before stopping Python
atexit.register(ClearAll)
def Clear(name=""):
    """
    Remove channel `name` from the private dictionary and clear it.

    Raises ECA_BADTYPE when `name` is not a string or when no channel of
    that name is held in the dictionary.
    """
    if not isinstance(name, str):
        raise ECA_BADTYPE(name)
    # The with-statement releases the lock exactly once on every path.  The
    # old code released it by hand before raising and again in its finally
    # clause, so an unknown name failed with a lock error instead of
    # ECA_BADTYPE.
    with __ca_dict_lock:
        if name not in __ca_dict:
            raise ECA_BADTYPE(name)
        ch = __ca_dict.pop(name)
        ch.clear()
def Get(name="", count=0, Type=DBR_NATIVE, tmo=0.01, maxtmo=3):
    """
    Get value from a channel "name".

    name   -- channel name.
    count  -- element count passed on to channel.get().
    Type   -- requested DBR type.
    tmo    -- sleep time between two checks for the response.
    maxtmo -- total time to wait for the response.

    Returns the value stored on the channel by the get callback.
    Raises caError("No get response") when no response arrived in time.
    """
    ch = __Ch(name, tmo)

    def CB(vals, ch=ch):
        ch.update_val(vals)

    ch.get(cb=CB, Type=Type, count=count)
    ch.flush()
    # Measure the limit against the clock instead of counting down by tmo,
    # so that tmo=0 still times out.
    deadline = time.time() + maxtmo
    while not ch.updated:
        time.sleep(tmo)
        if ch.updated:
            # The response arrived during the sleep; do not report a timeout.
            break
        if time.time() >= deadline:
            raise caError("No get response")
    return ch.val
def Put_and_Notify(name, val=None, cb=None):
    """
    Convenience function: look up channel `name`, call its
    put_and_notify() with `val` and callback `cb`, then flush.
    If the callback is None, the data are just put to the channel.
    Returns the channel's stored value (ch.val).
    """
    target = __Ch(name, tmo=0.1)
    target.put_and_notify(val, cb=cb)
    target.flush()
    return target.val


# define synonym
Put = Put_and_Notify
def Put_and_Notify_Array(name, val, cb=None):
    """
    Put an array to channel `name`: the items of `val` are passed to
    put_and_notify() as separate positional values, with callback `cb`.
    Returns the channel's stored value (ch.val).

    Test version: not tested with string arrays yet.
    2007.8.30 T. Matsumoto
    """
    ch = __Ch(name, tmo=0.1)
    # Argument unpacking replaces the deprecated apply() builtin.
    ch.put_and_notify(*val, cb=cb)
    ch.flush()
    return ch.val


# define synonym
Put_Array = Put_and_Notify_Array
def Monitor(name, cb=None, evmask=(DBE_VALUE|DBE_ALARM)):
    """
    Replace all monitors on channel `name` by a single new one.

    With a callback, every event calls cb(channel, val).  Without one,
    every event prints the channel name, the first three items of val and
    the formatted timestamp val[3].  Returns the event id of the new
    monitor.
    """
    ch = __Ch(name, tmo=0.1)

    def forward(val, ch=ch, cb=cb):
        cb(ch, val)

    def report(val, ch=ch):
        print(ch.name, ":", val[0], val[1], val[2], TS2Ascii(val[3]))

    handler = forward if cb else report
    # Existing monitors on the channel are cancelled before the new one is set.
    ch.clear_monitor()
    evid = ch.monitor(handler, evmask=evmask)
    ch.flush()
    return evid
def ClearMonitor(name, evid=None):
    """
    Cancel monitor `evid` on channel `name`, or all of the channel's
    monitors when evid is omitted.

    Any error raised while cancelling is reported as ECA_BADCHID(name).
    """
    ch = __Ch(name, tmo=0.1)
    try:
        ch.clear_monitor(evid)
    except Exception:
        # Only ordinary errors are translated; KeyboardInterrupt and
        # SystemExit now pass through unchanged.
        raise ECA_BADCHID(name)
#
def isIODone():
    """Return 1 when _ca.test_io() reports 42 (IODONE), otherwise 0."""
    return 1 if _ca.test_io() == 42 else 0
#
# Synchronous group class
# Author: N.Yamamoto
# Date: May 27, 1999 (first version)
#
class SyncGroup:
    """
    A Channel Access synchronous group.

    self.gid is the group id returned by _ca.sg_create().  self.chs maps
    every member channel object to the value last returned for it by
    _ca.sg_put / _ca.sg_get (0 until the first request).
    """

    def __init__(self):
        """Create a new, empty synchronous group."""
        self.gid = _ca.sg_create()
        self.chs = {}

    def add(self, chs):
        """
        Add channels to the group.

        chs -- an iterable of channel objects, or one object that is not
               iterable.  Channels that are already members keep their
               current entry.
        """
        try:
            members = iter(chs)
        except TypeError:
            # Not iterable: treat the argument as one single channel.
            members = iter((chs,))
        for ch in members:
            self.chs.setdefault(ch, 0)

    def test(self):
        """Return the result of _ca.sg_test for this group."""
        return _ca.sg_test(self.gid)

    def reset(self):
        """Return the result of _ca.sg_reset for this group."""
        return _ca.sg_reset(self.gid)

    def wait(self, tmo=1.0):
        """Call _ca.sg_block for this group with timeout `tmo`; return its result."""
        return _ca.sg_block(self.gid, float(tmo))

    def put(self, ch, *value, **kw):
        """
        Queue a put of `value` to member channel `ch`.

        The DBR type can be chosen with keyword argument `Type`
        (default DBR_NATIVE).  Channels that are not members are ignored.
        """
        Type = kw["Type"] if "Type" in kw else DBR_NATIVE
        if ch in self.chs:
            self.chs[ch] = _ca.sg_put(self.gid, ch.chid,
                                      self.chs[ch], value, Type)

    def get(self, ch):
        """Queue a get for member channel `ch`; non-members are ignored."""
        if ch in self.chs:
            self.chs[ch] = _ca.sg_get(self.gid, ch.chid, self.chs[ch])

    def convert(self, ch):
        """
        Convert the data held for member channel `ch` with _ca.ca_convert
        and store the first item of the result on the channel through
        ch.update_val().  Non-members are ignored.
        """
        if ch in self.chs:
            val = _ca.ca_convert(ch.chid, self.chs[ch])
            ch.update_val(val[0])

    def GetAll(self, tmo=1.0):
        """
        Queue a get for every member, block up to `tmo` and, when
        _ca.sg_block returns 0, store the converted data on every channel.

        Raises Exception("CA_SG time out") for any other sg_block result.
        """
        # Iterate over a snapshot: the entries are reassigned inside the loop.
        members = list(self.chs)
        for ch in members:
            self.chs[ch] = _ca.sg_get(self.gid, ch.chid, self.chs[ch])
        # Coerce the timeout the same way wait() does.
        st = _ca.sg_block(self.gid, float(tmo))
        if st != 0:
            raise Exception("CA_SG time out")
        for ch in members:
            val = _ca.ca_convert(ch.chid, self.chs[ch])
            ch.update_val(val[0])
# Timestamp utilities
# The constant below is added to a timestamp before it is handed to the
# time module:
# time.gmtime(631152000.0)=(1990, 1, 1, 0, 0, 0, 0, 1, 0)
#
__EPICS_TS_EPOCH = 631152000.0


def TS2Ascii(ts):
    """
    Format timestamp `ts` like time.ctime(), with the milliseconds
    inserted after the seconds, e.g. "... 12:34:56.789 2007".
    """
    import math
    shifted = ts + __EPICS_TS_EPOCH
    stamp = time.ctime(shifted)
    fraction = math.modf(shifted)[0]
    millis = ".%03d" % (fraction * 1000)
    # The last five characters of ctime() are " YYYY"; the milliseconds go
    # in front of them.
    head, year = stamp[:-5], stamp[-5:]
    return head + millis + year
def TS2time(ts):
    """Return time.localtime() for timestamp `ts` shifted by __EPICS_TS_EPOCH."""
    shifted = ts + __EPICS_TS_EPOCH
    return time.localtime(shifted)
def TS2UTC(ts):
    """Return timestamp `ts` shifted by __EPICS_TS_EPOCH, as a number of seconds."""
    shifted = ts + __EPICS_TS_EPOCH
    return shifted