This commit is contained in:
boccioli_m
2017-10-17 14:46:53 +02:00
parent d4c87d8039
commit 3985d4a76e
3 changed files with 126 additions and 5 deletions

View File

@@ -125,3 +125,127 @@ class TestingTool:
ret = [self.testPath, self.deviceName, returnString, testPassed]
set_return( ret)
# RPS-Test Classes
class Interlock:
"""
class for getting - setting interlock mode
"""
import binascii
from array import *
# mode, test pattern for outputs 2-4, test pattern for output 5, expected pattern for 2-4, expected pattern for 5
rps_mask1 = 0x4440 # mask for outputs 2-4
rps_mask2 = 0x0004 # mask for output 5
msg_default = bytearray()
def __init__(self):
"""
Message composition at position:
0: 0x32 = 50 = byte count
2: 0x69 = 105 = transfer function
4,5: 0x99,0x88 = this applications port number (swapped)
7: 1 = message id
16: io-func: reset=2; read mode=0; write mode=1
"""
self.msg_default = array('B',[0x32,0,0x69,0,0x99,0x88,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0])
def _msgRead():
self.msg_default[16] = 0
return self.msg_default
def _msgWrite():
self.msg_default[16] = 1
return self.msg_default
def _msgReset():
self.msg_default[16] = 2
return self.msg_default
def setInterlockMode(self, mode):
udp = UDPDatagram()
msg = self._msgWrite()
modebytes = array('B', mode)
msg.extend(modebytes)
#print binascii.hexlify(msg)
if udp.send(msg) == null:
return false
else:
return true
def getInterlockMode(self):
udp = UDPDatagram()
msg = self._msgRead()
#print binascii.hexlify(msg)
rcv = udp.send(msg)
#print binascii.hexlify(rcv)
return rcv
def masterReset(self):
udp = UDPDatagram()
msg = self._msgReset()
#print binascii.hexlify(msg)
if udp.send(msg) == null:
return false
else:
return true
class UDPDatagram:
"""
class for receiving and sending a udp message
"""
import socket
# the constructor
def __init__(self):
# communication with interlock server
self.serverPort = 0xBB1B # interlock server
self.serverName = 'PROMC1' # ball server
try:
print 'here the socket should be open'
#self.sock = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
#except socket.error, msg:
except msg:
print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
sys.exit()
def _sendUDP(self, message):
try:
# get the ip address and send
print 'here should sendto:', self.serverName, self.serverPort
print 'message: ', message
#self.sock.sendto(message, (self.serverName, self.serverPort))
#except socket.error, msg:
except msg:
print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
sys.exit()
def _listenInit(self):
try:
'here should bind to socket'
#self.sock.bind('', self.ThisPort)
#except socket.error, msg:
except msg:
print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
sys.exit()
print 'Socket bind complete. '
def _listen(self):
# now keep talking with the client
while 1:
# receive data from client (data, addr)
print 'here should receive data'
#data, addr = self.sock.recvfrom(1024)
if not data:
print 'End of data or no data'
break
print 'Message[' + addr[0] + ':' + str(addr[1]) + '] - ' + data.strip()
#self.sock.close()
return data[28:]
def send(self, message):
self._listenInit()
self._sendUDP(message)
return self._listen()

View File

@@ -1,5 +1,5 @@
#TestingList for pshell: configuration properties
#Tue Oct 17 14:44:47 CEST 2017
#Tue Oct 17 14:46:39 CEST 2017
customPanel=
showEnabledTestsOnly=true
listFilter=rps-test

View File

@@ -14,7 +14,6 @@ def startTest(testName, DEVICE, params):
# by default, assume the test failed:
ret = 'Test failed'
status = False
test = TestingTool()
# put the whole custom code under try/catch.
try:
# get the path of this script:
@@ -85,9 +84,7 @@ def startTest(testName, DEVICE, params):
# now try with data from real device: this part will most probably fail: correct the PV names with existing ones.
try:
ilk = Interlock()
ilk.masterReset()
ilk.setInterlockMode(mode)
# set up connection to channels. "type" of data can be "d" (= double), "l" (= long).
rpsChannel = Channel(channel , type='d')
except: