################################################################################################### # Deployment specific global definitions - executed after startup.py ################################################################################################### #Uncomment this line to create the simulated devices needed to the tutorial scripts. #run("tutorial/devices") # The code below is necessary to run the Testing List plug-in: global print_log, sendFeedback, sys, inspect, os, traceback import sys, inspect, os, traceback class TestingTool: """ Common tools for running the test """ def __init__(self, testName, testPath, deviceName, testParams): """ Init class with test data """ self.deviceName = deviceName self.testName = testName self.testPath = testPath self.testParams = testParams print('\n') self.log('='*80) def getParam(self, paramName): """ get specific test parameter paramName = the name of the parameter for which the value must be read """ try: return self.testParams[paramName]["value"] except: self.log('Could not retrieve testing parameter ' + paramName + ' - Details: ' + traceback.format_exc()) return None def getName(self): """ get test name """ return self.testName def getPlotName(self): """ get test plot name """ return self.deviceName + ' - ' + self.testName def getDeviceName(self): """ get device name """ return self.deviceName def getPath(self): """ get test path """ return self.testPath def log(self, text, severity=0): """ Print/log information text = the string to be printed/logged severity = 0:Info, 1:Warning, 2:Error """ import time time.ctime() now = time.strftime('%Y.%m.%d %H:%M:%S') sev = ['INF','WAR','ERR'] logText = sev[severity] + ' - ' + self.deviceName + ' ' + self.testName + ': ' + str(text) print now + ' ' + logText try: log(logText) except: #cannot write log. maybe busy? Wait and retry time.sleep(1) try: log(logText) except: #cannot write log print now + ' ' + sev[2] + ' - ' + self.deviceName + ' ' + self.testName + ': ' + 'cannot write Log in Data' def printParams(self): """ Print/log the pshell parameters params = the JSON object with the parameters passed by pshell """ columnsFormat = '{0:17} {1:>20} {2}' self.log('-'*80) self.log(columnsFormat.format("Parameter", "Value", "Description")) self.log('-'*80) for key in self.testParams.keys(): self.log(columnsFormat.format(key, self.testParams[key]['value'], self.testParams[key]['description'])) self.log('-'*80) def sendFeedback(self, returnString, testPassed): """ Prepare and send feedback to calling tool returnString = the string containing info on the test result testPassed = true if the test was successful, false if test had a problem """ self.log('End of test. Result:') #self.log('Device: ' + self.deviceName) #self.log('Test name: ' + self.testName) #self.log('Test path: ' + self.testPath) severity = 0 if testPassed: self.log('**** TEST PASSED ****',severity) else: severity = 2 self.log('**** TEST FAILED ****',severity) if returnString != '': self.log(returnString,severity) ret = [self.testPath, self.deviceName, returnString, testPassed] set_return( ret) # RPS-Test Classes class Interlock: """ class for getting - setting interlock mode """ from array import * # mode, test pattern for outputs 2-4, test pattern for output 5, expected pattern for 2-4, expected pattern for 5 rps_mask1 = 0x4440 # mask for outputs 2-4 rps_mask2 = 0x0004 # mask for output 5 msg_default = bytearray() def __init__(self): """ Message composition at position: 0: 0x32 = 50 = byte count 2: 0x69 = 105 = transfer function 4,5: 0x99,0x88 = this applications port number (swapped) 7: 1 = message id 16: io-func: reset=2; read mode=0; write mode=1 """ self.msg_default = array('B',[0x32,0,0x69,0,0x99,0x88,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0]) def _msgRead(self): self.msg_default[16] = 0 return self.msg_default def _msgWrite(self): self.msg_default[16] = 1 return self.msg_default def _msgReset(self): self.msg_default[16] = 2 return self.msg_default def setInterlockMode(self, mode): import binascii udp = UDPDatagram() msg = self._msgWrite() msg.extend(array('B', mode)) print 'setInterlockMode', binascii.hexlify(msg) if not udp.send(msg): return False else: return True def getInterlockMode(self): import binascii udp = UDPDatagram() msg = self._msgRead() print 'getInterlockMode', binascii.hexlify(msg) rcv = udp.send(msg) #print 'getInterlockMode', binascii.hexlify(rcv) return rcv def masterReset(self): import binascii udp = UDPDatagram() msg = self._msgReset() print 'masterReset', binascii.hexlify(msg) if not udp.send(msg): return False else: return True class UDPDatagram: """ class for receiving and sending a udp message """ import socket def __init__(self): # communication with interlock server self.serverPort = 0xBB1B # interlock server self.serverName = 'PROMC1' # ball server try: print 'here the socket should be open' #self.sock = socket.socket(socket.AF_INET, # Internet # socket.SOCK_DGRAM) # UDP #except socket.error, msg: except : #print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1] sys.exit() def _sendUDP(self, message): import binascii try: # get the ip address and send print 'here should sendto:', self.serverName, self.serverPort print 'message: ', binascii.hexlify(message) #self.sock.sendto(message, (self.serverName, self.serverPort)) #except socket.error, msg: except : #print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1] sys.exit() def _listenInit(self): try: 'here should bind to socket' #self.sock.bind('', self.ThisPort) #except socket.error, msg: except: #print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1] sys.exit() print 'Socket bind complete. ' def _listen(self): # now keep talking with the client print 'here should receive data' #while 1: while 0: # receive data from client (data, addr) #data, addr = self.sock.recvfrom(1024) if not data: print 'End of data or no data' break print 'Message[' + addr[0] + ':' + str(addr[1]) + '] - ' + data.strip() #self.sock.close() #return data[28:] return [9]*50 def send(self, message): self._listenInit() self._sendUDP(message) return self._listen()