Files
ncs/script/tests/tests/sad/rightleft/rightleft.py
boccioli_m 6708ffee85 Closedown
2017-10-13 13:56:27 +02:00

342 lines
13 KiB
Python

# Test name: rightleft
# move around a value and plot the result
# Copyright (c) 2015 Paul Scherrer Institute. All rights reserved.
###### Init - DO NOT MODIFY THE CODE BELOW ######
global sys, inspect, os, traceback
import sys, inspect, os, traceback
from array import *
# --- Debug scratch code: builds a byte message, prints it, patches one byte, prints again. ---
# NOTE(review): this runs at module level, before startTest() is even defined; it looks like
# leftover experimentation for the Interlock message format - confirm it is still wanted.
a = bytearray()  # NOTE(review): dead assignment, 'a' is rebound on the next line
# 28 unsigned bytes ('B'); same values as the header built in Interlock.__init__
a = array('B',[0x32,0,0x69,0,0x99,0x88,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0])
mode = "2,IQCOM,$BMA1,1,DIA"
# the characters of the mode string as unsigned bytes (relies on Python 2 str being a byte string)
modebytes = array('B', mode)
# append the mode bytes after the 28 header bytes
a.extend(modebytes)
#a.append(4)
#a.fromlist([1,2,3,4])
import binascii
# dump the message as hex text and as array repr
print binascii.hexlify(a)
print a
# write 1 into index 16 (it already holds 1 in the literal above) and dump again
a[16] = 1
print a
print binascii.hexlify(a)
def startTest(testName, DEVICE, params):
    """
    Main method running the test.

    Writes a set-point to one channel for every position from -moveAround to
    +moveAround (forward sweep) and then for the same positions in reverse
    order (backward sweep), reads a status channel after each write, plots
    both sweeps and reports the largest absolute difference between the
    forward and the backward reading taken at the same position.

    testName  passed on to TestingTool
    DEVICE    passed on to TestingTool; test.getDeviceName() is used as channel name prefix
    params    passed on to TestingTool; the parameters 'delay', 'centre',
              'moveAround' and 'steps' are read and converted to float

    Nothing is returned: the outcome is reported with test.sendFeedback(ret, success).
    """
    import traceback
    # by default, assume the test failed:
    ret = 'Test failed'
    success = False
    # stays None until TestingTool has been created, so the error handlers at the
    # bottom can tell whether there is anything to send feedback through.
    test = None
    # put the whole custom code under try/catch.
    try:
        # get the path of this script:
        testPath = inspect.getfile(inspect.currentframe())
        # init the testing tool class:
        test = TestingTool(testName, testPath, DEVICE, params)
        ################ END OF Init #####################
        ######### WRITE YOUR CODE HERE BELOW #############
        """
        All the code in this section # WRITE YOUR CODE HERE BELOW # is just an example and can be modified/deleted.
        It must be indented to the same level as this comment.
        -----------------------------------
        GETTING INPUTS:
        -----------------------------------
        If needed, the following methods are available:
        test.getPath() string, path of this test file
        test.getName() string, name of this test
        test.getDeviceName() string, device for which the test must run (typically it is the beginning of a process variable name)
        test.getPlotName() string, name to be given to the plot when using setPlotTitle(). Example: scan.setPlotTitle(test.getPlotName())
        -----------------------------------
        GETTING TEST PARAMETERS:
        -----------------------------------
        if you need to get parameters for the test, use:
        myParamValue = test.getParam('myParamName')
        the calls to getParam are added to the code automatically, one per parameter, when creating the new test.
        NOTE: Casting may be necessary.
        See the test config for the list of parameters specific to the test.
        -----------------------------------
        SETTING OUTPUTS:
        -----------------------------------
        When the test has ended (error or success), this method must be called in order to return to pshell:
        test.sendFeedback(ret,success)
        ret string, a text summarizing the result of the test.
        success bool, True = test successful. False = test failed.
        -----------------------------------
        LOG INFO:
        -----------------------------------
        when some information must be shown on the log on pshell, use the following line:
        test.log('text to log')
        """
        ########## Example (can be removed) ######
        # print the list of parameters passed. If any error, stop and send feedback.
        # test.log("Example - Test name: " + test.getName())
        # test.log("Example - Device name: " + test.getDeviceName() )
        # NOTE(review): the bare 'except:' clauses in this function are kept on purpose.
        # The script imports java.awt.Color, so it presumably runs under Jython, where a
        # Java exception may not be caught by 'except Exception' - confirm before narrowing.
        try:
            test.log("-------------------------------------------")
            test.log("Running test with the following parameters:")
            test.printParams()
            # If present, use the parameters here below for your test script.
            # These parameters were automatically generated: you might need to change the casting.
            delay = float(test.getParam('delay'))
            # 'centre' is not used further down; it is still read so that a missing or
            # non-numeric value is reported as before.
            centre = float(test.getParam('centre'))
            moveAround = float(test.getParam('moveAround'))
            steps = float(test.getParam('steps'))
        except:
            # test failed, write the report into the variables ret and success and send feedback:
            ret = 'Could not retrieve testing parameters - ' + traceback.format_exc()
            success = False
            test.sendFeedback(ret, success)
            return
        # loop to read channels for a while and plot the channels values.
        # initialise plot tab with 2 plots: pass here the axis names:
        scan = ManualScan(['sample'], ['Status (sta)', 'Position (VAL)'])
        # set plot name(tab title):
        plotName = test.getPlotName()
        scan.setPlotTitle(plotName)
        # start plots. See further below how to add points to the plots (scan):
        scan.start()
        # in this example we initialise also a plot type to show how to add several curves on the same plot
        p1 = plot(None, name="motor_sta", title=plotName + " Multi curves")[0]
        # optionally set plot ranges
        #p1.getAxis(p1.AxisId.X).setRange(0.0,80.0)
        #p1.getAxis(p1.AxisId.Y).setRange(0.0,70.0)
        p1.addSeries(LinePlotSeries("motor_val"))
        # positions visited by the forward sweep; the backward sweep walks the very same
        # positions in reverse so that every backward reading has a forward partner.
        positions = range(int(-moveAround), int(moveAround) + 1, int(steps))
        # inject a sinus into the plot, as example
        from math import sin
        motor_val = 0
        # for testing: remove:
        for sample in positions:
            # NOTE(review): this break leaves the loop immediately, so the synthetic
            # example below never runs; remove the break to re-enable it.
            break
            readback1 = sample #the x axis.
            sleep(delay) # settling time.
            # synthetic position: counts up and wraps back to 0 above 50
            motor_val = motor_val + 1
            if motor_val > 50:
                motor_val = 0
            # synthetic status value:
            motor_sta = sin(float(readback1)/10.0)*10.0-10.0
            # add values to plot:
            #scan.append([readback1], [readback1], [motor_sta, motor_val])
            # add values to 2 colour plot
            p1.getSeries(0).appendData(motor_val, motor_sta)
        # now try with data from real device: this part will most probably fail: correct the PV names with existing ones.
        # every channel that gets opened is recorded here and closed in the finally
        # clause, whatever happens in between (early return, exception, user stop).
        channels = []
        try:
            try:
                # set up connection to channels. "type" of data can be "d" (= double), "l" (= long).
                # NOTE(review): the ':ao' channel is opened but never read or written;
                # the status is read from the hard-coded 'DMAF1:IST3:1' instead - confirm.
                channels.append(Channel(test.getDeviceName() + ':ao', type='d'))
                pv_motor_val = Channel(test.getDeviceName() + ':ai', type='d')
                channels.append(pv_motor_val)
                pv_motor_sta = Channel('DMAF1:IST3:1', type='l')
                channels.append(pv_motor_sta)
            except:
                # prepare return information: return text:
                ret = 'Unable to create channel - ' + traceback.format_exc()
                # prepare return information: return success:
                success = False
                # send return information:
                test.sendFeedback(ret, success)
                return
            # send a command to a channel (it is translated to a caput): uncomment this line below to try it
            #pv_motor_com.put(1.0, timeout=None) # optionally, a timeout can be given.
            forwardsValues = {}
            maxError = 0.0
            maxErrorPos = 0.0
            # forward sweep: write the position, read the status, remember it per position
            for sample in positions:
                readback1 = sample #the x axis.
                sleep(delay) # settling time.
                motor_val = float(readback1)
                pv_motor_val.put(motor_val)
                # get value (it is translated to a caget):
                motor_sta = float(pv_motor_sta.get())
                # add values to plot:
                #scan.append([readback1], [readback1], [motor_sta, motor_val])
                # 2 colour plot
                p1.getSeries(0).appendData(motor_val, motor_sta)
                forwardsValues[sample] = motor_sta
            # backward sweep over the same positions, compared against the forward reading
            for sample in reversed(positions):
                readback1 = sample #the x axis.
                sleep(delay) # settling time.
                motor_val = float(readback1)
                # NOTE(review): the value written on the way back is offset by +1.0 from the
                # position that is plotted and compared - confirm this is intended.
                pv_motor_val.put(motor_val+1.0)
                # get value (it is translated to a caget):
                motor_sta = float(pv_motor_sta.get())
                # add values to plot:
                # scan.append([readback1], [readback1], [motor_sta, motor_val])
                # 2 colour plot
                p1.getSeries(1).appendData(motor_val, motor_sta)
                error = abs(forwardsValues[sample] - motor_sta)
                if error > maxError:
                    maxError = error
                    maxErrorPos = sample
            import java.awt.Color
            p1.addMarker(maxErrorPos, None, "Max Err = " + str(maxError), java.awt.Color.GREEN)
            # IMPORTANT: if the test was successful, write the report into the variables ret and success.
            ret = "Test successful, max error: " + str(maxError) + " at " + str(maxErrorPos)
            success = True
        finally:
            # Closing channels: all channels that were opened with Channel() must be closed before exit:
            for channel in channels:
                channel.close()
        #test.sendFeedback(ret, success)
        # once the test is finished, no need to do anything. The code below yours will do the rest.
        ################ End of Example ##########
        ################ END OF YOUR CODE ################
        ###### Final - DO NOT MODIFY THE CODE BELOW ######
        # just in case the feedback was forgotten.
        test.sendFeedback(ret, success)
    except (KeyboardInterrupt):
        # user stop error handler.
        ret = 'Test stopped by user.'
        success = False
        if test is None:
            # TestingTool was never created: nothing to report through.
            raise
        test.sendFeedback(ret, success)
    except:
        # generic error handler.
        ret = traceback.format_exc()
        success = False
        if test is None:
            # TestingTool itself failed: let the real error surface instead of
            # hiding it behind a NameError on 'test'.
            raise
        test.sendFeedback(ret, success)
