Files
dev/script/test/DeviceChannel.py
2018-01-19 10:56:53 +01:00

120 lines
4.1 KiB
Python
Executable File

class Channel(java.beans.PropertyChangeListener, Writable, Readable, DeviceBase):
    """Device that wraps an Epics PV channel, exposing read, write and monitor access."""

    def __init__(self, channel_name, type = None, size = None, callback=None, alias = None):
        """ Create an object that encapsulates an Epics PV connection.
        Args:
            channel_name(str): name of the PV to connect to.
            type(str, optional): type of PV. By default gets the PV standard field type.
                                 Scalar values: 'b', 'i', 'l', 'd', 's'.
                                 Array values: '[b', '[i', '[l', '[d', '[s'.
            size(int, optional): the size of the channel
            callback(function, optional): The monitor callback. Called with the new value
                                 whenever a "value" property change is received.
            alias(str, optional): name to be used on scans. If None, channel_name is used.
        """
        # The device is known by its alias when one is given, otherwise by the PV name.
        device_name = channel_name if alias is None else alias
        DeviceBase.__init__(self, device_name)
        self.channel = create_channel(channel_name, type, size)
        self.callback = callback
        set_device_alias(self, device_name)
        self.initialize()

    def get_name(self):
        """Return the name of the channel.
        """
        return self.channel.name

    def get_size(self):
        """Return the size of the channel.
        """
        return self.channel.size

    def set_size(self, size):
        """Set the size of the channel.
        """
        self.channel.size = size

    def is_connected(self):
        """Return True if channel is connected.
        """
        return self.channel.connected

    def is_monitored(self):
        """Return True if channel is monitored
        """
        return self.channel.monitored

    def set_monitored(self, value):
        """Set a channel monitor to trigger the callback function defined in the constructor.
        Args:
            value(bool): True to enable monitoring, False to disable it.
        """
        # NOTE(review): presumably Jython maps this assignment to the setMonitored()
        # inherited from DeviceBase, which in turn invokes doSetMonitored - confirm.
        self.monitored = value

    def doSetMonitored(self, value):
        """Apply the monitored state to the channel and (un)register this object as its listener.
        """
        self.channel.monitored = value
        if value:
            self.channel.addPropertyChangeListener(self)
        else:
            self.channel.removePropertyChangeListener(self)

    def propertyChange(self, pce):
        """PropertyChangeListener callback: on "value" events update the cache and call the callback.
        """
        if pce.getPropertyName() != "value":
            return
        new_value = pce.getNewValue()
        self.setCache(new_value, None)
        if self.callback is not None:
            self.callback(new_value)

    def put(self, value, timeout=None):
        """Write to channel and wait value change. In the case of a timeout throws a TimeoutException.
        Args:
            value(obj): value to be written
            timeout(float, optional): timeout in seconds. If None waits forever.
        """
        if timeout is None:
            self.channel.setValue(value)
        else:
            # The asynchronous write is awaited with the timeout converted to whole milliseconds.
            timeout_ms = int(timeout*1000)
            self.channel.setValueAsync(value).get(timeout_ms, java.util.concurrent.TimeUnit.MILLISECONDS)

    def putq(self, value):
        """Write to channel and don't wait.
        """
        self.channel.setValueNoWait(value)

    def get(self, force = False):
        """Get channel value and store it in the device cache.
        Args:
            force(bool, optional): forwarded to the channel getValue call
                                   (presumably forces a fresh read - confirm).
        Returns:
            The value read from the channel.
        """
        current = self.channel.getValue(force)
        self.setCache(current, None)
        return current

    def wait_for_value(self, value, timeout=None, comparator=None):
        """Wait channel to reach a value, using a given comparator. In the case of a timeout throws a TimeoutException.
        Args:
            value(obj): value to be verified.
            timeout(float, optional): timeout in seconds. If None waits forever.
            comparator (java.util.Comparator, optional). If None, uses Object.equals.
        """
        # Assemble the arguments in the order (value[, comparator][, timeout_ms]);
        # optional arguments that are None are simply left out of the call.
        args = [value]
        if comparator is not None:
            args.append(comparator)
        if timeout is not None:
            args.append(int(timeout*1000))
        self.channel.waitForValue(*args)

    def close(self):
        """Close the channel.

        The channel is destroyed first; the DeviceBase close logic is executed
        afterwards even if destroying the channel raises.
        """
        try:
            self.channel.destroy()
        finally:
            DeviceBase.close(self)

    #Writable interface
    def write(self, value):
        """Write value to the channel (delegates to put with no timeout).
        """
        self.put(value)

    #Readable interface
    def read(self):
        """Read and return the channel value (delegates to get).
        """
        return self.get()