Closedown
This commit is contained in:
@@ -2087,9 +2087,9 @@ public class TestingList extends Panel {
|
||||
logger.log(Level.INFO, msg);
|
||||
System.out.println(msg);
|
||||
}
|
||||
System.out.println("sTestID: " + sTestID);
|
||||
//System.out.println("sTestID: " + sTestID);
|
||||
Object retargs = args2.put(sTestID, testArgs);
|
||||
System.out.println("retargs: " + retargs);
|
||||
//System.out.println("retargs: " + retargs);
|
||||
hTests = args2;
|
||||
iRet = 0;
|
||||
} catch (Exception ex) {
|
||||
@@ -2131,7 +2131,7 @@ public class TestingList extends Panel {
|
||||
}
|
||||
//animate the custom panel (if present)
|
||||
animateCustomPanel(sDeviceName);
|
||||
System.out.println("sParallelizeCommand: " +sParallelizeCommand);
|
||||
//System.out.println("sParallelizeCommand: " +sParallelizeCommand);
|
||||
//run test(s)
|
||||
Object ret = eval(sParallelizeCommand);
|
||||
//System.out.println(ret);
|
||||
|
||||
120
script/local.py
120
script/local.py
@@ -23,9 +23,6 @@ class TestingTool:
|
||||
self.testPath = testPath
|
||||
self.testParams = testParams
|
||||
|
||||
a = 0x69
|
||||
print a
|
||||
|
||||
def getParam(self, paramName):
|
||||
"""
|
||||
get specific test parameter
|
||||
@@ -118,3 +115,120 @@ class TestingTool:
|
||||
self.log('Return message: ' + returnString)
|
||||
ret = [self.testPath, self.deviceName, returnString, testPassed]
|
||||
set_return( ret)
|
||||
|
||||
|
||||
|
||||
# RPS-Test Classes
|
||||
class Interlock:
|
||||
"""
|
||||
class for getting - setting interlock mode
|
||||
"""
|
||||
import binascii
|
||||
from array import *
|
||||
# mode, test pattern for outputs 2-4, test pattern for output 5, expected pattern for 2-4, expected pattern for 5
|
||||
rps_mask1 = 0x4440 # mask for outputs 2-4
|
||||
rps_mask2 = 0x0004 # mask for output 5
|
||||
msg_default = bytearray()
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Message composition:
|
||||
0x32 = 50 = byte count
|
||||
0x69 = 105 = transfer function
|
||||
0x99,0x88 = this application port number (swapped)
|
||||
element 7 = message id
|
||||
element 16 = io-func reset=2 read mode=0 write mode=1
|
||||
"""
|
||||
self.msg_default = array('B',[0x32,0,0x69,0,0x99,0x88,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0])
|
||||
|
||||
def _msgRead():
|
||||
self.msg_default[16] = 0
|
||||
return self.msg_default
|
||||
|
||||
def _msgWrite():
|
||||
self.msg_default[16] = 1
|
||||
return self.msg_default
|
||||
|
||||
def _msgReset():
|
||||
self.msg_default[16] = 2
|
||||
return self.msg_default
|
||||
|
||||
def setInterlockMode(self, mode)
|
||||
udp = UDPDatagram()
|
||||
msg = self._msgWrite()
|
||||
modebytes = array('B', mode)
|
||||
msg.extend(modebytes)
|
||||
#print binascii.hexlify(msg)
|
||||
if udp.send(msg) == null:
|
||||
return false
|
||||
else:
|
||||
return true
|
||||
|
||||
def getInterlockMode(self)
|
||||
udp = UDPDatagram()
|
||||
msg = self._msgRead()
|
||||
#print binascii.hexlify(msg)
|
||||
rcv = udp.send(msg)
|
||||
#print binascii.hexlify(rcv)
|
||||
return rcv
|
||||
|
||||
def masterReset(self)
|
||||
udp = UDPDatagram()
|
||||
msg = self._msgReset()
|
||||
#print binascii.hexlify(msg)
|
||||
if udp.send(msg) == null:
|
||||
return false
|
||||
else:
|
||||
return true
|
||||
|
||||
class UDPDatagram:
|
||||
"""
|
||||
class for receiving and sending a udp message
|
||||
"""
|
||||
import socket
|
||||
|
||||
# the constructor
|
||||
def __init__(self):
|
||||
# communication with interlock server
|
||||
self.serverPort = 0xBB1B # interlock server
|
||||
self.serverName = 'PROMC1' # ball server
|
||||
try:
|
||||
self.sock = socket.socket(socket.AF_INET, # Internet
|
||||
socket.SOCK_DGRAM) # UDP
|
||||
except socket.error, msg:
|
||||
print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
|
||||
sys.exit()
|
||||
|
||||
def _sendUDP(self, message):
|
||||
try:
|
||||
# get the ip address and send
|
||||
self.sock.sendto(message, (self.serverName, self.serverPort))
|
||||
except socket.error, msg:
|
||||
print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
|
||||
sys.exit()
|
||||
|
||||
def _listenInit(self):
|
||||
try:
|
||||
self.sock.bind('', self.ThisPort)
|
||||
except socket.error, msg:
|
||||
print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
|
||||
sys.exit()
|
||||
print 'Socket bind complete. '
|
||||
|
||||
def _listen(self):
|
||||
# now keep talking with the client
|
||||
while 1:
|
||||
# receive data from client (data, addr)
|
||||
data, addr = self.sock.recvfrom(1024)
|
||||
if not data:
|
||||
print 'End of data or no data'
|
||||
break
|
||||
print 'Message[' + addr[0] + ':' + str(addr[1]) + '] - ' + data.strip()
|
||||
|
||||
self.sock.close()
|
||||
return data[28:]
|
||||
|
||||
def send(self, message):
|
||||
self._listenInit()
|
||||
self._sendUDP(message)
|
||||
return self._listen()
|
||||
@@ -1,5 +1,5 @@
|
||||
#Fri Oct 13 11:25:53 CEST 2017
|
||||
name=bis-BMA1
|
||||
tests=RPS Tests Betriebsmode
|
||||
parameters=mode&"PXXF\:VAL\:2"&"ssp";ds&"2,IQCOM,$BMA1,1,DIA"&"vla";
|
||||
parameters=channel&"PXXF\:VAL\:2"&"ssp";ds&"2,IQCOM,$BMA1,1,DIA"&"vla";
|
||||
description=rps section till BMA1
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
#TestingList for pshell: configuration properties
|
||||
#Fri Oct 13 13:56:33 CEST 2017
|
||||
#Fri Oct 13 14:17:34 CEST 2017
|
||||
customPanel=
|
||||
showEnabledTestsOnly=
|
||||
showEnabledTestsOnly=true
|
||||
listFilter=rps-try
|
||||
|
||||
@@ -72,8 +72,7 @@ def startTest(testName, DEVICE, params):
|
||||
test.printParams()
|
||||
# If present, use the parameters here below for your test script.
|
||||
# These parameters were automatically generated: you might need to change the casting.
|
||||
mode = (test.getParam('mode')) ; expectedVal14 = (test.getParam('expectedVal14')) ; expectedVal58 = (test.getParam('expectedVal58')) ; setGetDelay = (test.getParam('setGetDelay')) ;
|
||||
test.log("mode: " + mode + " expectedVal14: " +expectedVal14)
|
||||
channel = (test.getParam('channel')) ; mode = (test.getParam('mode')) ; expectedVal14 = (test.getParam('expectedVal14')) ; expectedVal58 = (test.getParam('expectedVal58')) ; setGetDelay = (test.getParam('setGetDelay')) ;
|
||||
except:
|
||||
import traceback
|
||||
# test failed, write the report into the variables ret and success and send feedback:
|
||||
@@ -109,9 +108,7 @@ def startTest(testName, DEVICE, params):
|
||||
# now try with data from real device: this part will most probably fail: correct the PV names with existing ones.
|
||||
try:
|
||||
# set up connection to channels. "type" of data can be "d" (= double), "l" (= long).
|
||||
pv_motor_msta = Channel(test.getDeviceName() + ':IST:2' , type='d')
|
||||
pv_motor_val = Channel(test.getDeviceName() + ':IST:1' , type='d')
|
||||
pv_motor_com = Channel(test.getDeviceName() + ':COM:2' , type='d')
|
||||
rpsChannel = Channel(channel , type='d')
|
||||
except:
|
||||
import traceback
|
||||
# prepare return information: return text:
|
||||
@@ -135,9 +132,7 @@ def startTest(testName, DEVICE, params):
|
||||
scan.append([readback1], [readback1], [motor_msta, motor_val])
|
||||
|
||||
# Closing channels: all channels that were opened with Channel() must be closed before exit:
|
||||
pv_motor_msta.close()
|
||||
pv_motor_val.close()
|
||||
pv_motor_com.close()
|
||||
rpsChannel.close()
|
||||
|
||||
# IMPORTANT: if the test was successful, write the report into the variables ret and success.
|
||||
# for example, write the following:
|
||||
|
||||
@@ -219,124 +219,3 @@ startTest(test, device, parameters)
|
||||
#### IF NEEDED, ADD YOUR FUNCTIONS HERE BELOW ####
|
||||
# Indent to end left
|
||||
# def yourCustomFunction:
|
||||
|
||||
|
||||
class Interlock:
|
||||
"""
|
||||
class for getting - setting interlock mode
|
||||
"""
|
||||
from array import *
|
||||
# mode, test pattern for outputs 2-4, test pattern for output 5, expected pattern for 2-4, expected pattern for 5
|
||||
rps_beamline = 0
|
||||
rps_mode = 1
|
||||
rps_expected1 = 2
|
||||
rps_expected2 = 3
|
||||
rps_mask1 = 0x4440 # mask for outputs 2-4
|
||||
rps_mask2 = 0x0004 # mask for output 5
|
||||
msg_default = bytearray()
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
Message composition:
|
||||
0x32 = 50 = byte count
|
||||
0x69 = 105 = transfer function
|
||||
0x99,0x88 = this application port number (swapped)
|
||||
element 7 = message id
|
||||
element 16 = io-func reset=2 read mode=0 write mode=1
|
||||
"""
|
||||
self.msg_default = array('B',[0x32,0,0x69,0,0x99,0x88,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0])
|
||||
|
||||
def _msgRead():
|
||||
self.msg_default[16] = 0
|
||||
return self.msg_default
|
||||
|
||||
def _msgWrite():
|
||||
self.msg_default[16] = 1
|
||||
return self.msg_default
|
||||
|
||||
def _msgReset():
|
||||
self.msg_default[16] = 2
|
||||
return self.msg_default
|
||||
|
||||
def setInterlockMode(self, mode)
|
||||
udp = UDPDatagram()
|
||||
msg = self._msgWrite()
|
||||
modebytes = array('B', mode)
|
||||
msg.extend(modebytes)
|
||||
#import binascii
|
||||
#print binascii.hexlify(msg)
|
||||
if udp.send(msg) == null:
|
||||
return false
|
||||
else:
|
||||
return true
|
||||
|
||||
def getInterlockMode(self)
|
||||
udp = UDPDatagram()
|
||||
msg = self._msgRead()
|
||||
#import binascii
|
||||
#print binascii.hexlify(msg)
|
||||
rcv = udp.send(msg)
|
||||
#print binascii.hexlify(rcv)
|
||||
return rcv
|
||||
|
||||
def masterReset(self)
|
||||
udp = UDPDatagram()
|
||||
msg = self._msgReset()
|
||||
#import binascii
|
||||
#print binascii.hexlify(msg)
|
||||
if udp.send(msg) == null:
|
||||
return false
|
||||
else:
|
||||
return true
|
||||
|
||||
class UDPDatagram:
|
||||
"""
|
||||
class for receiving and sending a udp message
|
||||
"""
|
||||
import socket
|
||||
|
||||
# the constructor
|
||||
def __init__(self):
|
||||
# communication with interlock server
|
||||
self.serverPort = 0xBB1B # interlock server
|
||||
self.serverName = 'PROMC1' # ball server
|
||||
try:
|
||||
self.sock = socket.socket(socket.AF_INET, # Internet
|
||||
socket.SOCK_DGRAM) # UDP
|
||||
except socket.error, msg:
|
||||
print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
|
||||
sys.exit()
|
||||
|
||||
def _sendUDP(self, message):
|
||||
try:
|
||||
# get the ip address and send
|
||||
self.sock.sendto(message, (self.serverName, self.serverPort))
|
||||
except socket.error, msg:
|
||||
print 'Failed to create socket. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
|
||||
sys.exit()
|
||||
|
||||
def _listenInit(self):
|
||||
try:
|
||||
self.sock.bind('', self.ThisPort)
|
||||
except socket.error, msg:
|
||||
print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
|
||||
sys.exit()
|
||||
print 'Socket bind complete. '
|
||||
|
||||
def _listen(self):
|
||||
# now keep talking with the client
|
||||
while 1:
|
||||
# receive data from client (data, addr)
|
||||
data, addr = self.sock.recvfrom(1024)
|
||||
if not data:
|
||||
print 'End of data or no data'
|
||||
break
|
||||
print 'Message[' + addr[0] + ':' + str(addr[1]) + '] - ' + data.strip()
|
||||
|
||||
self.sock.close()
|
||||
return data[28:]
|
||||
|
||||
def send(self, message):
|
||||
self._listenInit()
|
||||
self._sendUDP(message)
|
||||
return self._listen()
|
||||
Reference in New Issue
Block a user