# launch the test.
# NOTE(review): test, device and parameters are not defined anywhere in this file;
# presumably the framework running the script provides them - confirm.
# The classes defined further down do not exist yet when this call runs.
startTest(test, device, parameters)
################ END OF Final ####################
#### IF NEEDED, ADD YOUR FUNCTIONS HERE BELOW ####
# Indent to end left
# def yourCustomFunction:
class Interlock:
    """
    class for getting - setting interlock mode

    Every request is a copy of the 28-byte header kept in self.msg_default with
    the io-func byte set, sent through a fresh UDPDatagram (a class expected to
    be defined in the same module).
    """
    # mode, test pattern for outputs 2-4, test pattern for output 5, expected pattern for 2-4, expected pattern for 5
    # NOTE(review): none of the rps_* values is used by the methods of this class.
    rps_beamline = 0
    rps_mode = 1
    rps_expected1 = 2
    rps_expected2 = 3
    rps_mask1 = 0x4440 # mask for outputs 2-4
    rps_mask2 = 0x0004 # mask for output 5
    # placeholder only: __init__ gives every instance its own header
    msg_default = bytearray()
    # index of the io-func byte inside the header, and the values it can take
    _IO_FUNC_INDEX = 16
    _IO_READ = 0
    _IO_WRITE = 1
    _IO_RESET = 2

    def __init__(self):
        """
        Message composition:
        0x32 = 50 = byte count
        0x69 = 105 = transfer function
        0x99,0x88 = this application port number (swapped)
        element 7 = message id
        element 16 = io-func reset=2 read mode=0 write mode=1

        NOTE(review): the code writes the io-func at index 16 counting from 0 -
        confirm how "element 7" / "element 16" are meant to be counted.
        """
        # imported here because a class-level import is not visible inside methods
        from array import array
        self.msg_default = array('B',[0x32,0,0x69,0,0x99,0x88,1,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0])

    def _msgWithIoFunc(self, ioFunc):
        """return a copy of the default header with the io-func byte set to ioFunc"""
        # work on a copy: the shared header must not change (or grow) between requests
        msg = self.msg_default[:]
        msg[self._IO_FUNC_INDEX] = ioFunc
        return msg

    def _msgRead(self):
        """header for a 'read mode' request"""
        return self._msgWithIoFunc(self._IO_READ)

    def _msgWrite(self):
        """header for a 'write mode' request"""
        return self._msgWithIoFunc(self._IO_WRITE)

    def _msgReset(self):
        """header for a 'reset' request"""
        return self._msgWithIoFunc(self._IO_RESET)

    def _buildModeMessage(self, mode):
        """
        return the 'write mode' header followed by the bytes of mode

        mode  byte string, or text that can be encoded as ASCII
        """
        from array import array
        if isinstance(mode, (bytes, bytearray)):
            payload = bytearray(mode)
        else:
            payload = bytearray(mode.encode('ascii'))
        msg = self._msgWrite()
        msg.extend(array('B', list(payload)))
        return msg

    def setInterlockMode(self, mode):
        """
        send mode to the interlock server

        returns True when UDPDatagram.send() gave back something, False when it gave back None
        """
        udp = UDPDatagram()
        msg = self._buildModeMessage(mode)
        #import binascii
        #print binascii.hexlify(msg)
        return udp.send(msg) is not None

    def getInterlockMode(self):
        """send a 'read mode' request and return whatever UDPDatagram.send() gives back"""
        udp = UDPDatagram()
        msg = self._msgRead()
        #import binascii
        #print binascii.hexlify(msg)
        rcv = udp.send(msg)
        #print binascii.hexlify(rcv)
        return rcv

    def masterReset(self):
        """
        send a 'reset' request

        returns True when UDPDatagram.send() gave back something, False when it gave back None
        """
        udp = UDPDatagram()
        msg = self._msgReset()
        #import binascii
        #print binascii.hexlify(msg)
        return udp.send(msg) is not None
class UDPDatagram:
    """
    class for receiving and sending a udp message

    One instance is good for one exchange: send() binds the local port, sends
    the message, waits for one datagram and closes the socket.
    """

    # the constructor
    def __init__(self, timeout=None):
        """
        create the UDP socket

        timeout  seconds to wait for the answer; None (default) waits without limit
        """
        # imported here because a class-level import is not visible inside methods
        import socket
        # communication with interlock server
        self.serverPort = 0xBB1B # interlock server
        self.serverName = 'PROMC1' # ball server
        # local port the answer is received on.
        # NOTE(review): taken from the "0x99,0x88 = this application port number (swapped)"
        # bytes of the Interlock message header - confirm.
        self.ThisPort = 0x8899
        self.timeout = timeout
        try:
            self.sock = socket.socket(socket.AF_INET, # Internet
                                      socket.SOCK_DGRAM) # UDP
            self.sock.settimeout(timeout)
        except socket.error as msg:
            self._fail('Failed to create socket.', msg)

    def _fail(self, what, err):
        """print what followed by error code and message of err, close the socket, stop via sys.exit()"""
        import sys
        # not every socket error carries (code, message): fall back to its text
        args = getattr(err, 'args', ())
        code = args[0] if len(args) > 0 else ''
        text = args[1] if len(args) > 1 else str(err)
        print(what + ' Error Code : ' + str(code) + ' Message ' + str(text))
        sock = getattr(self, 'sock', None)
        if sock is not None:
            sock.close()
        sys.exit()

    def _sendUDP(self, message):
        """send message to (serverName, serverPort)"""
        import socket
        try:
            # get the ip address and send
            self.sock.sendto(message, (self.serverName, self.serverPort))
        except socket.error as msg:
            self._fail('Failed to send message.', msg)

    def _listenInit(self):
        """bind the socket to ThisPort on all local addresses"""
        import socket
        try:
            # bind() takes one (host, port) tuple
            self.sock.bind(('', self.ThisPort))
        except socket.error as msg:
            self._fail('Bind failed.', msg)
        print('Socket bind complete. ')

    def _listen(self):
        """
        wait for one datagram and close the socket

        returns the received data without its first 28 bytes (the length of the
        header built by Interlock), or None when an empty datagram arrives or
        the timeout expires
        """
        import socket
        try:
            try:
                # receive data from client (data, addr)
                data, addr = self.sock.recvfrom(1024)
            except socket.timeout:
                data = None
            if not data:
                print('End of data or no data')
                return None
            print('Message[' + addr[0] + ':' + str(addr[1]) + '] - ' + str(data.strip()))
            return data[28:]
        finally:
            # the socket is single-use: release it on every path
            self.sock.close()

    def send(self, message):
        """
        bind, send message to the server and return the answer

        returns what _listen() returns: the answer without its 28-byte header, or None
        """
        self._listenInit()
        self._sendUDP(message)
        return self._listen()