Files
ncs/script/local.py
boccioli_m fd2df928b2 Closedown
2017-10-20 10:33:13 +02:00

256 lines
8.2 KiB
Python

###################################################################################################
# Deployment specific global definitions - executed after startup.py
###################################################################################################
#Uncomment this line to create the simulated devices needed by the tutorial scripts.
#run("tutorial/devices")
# The code below is necessary to run the Testing List plug-in:
global print_log, sendFeedback, sys, inspect, os, traceback
import sys, inspect, os, traceback
class TestingTool:
"""
Common tools for running the test
"""
def __init__(self, testName, testPath, deviceName, testParams):
"""
Init class with test data
"""
self.deviceName = deviceName
self.testName = testName
self.testPath = testPath
self.testParams = testParams
print('\n')
self.log('='*80)
def getParam(self, paramName):
"""
get specific test parameter
paramName = the name of the parameter for which the value must be read
"""
try:
return self.testParams[paramName]["value"]
except:
self.log('Could not retrieve testing parameter ' + paramName + ' - Details: ' + traceback.format_exc())
return None
def getName(self):
"""
get test name
"""
return self.testName
def getPlotName(self):
"""
get test plot name
"""
return self.deviceName + ' - ' + self.testName
def getDeviceName(self):
"""
get device name
"""
return self.deviceName
def getPath(self):
"""
get test path
"""
return self.testPath
def log(self, text, severity=0):
"""
Print/log information
text = the string to be printed/logged
severity = 0:Info, 1:Warning, 2:Error
"""
import time
time.ctime()
now = time.strftime('%Y.%m.%d %H:%M:%S')
sev = ['INF','WAR','ERR']
logText = sev[severity] + ' - ' + self.deviceName + ' ' + self.testName + ': ' + str(text)
print now + ' ' + logText
try:
log(logText)
except:
#cannot write log. maybe busy? Wait and retry
time.sleep(1)
try:
log(logText)
except:
#cannot write log
print now + ' ' + sev[2] + ' - ' + self.deviceName + ' ' + self.testName + ': ' + 'cannot write Log in Data'
def printParams(self):
"""
Print/log the pshell parameters
params = the JSON object with the parameters passed by pshell
"""
columnsFormat = '{0:17} {1:>20} {2}'
self.log('-'*80)
self.log(columnsFormat.format("Parameter", "Value", "Description"))
self.log('-'*80)
for key in self.testParams.keys():
self.log(columnsFormat.format(key,
self.testParams[key]['value'],
self.testParams[key]['description']))
self.log('-'*80)
def sendFeedback(self, returnString, testPassed):
"""
Prepare and send feedback to calling tool
returnString = the string containing info on the test result
testPassed = true if the test was successful, false if test had a problem
"""
self.log('End of test. Result:')
#self.log('Device: ' + self.deviceName)
#self.log('Test name: ' + self.testName)
#self.log('Test path: ' + self.testPath)
severity = 0
if testPassed:
self.log('**** TEST PASSED ****',severity)
else:
severity = 2
self.log('**** TEST FAILED ****',severity)
if returnString != '':
self.log(returnString,severity)
ret = [self.testPath, self.deviceName, returnString, testPassed]
set_return( ret)
# RPS-Test Classes
class Interlock:
    """
    class for getting - setting interlock mode

    Builds the request messages (read mode, write mode, master reset) and
    hands them to a UDPDatagram for transmission.
    """
    # mode, test pattern for outputs 2-4, test pattern for output 5, expected pattern for 2-4, expected pattern for 5
    rps_mask1 = 0x4440  # mask for outputs 2-4
    rps_mask2 = 0x0004  # mask for output 5
    msg_default = bytearray()  # replaced per instance by __init__

    def __init__(self):
        """
        Message composition at position:
        0: 0x32 = 50 = byte count
        2: 0x69 = 105 = transfer function
        4,5: 0x99,0x88 = this application's port number (swapped)
        7: 1 = message id
        16: io-func: reset=2; read mode=0; write mode=1
        """
        # Imported here because a name imported in the class body becomes a
        # class attribute and is not visible from inside the methods.
        from array import array
        self.msg_default = array('B', [0x32, 0, 0x69, 0, 0x99, 0x88, 1, 0,
                                       0, 0, 0, 0, 0, 0, 0, 0,
                                       1, 0, 0, 0, 0, 0, 0, 0,
                                       0, 0, 0, 0])

    def _buildMsg(self, ioFunc):
        """
        Return a fresh copy of the default message with byte 16 set to ioFunc.

        A copy is returned so that callers may extend the message without
        altering the template used for the following requests.
        """
        msg = self.msg_default[:]
        msg[16] = ioFunc
        return msg

    def _hexDump(self, msg):
        """Return the message as a hexadecimal text string for printing."""
        import binascii
        return binascii.hexlify(msg).decode('ascii')

    def _msgRead(self):
        """Message with io-func 0 (read mode)."""
        return self._buildMsg(0)

    def _msgWrite(self):
        """Message with io-func 1 (write mode)."""
        return self._buildMsg(1)

    def _msgReset(self):
        """Message with io-func 2 (reset)."""
        return self._buildMsg(2)

    def setInterlockMode(self, mode):
        """
        Send a write-mode message followed by the mode bytes
        mode = the byte values to append (anything array('B', ...) accepts)
        Returns True when the datagram layer returned a non-empty reply,
        False otherwise.
        """
        from array import array
        udp = UDPDatagram()
        msg = self._msgWrite()
        msg.extend(array('B', mode))
        print('setInterlockMode ' + self._hexDump(msg))
        return bool(udp.send(msg))

    def getInterlockMode(self):
        """
        Send a read-mode message
        Returns whatever the datagram layer returned as reply.
        """
        udp = UDPDatagram()
        msg = self._msgRead()
        print('getInterlockMode ' + self._hexDump(msg))
        rcv = udp.send(msg)
        #print 'getInterlockMode', binascii.hexlify(rcv)
        return rcv

    def masterReset(self):
        """
        Send a reset message
        Returns True when the datagram layer returned a non-empty reply,
        False otherwise.
        """
        udp = UDPDatagram()
        msg = self._msgReset()
        print('masterReset ' + self._hexDump(msg))
        return bool(udp.send(msg))
