diff --git a/script/local.py b/script/local.py index fb0d142..ef7515a 100644 --- a/script/local.py +++ b/script/local.py @@ -125,3 +125,127 @@ class TestingTool: ret = [self.testPath, self.deviceName, returnString, testPassed] set_return( ret) + + +# RPS-Test Classes +class Interlock: + """ + class for getting - setting interlock mode + """ + import binascii + from array import * + # mode, test pattern for outputs 2-4, test pattern for output 5, expected pattern for 2-4, expected pattern for 5 + rps_mask1 = 0x4440 # mask for outputs 2-4 + rps_mask2 = 0x0004 # mask for output 5 + msg_default = bytearray() + + def __init__(self): + """ + Message composition at position: + 0: 0x32 = 50 = byte count + 2: 0x69 = 105 = transfer function + 4,5: 0x99,0x88 = this applications port number (swapped) + 7: 1 = message id + 16: io-func: reset=2; read mode=0; write mode=1 + """ + self.msg_default = array('B',[0x32,0,0x69,0,0x99,0x88,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0]) + + def _msgRead(): + self.msg_default[16] = 0 + return self.msg_default + + def _msgWrite(): + self.msg_default[16] = 1 + return self.msg_default + + def _msgReset(): + self.msg_default[16] = 2 + return self.msg_default + + def setInterlockMode(self, mode): + udp = UDPDatagram() + msg = self._msgWrite() + modebytes = array('B', mode) + msg.extend(modebytes) + #print binascii.hexlify(msg) + if udp.send(msg) == null: + return false + else: + return true + + def getInterlockMode(self): + udp = UDPDatagram() + msg = self._msgRead() + #print binascii.hexlify(msg) + rcv = udp.send(msg) + #print binascii.hexlify(rcv) + return rcv + + def masterReset(self): + udp = UDPDatagram() + msg = self._msgReset() + #print binascii.hexlify(msg) + if udp.send(msg) == null: + return false + else: + return true + +class UDPDatagram: + """ + class for receiving and sending a udp message + """ + import socket + + # the constructor + def __init__(self): + # communication with interlock server + self.serverPort = 0xBB1B # interlock server + self.serverName = 'PROMC1' # ball server + try: + print 'here the socket should be open' + #self.sock = socket.socket(socket.AF_INET, # Internet + socket.SOCK_DGRAM) # UDP + #except socket.error, msg: + except msg: + print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1] + sys.exit() + + def _sendUDP(self, message): + try: + # get the ip address and send + print 'here should sendto:', self.serverName, self.serverPort + print 'message: ', message + #self.sock.sendto(message, (self.serverName, self.serverPort)) + #except socket.error, msg: + except msg: + print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1] + sys.exit() + + def _listenInit(self): + try: + 'here should bind to socket' + #self.sock.bind('', self.ThisPort) + #except socket.error, msg: + except msg: + print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1] + sys.exit() + print 'Socket bind complete. ' + + def _listen(self): + # now keep talking with the client + while 1: + # receive data from client (data, addr) + print 'here should receive data' + #data, addr = self.sock.recvfrom(1024) + if not data: + print 'End of data or no data' + break + print 'Message[' + addr[0] + ':' + str(addr[1]) + '] - ' + data.strip() + + #self.sock.close() + return data[28:] + + def send(self, message): + self._listenInit() + self._sendUDP(message) + return self._listen() \ No newline at end of file diff --git a/script/tests/tests.properties b/script/tests/tests.properties index a96b42b..b2fdc28 100644 --- a/script/tests/tests.properties +++ b/script/tests/tests.properties @@ -1,5 +1,5 @@ #TestingList for pshell: configuration properties -#Tue Oct 17 14:44:47 CEST 2017 +#Tue Oct 17 14:46:39 CEST 2017 customPanel= showEnabledTestsOnly=true listFilter=rps-test diff --git a/script/tests/tests/RPS Tests Betriebsmode/Betriebsmode/Betriebsmode.py b/script/tests/tests/RPS Tests Betriebsmode/Betriebsmode/Betriebsmode.py index 7c3daf1..b560491 100644 --- a/script/tests/tests/RPS Tests Betriebsmode/Betriebsmode/Betriebsmode.py +++ b/script/tests/tests/RPS Tests Betriebsmode/Betriebsmode/Betriebsmode.py @@ -14,7 +14,6 @@ def startTest(testName, DEVICE, params): # by default, assume the test failed: ret = 'Test failed' status = False - test = TestingTool() # put the whole custom code under try/catch. try: # get the path of this script: @@ -85,9 +84,7 @@ def startTest(testName, DEVICE, params): # now try with data from real device: this part will most probably fail: correct the PV names with existing ones. try: - ilk = Interlock() - ilk.masterReset() - ilk.setInterlockMode(mode) + # set up connection to channels. "type" of data can be "d" (= double), "l" (= long). rpsChannel = Channel(channel , type='d') except: