diff --git a/plugins/TestingList.java b/plugins/TestingList.java index d664fd9..2ea7fce 100644 --- a/plugins/TestingList.java +++ b/plugins/TestingList.java @@ -2087,9 +2087,9 @@ public class TestingList extends Panel { logger.log(Level.INFO, msg); System.out.println(msg); } - System.out.println("sTestID: " + sTestID); + //System.out.println("sTestID: " + sTestID); Object retargs = args2.put(sTestID, testArgs); - System.out.println("retargs: " + retargs); + //System.out.println("retargs: " + retargs); hTests = args2; iRet = 0; } catch (Exception ex) { @@ -2131,7 +2131,7 @@ public class TestingList extends Panel { } //animate the custom panel (if present) animateCustomPanel(sDeviceName); - System.out.println("sParallelizeCommand: " +sParallelizeCommand); + //System.out.println("sParallelizeCommand: " +sParallelizeCommand); //run test(s) Object ret = eval(sParallelizeCommand); //System.out.println(ret); diff --git a/script/local.py b/script/local.py index cfc7de1..2dfe470 100644 --- a/script/local.py +++ b/script/local.py @@ -23,9 +23,6 @@ class TestingTool: self.testPath = testPath self.testParams = testParams - a = 0x69 - print a - def getParam(self, paramName): """ get specific test parameter @@ -118,3 +115,120 @@ class TestingTool: self.log('Return message: ' + returnString) ret = [self.testPath, self.deviceName, returnString, testPassed] set_return( ret) + + + +# RPS-Test Classes +class Interlock: +""" + class for getting - setting interlock mode +""" + import binascii + from array import * + # mode, test pattern for outputs 2-4, test pattern for output 5, expected pattern for 2-4, expected pattern for 5 + rps_mask1 = 0x4440 # mask for outputs 2-4 + rps_mask2 = 0x0004 # mask for output 5 + msg_default = bytearray() + + def __init__(self): + """ + Message composition: + 0x32 = 50 = byte count + 0x69 = 105 = transfer function + 0x99,0x88 = this application port number (swapped) + element 7 = message id + element 16 = io-func reset=2 read mode=0 write mode=1 + """ + self.msg_default = array('B',[0x32,0,0x69,0,0x99,0x88,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0]) + + def _msgRead(): + self.msg_default[16] = 0 + return self.msg_default + + def _msgWrite(): + self.msg_default[16] = 1 + return self.msg_default + + def _msgReset(): + self.msg_default[16] = 2 + return self.msg_default + + def setInterlockMode(self, mode) + udp = UDPDatagram() + msg = self._msgWrite() + modebytes = array('B', mode) + msg.extend(modebytes) + #print binascii.hexlify(msg) + if udp.send(msg) == null: + return false + else: + return true + + def getInterlockMode(self) + udp = UDPDatagram() + msg = self._msgRead() + #print binascii.hexlify(msg) + rcv = udp.send(msg) + #print binascii.hexlify(rcv) + return rcv + + def masterReset(self) + udp = UDPDatagram() + msg = self._msgReset() + #print binascii.hexlify(msg) + if udp.send(msg) == null: + return false + else: + return true + +class UDPDatagram: +""" + class for receiving and sending a udp message +""" + import socket + + # the constructor + def __init__(self): + # communication with interlock server + self.serverPort = 0xBB1B # interlock server + self.serverName = 'PROMC1' # ball server + try: + self.sock = socket.socket(socket.AF_INET, # Internet + socket.SOCK_DGRAM) # UDP + except socket.error, msg: + print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1] + sys.exit() + + def _sendUDP(self, message): + try: + # get the ip address and send + self.sock.sendto(message, (self.serverName, self.serverPort)) + except socket.error, msg: + print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1] + sys.exit() + + def _listenInit(self): + try: + self.sock.bind('', self.ThisPort) + except socket.error, msg: + print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1] + sys.exit() + print 'Socket bind complete. ' + + def _listen(self): + # now keep talking with the client + while 1: + # receive data from client (data, addr) + data, addr = self.sock.recvfrom(1024) + if not data: + print 'End of data or no data' + break + print 'Message[' + addr[0] + ':' + str(addr[1]) + '] - ' + data.strip() + + self.sock.close() + return data[28:] + + def send(self, message): + self._listenInit() + self._sendUDP(message) + return self._listen() \ No newline at end of file diff --git a/script/tests/devices/bis-BMA1/.config b/script/tests/devices/bis-BMA1/.config index 61bf864..3548ada 100644 --- a/script/tests/devices/bis-BMA1/.config +++ b/script/tests/devices/bis-BMA1/.config @@ -1,5 +1,5 @@ #Fri Oct 13 11:25:53 CEST 2017 name=bis-BMA1 tests=RPS Tests Betriebsmode -parameters=mode&"PXXF\:VAL\:2"&"ssp";ds&"2,IQCOM,$BMA1,1,DIA"&"vla"; +parameters=channel&"PXXF\:VAL\:2"&"ssp";ds&"2,IQCOM,$BMA1,1,DIA"&"vla"; description=rps section till BMA1 diff --git a/script/tests/tests.properties b/script/tests/tests.properties index d3937c5..ab6a64e 100644 --- a/script/tests/tests.properties +++ b/script/tests/tests.properties @@ -1,5 +1,5 @@ #TestingList for pshell: configuration properties -#Fri Oct 13 13:56:33 CEST 2017 +#Fri Oct 13 14:17:34 CEST 2017 customPanel= -showEnabledTestsOnly= +showEnabledTestsOnly=true listFilter=rps-try diff --git a/script/tests/tests/RPS Tests Betriebsmode/Betriebsmode/Betriebsmode.py b/script/tests/tests/RPS Tests Betriebsmode/Betriebsmode/Betriebsmode.py index 83a2c8c..95d54d7 100644 --- a/script/tests/tests/RPS Tests Betriebsmode/Betriebsmode/Betriebsmode.py +++ b/script/tests/tests/RPS Tests Betriebsmode/Betriebsmode/Betriebsmode.py @@ -72,8 +72,7 @@ def startTest(testName, DEVICE, params): test.printParams() # If present, use the parameters here below for your test script. # These parameters were automatically generated: you might need to change the casting. - mode = (test.getParam('mode')) ; expectedVal14 = (test.getParam('expectedVal14')) ; expectedVal58 = (test.getParam('expectedVal58')) ; setGetDelay = (test.getParam('setGetDelay')) ; - test.log("mode: " + mode + " expectedVal14: " +expectedVal14) + channel = (test.getParam('channel')) ; mode = (test.getParam('mode')) ; expectedVal14 = (test.getParam('expectedVal14')) ; expectedVal58 = (test.getParam('expectedVal58')) ; setGetDelay = (test.getParam('setGetDelay')) ; except: import traceback # test failed, write the report into the variables ret and success and send feedback: @@ -109,9 +108,7 @@ def startTest(testName, DEVICE, params): # now try with data from real device: this part will most probably fail: correct the PV names with existing ones. try: # set up connection to channels. "type" of data can be "d" (= double), "l" (= long). - pv_motor_msta = Channel(test.getDeviceName() + ':IST:2' , type='d') - pv_motor_val = Channel(test.getDeviceName() + ':IST:1' , type='d') - pv_motor_com = Channel(test.getDeviceName() + ':COM:2' , type='d') + rpsChannel = Channel(channel , type='d') except: import traceback # prepare return information: return text: @@ -135,9 +132,7 @@ def startTest(testName, DEVICE, params): scan.append([readback1], [readback1], [motor_msta, motor_val]) # Closing channels: all channels that were opened with Channel() must be closed before exit: - pv_motor_msta.close() - pv_motor_val.close() - pv_motor_com.close() + rpsChannel.close() # IMPORTANT: if the test was successful, write the report into the variables ret and success. # for example, write the following: diff --git a/script/tests/tests/sad/rightleft/rightleft.py b/script/tests/tests/sad/rightleft/rightleft.py index 0c3874c..32385f5 100644 --- a/script/tests/tests/sad/rightleft/rightleft.py +++ b/script/tests/tests/sad/rightleft/rightleft.py @@ -219,124 +219,3 @@ startTest(test, device, parameters) #### IF NEEDED, ADD YOUR FUNCTIONS HERE BELOW #### # Indent to end left # def yourCustomFunction: - - -class Interlock: -""" -class for getting - setting interlock mode -""" - from array import * - # mode, test pattern for outputs 2-4, test pattern for output 5, expected pattern for 2-4, expected pattern for 5 - rps_beamline = 0 - rps_mode = 1 - rps_expected1 = 2 - rps_expected2 = 3 - rps_mask1 = 0x4440 # mask for outputs 2-4 - rps_mask2 = 0x0004 # mask for output 5 - msg_default = bytearray() - - def __init__(self): - """ - Message composition: - 0x32 = 50 = byte count - 0x69 = 105 = transfer function - 0x99,0x88 = this application port number (swapped) - element 7 = message id - element 16 = io-func reset=2 read mode=0 write mode=1 - """ - self.msg_default = array('B',[0x32,0,0x69,0,0x99,0x88,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0]) - - def _msgRead(): - self.msg_default[16] = 0 - return self.msg_default - - def _msgWrite(): - self.msg_default[16] = 1 - return self.msg_default - - def _msgReset(): - self.msg_default[16] = 2 - return self.msg_default - - def setInterlockMode(self, mode) - udp = UDPDatagram() - msg = self._msgWrite() - modebytes = array('B', mode) - msg.extend(modebytes) - #import binascii - #print binascii.hexlify(msg) - if udp.send(msg) == null: - return false - else: - return true - - def getInterlockMode(self) - udp = UDPDatagram() - msg = self._msgRead() - #import binascii - #print binascii.hexlify(msg) - rcv = udp.send(msg) - #print binascii.hexlify(rcv) - return rcv - - def masterReset(self) - udp = UDPDatagram() - msg = self._msgReset() - #import binascii - #print binascii.hexlify(msg) - if udp.send(msg) == null: - return false - else: - return true - -class UDPDatagram: -""" -class for receiving and sending a udp message -""" - import socket - - # the constructor - def __init__(self): - # communication with interlock server - self.serverPort = 0xBB1B # interlock server - self.serverName = 'PROMC1' # ball server - try: - self.sock = socket.socket(socket.AF_INET, # Internet - socket.SOCK_DGRAM) # UDP - except socket.error, msg: - print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1] - sys.exit() - - def _sendUDP(self, message): - try: - # get the ip address and send - self.sock.sendto(message, (self.serverName, self.serverPort)) - except socket.error, msg: - print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1] - sys.exit() - - def _listenInit(self): - try: - self.sock.bind('', self.ThisPort) - except socket.error, msg: - print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1] - sys.exit() - print 'Socket bind complete. ' - - def _listen(self): - # now keep talking with the client - while 1: - # receive data from client (data, addr) - data, addr = self.sock.recvfrom(1024) - if not data: - print 'End of data or no data' - break - print 'Message[' + addr[0] + ':' + str(addr[1]) + '] - ' + data.strip() - - self.sock.close() - return data[28:] - - def send(self, message): - self._listenInit() - self._sendUDP(message) - return self._listen() \ No newline at end of file