class UDPDatagram:
    """
    class for receiving and sending a udp message

    In this version every real socket call is disabled (kept below as
    comments): each step only prints what it would do, and send() always
    returns a fixed placeholder reply.
    """
    # Bound as the class attribute UDPDatagram.socket only; methods that want
    # to use the module need their own import once the socket code is enabled.
    import socket

    def __init__(self):
        """Store the server address and announce the (disabled) socket creation."""
        # communication with interlock server
        self.serverPort = 0xBB1B  # interlock server
        self.serverName = 'PROMC1'  # ball server
        try:
            print('here the socket should be open')
            # Disabled real implementation:
            #self.sock = socket.socket(socket.AF_INET, # Internet
            #                          socket.SOCK_DGRAM) # UDP
        except Exception:
            self._abort()

    def _abort(self):
        """Report the exception being handled, then terminate via sys.exit()."""
        import sys
        import traceback
        # Say why we are leaving instead of exiting silently.
        print(traceback.format_exc())
        sys.exit()

    def _sendUDP(self, message):
        """
        Announce the (disabled) transmission of a message
        message = the bytes to send; printed as hexadecimal text
        """
        import binascii
        try:
            # get the ip address and send
            print('here should sendto: %s %s' % (self.serverName, self.serverPort))
            print('message:  ' + binascii.hexlify(message).decode('ascii'))
            # Disabled real implementation:
            #self.sock.sendto(message, (self.serverName, self.serverPort))
        except Exception:
            self._abort()

    def _listenInit(self):
        """Announce the (disabled) bind of the receiving socket."""
        try:
            print('here should bind to socket')
            # Disabled real implementation:
            #self.sock.bind('', self.ThisPort)
            # NOTE(review): self.ThisPort is not set anywhere in this class,
            # and socket.bind() expects one (host, port) tuple - fix both
            # when enabling.
        except Exception:
            self._abort()
        print('Socket bind complete. ')

    def _listen(self):
        """
        Announce the (disabled) reception and return the placeholder reply
        Returns a list of 50 items, each equal to 9.
        """
        # now keep talking with the client
        print('here should receive data')
        # Disabled real implementation:
        #while 1:
        #    # receive data from client (data, addr)
        #    data, addr = self.sock.recvfrom(1024)
        #    if not data:
        #        print 'End of data or no data'
        #        break
        #    print 'Message[' + addr[0] + ':' + str(addr[1]) + '] - ' + data.strip()
        #self.sock.close()
        #return data[28:]
        return [9] * 50

    def send(self, message):
        """
        Run one request/reply exchange: bind, send, then listen
        message = the bytes to send
        Returns the reply produced by _listen().
        """
        self._listenInit()
        self._sendUDP(message)
        return self._listen()