339 lines
13 KiB
Python
339 lines
13 KiB
Python
import ch.psi.pshell.epics.Motor
|
|
|
|
|
|
class WireScanInfo(DeviceBase):
|
|
def __init__(self, name, prefix):
|
|
DeviceBase.__init__(self, name)
|
|
self.prefix = prefix
|
|
self.nb_cycles = Channel(self.prefix + ":NB_CYCL_SP", 'l')
|
|
self.curr_cycl = Channel(self.prefix + ":CURR_CYCL", 'l', callback = self.on_cycle_change)
|
|
self.curr_cycl.set_monitored(True)
|
|
set_device_alias(self.curr_cycl, "current_cycle")
|
|
self.current_cycle = self.curr_cycl.get()
|
|
self.status_channels=[]
|
|
for s in ("SCANNING", "SCAN_DONE", "INITIALIZING", "INIT_DONE", "ABORTED", "ERROR"):
|
|
c = Channel(self.prefix + ":" + s, 'i', callback = self.on_status_change);
|
|
c.set_monitored(True)
|
|
self.status_channels.append(c)
|
|
self.cycles = self.nb_cycles.get()
|
|
self.on_status_change(None)
|
|
self.initialize()
|
|
|
|
self.home_offsets = []
|
|
for s in ("W1X_U0_SP", "W1Y_U0_SP", "W2X_U0_SP", "W2Y_U0_SP", "FOIL_U0_SP"):
|
|
self.home_offsets.append(caget(self.prefix + ":" +s, 'd'))
|
|
|
|
def on_status_change(self, val):
|
|
try:
|
|
if self.status_channels[0].get() == 1:
|
|
self.setCache("Scanning " + str(self.current_cycle) + "/" + str(self.cycles), None)
|
|
self.setState(State.Busy)
|
|
elif self.status_channels[1].get() == 1:
|
|
self.setCache("Scan done", None)
|
|
self.setState(State.Ready)
|
|
elif self.status_channels[2].get() == 1:
|
|
self.setCache("Traveling", None)
|
|
self.setState(State.Paused)
|
|
elif self.status_channels[3].get() == 1:
|
|
self.setCache("At start", None)
|
|
self.setState(State.Ready)
|
|
elif self.status_channels[4].get() == 1:
|
|
self.setCache("Abort", None)
|
|
self.setState(State.Ready)
|
|
elif self.status_channels[5].get() == 1:
|
|
self.setCache("Error", None)
|
|
self.setState(State.Fault)
|
|
else:
|
|
pass #All zero, a transition
|
|
except:
|
|
self.setCache("offline", None)
|
|
self.setState(State.Offline)
|
|
|
|
def on_cycle_change(self, val):
|
|
#print "Wire scan cycle change: ", val
|
|
self.current_cycle = val
|
|
self.on_status_change(val)
|
|
|
|
def doClose(self):
|
|
self.nb_cycles.close()
|
|
self.curr_cycl.close()
|
|
for c in self.status_channels:
|
|
c.close()
|
|
|
|
def get_wire_pos(self, pos_motor):
|
|
if (pos_motor is None) or math.isnan(pos_motor):
|
|
return [pos_motor] * 4
|
|
w1x = (pos_motor - self.home_offsets[0]) / -(math.sqrt(2))
|
|
w1y = (pos_motor - self.home_offsets[1]) / (math.sqrt(2))
|
|
w2x = (pos_motor - self.home_offsets[2]) / -(math.sqrt(2))
|
|
w2y = (pos_motor - self.home_offsets[3]) / (math.sqrt(2))
|
|
return [w1x, w1y, w2x, w2y]
|
|
|
|
def get_motor_pos(self, pos_wire, wire_type):
|
|
if (pos_wire is None) or math.isnan(pos_wire):
|
|
return pos_wire
|
|
if wire_type == WireScanner.WireX1: return self.home_offsets[0] - pos_wire * math.sqrt(2)
|
|
if wire_type == WireScanner.WireY1: return self.home_offsets[1] + pos_wire * math.sqrt(2)
|
|
if wire_type == WireScanner.WireX2: return self.home_offsets[2] - pos_wire * math.sqrt(2)
|
|
if wire_type == WireScanner.WireY2: return self.home_offsets[3] + pos_wire * math.sqrt(2)
|
|
return None
|
|
|
|
def is_valid(self):
|
|
return caget(self.prefix + ":VALID", 'i')==1
|
|
|
|
def _get_channel(self, bunch, wire, name):
|
|
return self.prefix + ":B" + str(int(bunch)) + "_" + wire.upper() + "_" + name
|
|
|
|
def set_out_com(self, bunch, wire, value):
|
|
caput(self._get_channel(bunch, wire, "CENTER_OF_MASS"), float(value))
|
|
|
|
def set_out_rms(self, bunch, wire, value):
|
|
caput(self._get_channel(bunch, wire, "RMS"), float(value))
|
|
|
|
def set_out_amp(self, bunch, wire, value):
|
|
caput(self._get_channel(bunch, wire, "FIT_AMPLITUDE"), float(value))
|
|
|
|
def set_out_mean(self, bunch, wire, value):
|
|
caput(self._get_channel(bunch, wire, "FIT_MEAN"), float(value))
|
|
|
|
def set_out_off(self, bunch, wire, value):
|
|
caput(self._get_channel(bunch, wire, "FIT_OFFSET"), float(value))
|
|
|
|
def set_out_sigma(self, bunch, wire, value):
|
|
caput(self._get_channel(bunch, wire, "FIT_STANDARD_DEVIATION"), float(value))
|
|
|
|
def set_out_pos(self, bunch, wire, value):
|
|
caput(self._get_channel(bunch, wire, "POSITION"), to_array(value, 'd'))
|
|
|
|
def set_out_samples(self, bunch, wire, value):
|
|
caput(self._get_channel(bunch, wire, "AMPLITUDE"), to_array(value, 'd'))
|
|
|
|
def set_out_gauss(self, bunch, wire, value):
|
|
caput(self._get_channel(bunch, wire, "FIT_GAUSS_FUNCTION"), to_array(value, 'd'))
|
|
|
|
|
|
def new_scan_info_device(name, prefix):
|
|
return WireScanInfo(name, prefix)
|
|
|
|
def get_wire_pos(wire_scanner, pos):
|
|
return wire_scanner.get_wire_pos(pos)
|
|
|
|
def get_scan_selection(scan_type, index = 0):
|
|
if scan_type == WireScanner.WireX1: return WireScanner.W1X
|
|
if scan_type == WireScanner.WireY1: return WireScanner.W1Y
|
|
if scan_type == WireScanner.WireX2: return WireScanner.W2X
|
|
if scan_type == WireScanner.WireY2: return WireScanner.W2Y
|
|
if scan_type == WireScanner.Set1: return WireScanner.W1X if (index==0) else WireScanner.W1Y
|
|
if scan_type == WireScanner.Set2: return WireScanner.W2X if (index==0) else WireScanner.W2Y
|
|
return None
|
|
|
|
class WireScanner(WireScanInfo):
|
|
|
|
ScanType = [WireX1, WireY1, WireX2, WireY2, Set1, Set2, BackGarage, BackFoil] = ['X1', 'Y1', 'X2', 'Y2', 'Set1', 'Set2', 'BackgroundGarage', 'BackgroundFoil']
|
|
Selection = [Garage, W1X, W1Y, W2X, W2Y, Foil] = "GARAGE", "W1X", "W1Y", "W2X", "W2Y", "FOIL"
|
|
|
|
def __init__(self, prefix, scan_range, cycles=None, velocity=None, continuous = None):
|
|
WireScanInfo.__init__(self, "Wire Scan " + prefix, prefix)
|
|
self.motor = ch.psi.pshell.epics.Motor("WireScanner motor", self.prefix + ":MOTOR_1")
|
|
self.motor.uploadConfig()
|
|
self.motor.initialize()
|
|
#self.motor_bs_readback = Channel(self.prefix + ":ENC_1_BS") #, callback = self.on_readback_change)
|
|
self.motor_bs_readback = Channel("TESTIOC:TESTCALCOUT:Output")
|
|
#self.motor_bs_readback.set_monitored(True)
|
|
self.wire_velocity = Channel(self.prefix + ":SCAN_VELO_SP") #wire coordinates
|
|
self.motor_velocity = Channel(self.prefix + ":SCAN_M_VELO") #motor coordinates
|
|
self.travel_velocity = Channel(self.prefix + ":TRAVEL_VELO_SP") #motor coordinates
|
|
self.wire_sel = Channel(self.prefix + ":WIRE_SP", 'l')
|
|
self.selection = None
|
|
self.u0 = None
|
|
self.offset = None
|
|
self.range = None
|
|
self.start = None
|
|
self.end = None
|
|
self.scan_range = scan_range
|
|
|
|
if velocity is not None:
|
|
self.set_velocity(velocity)
|
|
if cycles is not None:
|
|
self.nb_cycles.write(int(cycles))
|
|
if continuous is not None:
|
|
caputq(self.prefix + ":SCAN_MODE_SP", 0 if continuous else 1)
|
|
|
|
self.readback = self.motor_bs_readback.get()
|
|
self.cycles = self.nb_cycles.get()
|
|
self.velocity = self.wire_velocity.get()
|
|
self.initialize()
|
|
|
|
#def on_readback_change(self, val):
|
|
# self.readback = val
|
|
|
|
def set_velocity(self, velocity):
|
|
self.wire_velocity.write(float(velocity))
|
|
self.velocity = self.wire_velocity.get()
|
|
|
|
def set_travel_velocity(self):
|
|
tv = self.travel_velocity.read()
|
|
self.set_velocity(tv/math.sqrt(2))
|
|
self.motor.speed=(tv)
|
|
|
|
|
|
def set_selection(self, sel):
|
|
if not sel in WireScanner.Selection:
|
|
raise Exception("Invalid Wire Scan selection: " + str(sel))
|
|
self.selection = sel
|
|
self.u0 = Channel(self.prefix + ":" + self.selection + "_U0_SP")
|
|
self.offset = Channel(self.prefix + ":" + self.selection + "_OFF_SP")
|
|
self.range = Channel(self.prefix + ":" + self.selection + "_RANGE_SP")
|
|
self.start = Channel(self.prefix + ":" + self.selection + "_START_SP")
|
|
self.end = Channel(self.prefix + ":" + self.selection + "_END_SP")
|
|
self.wire_sel.put(WireScanner.Selection.index(sel))
|
|
|
|
#Setting parameters
|
|
if self.scan_range is not None:
|
|
if sel in ["W1X", "W2X"]: self.start.write(float(self.scan_range[0]))
|
|
if sel in ["W1Y", "W2Y"]: self.start.write(float(self.scan_range[2]))
|
|
if sel in ["W1X", "W2X"]: self.end.write(float(self.scan_range[1]))
|
|
if sel in ["W1Y", "W2Y"]: self.end.write(float(self.scan_range[3]))
|
|
print "Sel: ", sel
|
|
print "Range: ", self.scan_range
|
|
print "Scan start: ", self.start.read()
|
|
print "End start: ", self.end.read()
|
|
|
|
|
|
def abort(self):
|
|
caputq(self.prefix + ":ABORT.PROC", 1)
|
|
|
|
def init(self, wait=False):
|
|
#if self.selection is not None:
|
|
# self.wire_sel.put(WireScanner.Selection.index(self.selection))
|
|
caputq(self.prefix + ":INIT.PROC", 1)
|
|
if wait:
|
|
self.wait_in_selection()
|
|
|
|
def park(self, wait=False):
|
|
caputq(self.prefix + ":GARAGE_SEL.PROC", 1)
|
|
caputq(self.prefix + ":INIT.PROC", 1)
|
|
if wait:
|
|
self.wait_in_selection()
|
|
|
|
def wait_in_selection(self):
|
|
time.sleep(0.5) #Some time for the status change
|
|
self.waitValue("At start", 60000)
|
|
|
|
def scan(self):
|
|
self.cycles = self.nb_cycles.get()
|
|
caputq(self.prefix + ":SCAN_WIRE", 1)
|
|
|
|
def get_sel_wire_pos(self, pos_motor=None):
|
|
if pos_motor is None:
|
|
pos_motor = self.motor_bs_readback.get()
|
|
wire_pos = self.get_wire_pos(pos_motor)
|
|
if self.selection == WireScanner.W1X: return wire_pos[0]
|
|
if self.selection == WireScanner.W1Y: return wire_pos[1]
|
|
if self.selection == WireScanner.W2X: return wire_pos[2]
|
|
if self.selection == WireScanner.W2Y: return wire_pos[3]
|
|
return float('nan')
|
|
|
|
def doClose(self):
|
|
WireScanInfo.doClose(self)
|
|
self.motor.close()
|
|
self.motor_bs_readback.close()
|
|
self.wire_velocity.close()
|
|
self.motor_velocity.close()
|
|
self.travel_velocity.close()
|
|
self.wire_sel.close()
|
|
if self.u0 is not None: self.u0.close()
|
|
if self.offset is not None: self.offset.close()
|
|
if self.range is not None: self.range.close()
|
|
if self.start is not None: self.start.close()
|
|
if self.end is not None: self.end.close()
|
|
"""
|
|
def get_cicle_time(self):
|
|
range = abs(self.start.get() -self.end.get())
|
|
speed = self.motor_velocity.get()
|
|
return (range / speed)
|
|
|
|
def get_total_time(self):
|
|
return self.get_cicle_time() * self.cycles
|
|
"""
|
|
|
|
|
|
|
|
|
|
prefix="STEST-DWSC"
|
|
scan_range = None
|
|
cycles = 1
|
|
|
|
if scan_range is None or len(scan_range) !=4:
|
|
scan_range = [ caget(prefix+":W2X_START_SP", 'd'), \
|
|
caget(prefix+":W2X_END_SP", 'd'), \
|
|
caget(prefix+":W2Y_START_SP", 'd'), \
|
|
caget(prefix+":W2Y_END_SP", 'd') ]
|
|
|
|
scan_range = [ caget(prefix+":W1X_START_SP", 'd'), \
|
|
caget(prefix+":W1X_END_SP", 'd'), \
|
|
caget(prefix+":W1Y_START_SP", 'd'), \
|
|
caget(prefix+":W1Y_END_SP", 'd') ]
|
|
|
|
velocity = abs(scan_range[1]-scan_range[0])*100/2000
|
|
|
|
print scan_range
|
|
print velocity
|
|
|
|
scanner = WireScanner(prefix, scan_range = scan_range, cycles=cycles, velocity=None, continuous = True)
|
|
add_device(scanner, True)
|
|
add_device(scanner.motor , True)
|
|
scanner.motor.monitored=True
|
|
|
|
|
|
#"m_pos" -> scanner.motor_bs_readback
|
|
m_pos = scanner.motor.readback
|
|
class w_pos(Readable):
|
|
def read(self):
|
|
return scanner.get_sel_wire_pos(m_pos.take())
|
|
|
|
|
|
|
|
|
|
scanner.park(wait=True)
|
|
scanner.set_velocity(velocity)
|
|
scanner.set_selection(WireScanner.W1X)
|
|
scanner.init(wait=True)
|
|
scanner.curr_cycl.write(0)
|
|
scan_complete=False
|
|
cur_cycle = 1.0
|
|
#Scan
|
|
|
|
|
|
scanner.scan() #scanner.waitState(State.Busy, 60000) Not needed as stream filter will make the wait
|
|
|
|
|
|
def check_end_scan(record, scan):
|
|
global scan_complete,cur_cycle
|
|
print scanner.take()
|
|
if scanner.take() == "Scan done":
|
|
print "Done"
|
|
scan_complete=True
|
|
scan.abort()
|
|
record.cancel() #So it won't be saved
|
|
|
|
try:
|
|
l=[w_pos()] ; #l.extend(st.getReadables()); l.append(Timestamp())
|
|
print "Start scan"
|
|
#mscan (st, l, -1, -1, take_initial = True, after_read = check_end_scan)
|
|
mscan (m_pos, l, -1, -1, take_initial = True, after_read = check_end_scan)
|
|
#print "End scan"
|
|
#tscan([w_pos()] + st.getReadables() + [Timestamp(),], 10, 0.5)
|
|
except:
|
|
print sys.exc_info()[1]
|
|
if not scanner.isReady():
|
|
print "Aborting scan"
|
|
scanner.abort()
|
|
if not scan_complete:
|
|
raise
|
|
finally:
|
|
print "Finished"
|
|
#scanner.close()
|
|
#st.close()
|
|
|