Files
x07mb/script/test/TestSmplDone3.py
2022-03-04 14:28:28 +01:00

144 lines
4.5 KiB
Python

import java.lang.System as System
class Channel2(java.beans.PropertyChangeListener, Writable, Readable):
    """Wrapper around an Epics PV channel obtained from create_channel().

    The object registers itself as a property change listener on the
    underlying channel (when monitoring is enabled) and forwards "value"
    changes to the optional callback. It can be used as a context manager:
    the channel is closed when the 'with' block exits.
    """

    def __init__(self, channel_name, type = None, size = None, callback=None, alias = None, monitored=None):
        """ Create an object that encapsulates an Epics PV connection.
        Args:
            channel_name(str):name of the channel
            type(str, optional): type of PV. By default gets the PV standard field type.
                Scalar values: 'b', 'i', 'l', 'd', 's'.
                Array values: '[b', '[i', '[l', '[d', '[s'.
            size(int, optional): the size of the channel
            callback(function, optional): The monitor callback.
            alias(str): name to be used on scans.
            monitored(bool, optional): if not None, forwarded to set_monitored().
        """
        #System.gc()
        self.channel = create_channel(channel_name, type, size)
        self.callback = callback
        self._alias = alias
        # None means "leave the channel's monitoring state untouched".
        if monitored is not None:
            self.set_monitored(monitored)
        #FinalizeTrigger.ensureFinalizer(self)

    def get_channel_name(self):
        """Return the name of the channel.
        """
        return self.channel.name

    def get_size(self):
        """Return the size of the channel.
        """
        return self.channel.size

    def set_size(self, size):
        """Set the size of the channel.
        """
        self.channel.size = size

    def is_connected(self):
        """Return True if channel is connected.
        """
        return self.channel.connected

    def is_monitored(self):
        """Return True if channel is monitored
        """
        return self.channel.monitored

    def set_monitored(self, value):
        """Set a channel monitor to trigger the callback function defined in the constructor.

        A truthy value registers this object as a property change listener
        on the channel; a falsy value unregisters it.
        """
        self.channel.monitored = value
        if value:
            self.channel.addPropertyChangeListener(self)
        else:
            self.channel.removePropertyChangeListener(self)

    def propertyChange(self, pce):
        """Listener entry point: pass the new value of "value" events to the callback.

        Events for other properties, and any event while no callback is
        set, are ignored.
        """
        if pce.getPropertyName() == "value":
            if self.callback is not None:
                self.callback(pce.getNewValue())

    def put(self, value, timeout=None):
        """Write to channel and wait value change. In the case of a timeout throws a TimeoutException.
        Args:
            value(obj): value to be written
            timeout(float, optional): timeout in seconds. If none waits forever.
        """
        # Identity test: None is a singleton, so 'is' is the correct comparison.
        if timeout is None:
            self.channel.setValue(value)
        else:
            # The timeout is given in seconds; the future is waited on in milliseconds.
            self.channel.setValueAsync(value).get(int(timeout*1000), java.util.concurrent.TimeUnit.MILLISECONDS)

    def putq(self, value):
        """Write to channel and don't wait.
        """
        self.channel.setValueNoWait(value)

    def get(self, force = False):
        """Get channel value.

        Args:
            force(bool, optional): passed through to the channel's getValue().
        """
        return self.channel.getValue(force)

    def wait_for_value(self, value, timeout=None, comparator=None):
        """Wait channel to reach a value, using a given comparator. In the case of a timeout throws a TimeoutException.
        Args:
            value(obj): value to be verified.
            timeout(float, optional): timeout in seconds. If None waits forever.
            comparator (java.util.Comparator, optional). If None, uses Object.equals.
        """
        # Select the waitForValue overload matching the arguments supplied;
        # the timeout is converted from seconds to integer milliseconds.
        if comparator is None:
            if timeout is None:
                self.channel.waitForValue(value)
            else:
                self.channel.waitForValue(value, int(timeout*1000))
        else:
            if timeout is None:
                self.channel.waitForValue(value, comparator)
            else:
                self.channel.waitForValue(value, comparator, int(timeout*1000))

    def close(self):
        """Close the channel.
        """
        Epics.closeChannel(self.channel)

    def getName(self):
        """Return the channel name (same as get_channel_name())."""
        return self.get_channel_name()

    def setAlias(self, alias):
        """Set the alias returned by getAlias()."""
        self._alias = alias

    def getAlias(self):
        """Return the alias if one is set (truthy), otherwise the channel name."""
        return self._alias if self._alias else self.getName()

    def write(self, value):
        """Write a value by delegating to put() with no timeout."""
        self.put(value)

    def read(self):
        """Read the value by delegating to get() with the default 'force'."""
        return self.get()

    def __enter__(self):
        """Return self so the channel can be used in a 'with' statement."""
        return self

    def __exit__(self, *args):
        """Close the channel on leaving the 'with' block; exceptions are not suppressed."""
        self.close()

    #def finalize(self): pass #TODO: if not overriden, __del__ is not called!!!
    #def __del__(self):
    #    self.close()
import sys
# Stress test: create a Channel2 on the same PV 1000 times and read each
# one 200 times, printing the outer index as progress and the last reached
# (outer, inner) indices at the end.
name = 'X07MB-OP2:SMPL-DONE'
#name = 'X07MB-OP2-SAI_07:SUM'

# Pre-bind both counters so the final report in 'finally' cannot itself raise
# NameError when the very first channel creation fails (the inner index would
# otherwise be unbound), which would hide the real error.
i = j = None
try:
    for i in range(1000):
        print(i)
        # NOTE(review): the channel is never closed before the next one is
        # created -- presumably intentional for this test; confirm.
        c = Channel2(name)
        for j in range(200):
            c.get()
            time.sleep(0.00)
except:
    # Bare except kept on purpose for this diagnostic script: presumably it
    # must also report Java exceptions raised by the channel layer -- confirm.
    print(sys.exc_info())
finally:
    # Same output as the statement form "print i,j": both values, space separated.
    print("%s %s" % (i, j))