import ch.psi.pshell.epics.Motor


class WireScanInfo(DeviceBase):
    """Device exposing the status, cycle counter and wire geometry of a wire scanner.

    All channel names are built as ``prefix + ":" + suffix``. The device
    cache holds a short status text and the device state is updated from
    the six status channels (see on_status_change).

    NOTE(review): DeviceBase, Channel, caget, caput, State, to_array,
    set_device_alias, math and time are not imported here; they are
    presumably provided by the hosting script environment - confirm.
    """

    def __init__(self, name, prefix):
        """Create the channels, read the initial values and initialize the device.

        Args:
            name: device name passed to DeviceBase.
            prefix: channel name prefix (without the trailing ':').
        """
        DeviceBase.__init__(self, name)
        self.prefix = prefix
        self.nb_cycles = Channel(self.prefix + ":NB_CYCL_SP", 'l')
        self.curr_cycl = Channel(self.prefix + ":CURR_CYCL", 'l', callback=self.on_cycle_change)
        self.curr_cycl.set_monitored(True)
        set_device_alias(self.curr_cycl, "current_cycle")
        self.current_cycle = self.curr_cycl.get()
        # Order matters: on_status_change indexes this list by position.
        self.status_channels = []
        for s in ("SCANNING", "SCAN_DONE", "INITIALIZING", "INIT_DONE", "ABORTED", "ERROR"):
            c = Channel(self.prefix + ":" + s, 'i', callback=self.on_status_change)
            c.set_monitored(True)
            self.status_channels.append(c)
        self.cycles = self.nb_cycles.get()
        # Publish the current status once, without waiting for a monitor event.
        self.on_status_change(None)
        self.initialize()
        # Indexes 0..3 are used by get_wire_pos (W1X, W1Y, W2X, W2Y); 4 is the foil.
        self.home_offsets = [
            caget(self.prefix + ":" + s, 'd')
            for s in ("W1X_U0_SP", "W1Y_U0_SP", "W2X_U0_SP", "W2Y_U0_SP", "FOIL_U0_SP")
        ]

    def on_status_change(self, val):
        """Update the cache text and device state from the status channels.

        The channels are tested in the order they were created and the
        first one reading 1 wins. If none reads 1 nothing is changed. Any
        error while reading marks the device as offline.

        Args:
            val: value delivered by the channel callback (not used).
        """
        try:
            if self.status_channels[0].get() == 1:
                self.setCache("Scanning " + str(self.current_cycle) + "/" + str(self.cycles), None)
                self.setState(State.Busy)
            elif self.status_channels[1].get() == 1:
                self.setCache("Scan done", None)
                self.setState(State.Ready)
            elif self.status_channels[2].get() == 1:
                self.setCache("Traveling", None)
                self.setState(State.Paused)
            elif self.status_channels[3].get() == 1:
                self.setCache("At start", None)
                self.setState(State.Ready)
            elif self.status_channels[4].get() == 1:
                self.setCache("Abort", None)
                self.setState(State.Ready)
            elif self.status_channels[5].get() == 1:
                self.setCache("Error", None)
                self.setState(State.Fault)
            else:
                pass  # All zero, a transition
        except:
            # Deliberately broad, best-effort fallback: any failure while
            # reading the status is reported as "offline" instead of being
            # propagated into the callback machinery.
            self.setCache("offline", None)
            self.setState(State.Offline)

    def on_cycle_change(self, val):
        """Store the new cycle number and refresh the status text.

        Args:
            val: new value of the current-cycle channel.
        """
        #print "Wire scan cycle change: ", val
        self.current_cycle = val
        self.on_status_change(val)

    def doClose(self):
        """Close the cycle channels and all status channels."""
        self.nb_cycles.close()
        self.curr_cycl.close()
        for c in self.status_channels:
            c.close()

    def get_wire_pos(self, pos_motor):
        """Convert a motor position into the four wire positions.

        Each wire position is (pos_motor - home offset) divided by sqrt(2),
        with a negative sign for the X wires.

        Args:
            pos_motor: motor position, or None / NaN.

        Returns:
            [w1x, w1y, w2x, w2y]. If pos_motor is None or NaN, a list
            holding pos_motor four times.
        """
        if (pos_motor is None) or math.isnan(pos_motor):
            return [pos_motor] * 4
        w1x = (pos_motor - self.home_offsets[0]) / -(math.sqrt(2))
        w1y = (pos_motor - self.home_offsets[1]) / (math.sqrt(2))
        w2x = (pos_motor - self.home_offsets[2]) / -(math.sqrt(2))
        w2y = (pos_motor - self.home_offsets[3]) / (math.sqrt(2))
        return [w1x, w1y, w2x, w2y]

    def is_valid(self):
        """Return True if the ':VALID' channel currently reads 1."""
        return caget(self.prefix + ":VALID", 'i') == 1

    def _get_channel(self, bunch, wire, name):
        """Build the output channel name '<prefix>:B<bunch>_<WIRE>_<name>'."""
        return self.prefix + ":B" + str(int(bunch)) + "_" + wire.upper() + "_" + name

    def _put_scalar(self, bunch, wire, name, value):
        """Write value, converted to float, to the output channel 'name'."""
        caput(self._get_channel(bunch, wire, name), float(value))

    def _put_array(self, bunch, wire, name, value):
        """Write value, converted with to_array(value, 'd'), to the output channel 'name'."""
        caput(self._get_channel(bunch, wire, name), to_array(value, 'd'))

    def set_out_com(self, bunch, wire, value):
        """Write the CENTER_OF_MASS output for the given bunch and wire."""
        self._put_scalar(bunch, wire, "CENTER_OF_MASS", value)

    def set_out_rms(self, bunch, wire, value):
        """Write the RMS output for the given bunch and wire."""
        self._put_scalar(bunch, wire, "RMS", value)

    def set_out_amp(self, bunch, wire, value):
        """Write the FIT_AMPLITUDE output for the given bunch and wire."""
        self._put_scalar(bunch, wire, "FIT_AMPLITUDE", value)

    def set_out_mean(self, bunch, wire, value):
        """Write the FIT_MEAN output for the given bunch and wire."""
        self._put_scalar(bunch, wire, "FIT_MEAN", value)

    def set_out_off(self, bunch, wire, value):
        """Write the FIT_OFFSET output for the given bunch and wire."""
        self._put_scalar(bunch, wire, "FIT_OFFSET", value)

    def set_out_sigma(self, bunch, wire, value):
        """Write the FIT_STANDARD_DEVIATION output for the given bunch and wire."""
        self._put_scalar(bunch, wire, "FIT_STANDARD_DEVIATION", value)

    def set_out_pos(self, bunch, wire, value):
        """Write the POSITION array output for the given bunch and wire."""
        self._put_array(bunch, wire, "POSITION", value)

    def set_out_samples(self, bunch, wire, value):
        """Write the AMPLITUDE array output for the given bunch and wire."""
        self._put_array(bunch, wire, "AMPLITUDE", value)

    def set_out_gauss(self, bunch, wire, value):
        """Write the FIT_GAUSS_FUNCTION array output for the given bunch and wire."""
        self._put_array(bunch, wire, "FIT_GAUSS_FUNCTION", value)


def new_scan_info_device(name, prefix):
    """Create a WireScanInfo device with the given name and channel prefix."""
    return WireScanInfo(name, prefix)


def get_wire_pos(wire_scanner, pos):
    """Return wire_scanner.get_wire_pos(pos)."""
    return wire_scanner.get_wire_pos(pos)


def get_scan_selection(scan_type, index=0):
    """Map a scan type to the wire selection to use.

    Args:
        scan_type: one of the WireScanner.ScanType values.
        index: for Set1 / Set2, 0 selects the X wire and any other value
            selects the Y wire. Ignored for single-wire scan types.

    Returns:
        The WireScanner selection name, or None when scan_type has no
        wire selection (background types or unknown values).
    """
    if scan_type == WireScanner.WireX1:
        return WireScanner.W1X
    if scan_type == WireScanner.WireY1:
        return WireScanner.W1Y
    if scan_type == WireScanner.WireX2:
        return WireScanner.W2X
    if scan_type == WireScanner.WireY2:
        return WireScanner.W2Y
    if scan_type == WireScanner.Set1:
        return WireScanner.W1X if (index == 0) else WireScanner.W1Y
    if scan_type == WireScanner.Set2:
        return WireScanner.W2X if (index == 0) else WireScanner.W2Y
    return None


class WireScanner(WireScanInfo):
    """WireScanInfo extended with motor access, wire selection and scan commands."""

    # Chained assignment: defines the list and one class constant per entry.
    ScanType = [WireX1, WireY1, WireX2, WireY2, Set1, Set2, BackGarage, BackFoil] = \
        ['X1', 'Y1', 'X2', 'Y2', 'Set1', 'Set2', 'BackgroundGarage', 'BackgroundFoil']
    # The position of a name in Selection is the value written to ':WIRE_SP'.
    Selection = [Garage, W1X, W1Y, W2X, W2Y, Foil] = "GARAGE", "W1X", "W1Y", "W2X", "W2Y", "FOIL"

    def __init__(self, prefix, scan_range, cycles=None, velocity=None, continuous=None):
        """Create the scanner channels and optionally write scan parameters.

        Args:
            prefix: channel name prefix (without the trailing ':').
            scan_range: None, or a sequence indexed as
                [x_start, x_end, y_start, y_end] by set_selection.
            cycles: if not None, written to the number-of-cycles channel.
            velocity: if not None, written to the wire velocity channel.
            continuous: if not None, ':SCAN_MODE_SP' is set to 0 when
                true and to 1 when false.
        """
        WireScanInfo.__init__(self, "Wire Scan " + prefix, prefix)
        self.motor = ch.psi.pshell.epics.Motor("WireScanner motor", self.prefix + ":MOTOR_1")
        self.motor.uploadConfig()
        self.motor.initialize()
        self.motor_bs_readback = Channel(self.prefix + ":ENC_1_BS")  #, callback = self.on_readback_change)
        #self.motor_bs_readback.set_monitored(True)
        self.wire_velocity = Channel(self.prefix + ":SCAN_VELO_SP")  #wire coordinates
        self.motor_velocity = Channel(self.prefix + ":SCAN_M_VELO")  #motor coordinates
        self.travel_velocity = Channel(self.prefix + ":TRAVEL_VELO_SP")  #motor coordinates
        self.wire_sel = Channel(self.prefix + ":WIRE_SP", 'l')
        # Selection-dependent channels, created by set_selection.
        self.selection = None
        self.u0 = None
        self.offset = None
        self.range = None
        self.start = None
        self.end = None
        self.scan_range = scan_range
        if velocity is not None:
            self.wire_velocity.write(float(velocity))
        if cycles is not None:
            self.nb_cycles.write(int(cycles))
        if continuous is not None:
            caputq(self.prefix + ":SCAN_MODE_SP", 0 if continuous else 1)
        self.readback = self.motor_bs_readback.get()
        self.cycles = self.nb_cycles.get()
        self.velocity = self.wire_velocity.get()
        self.initialize()

    #def on_readback_change(self, val):
    #    self.readback = val

    def _close_selection_channels(self):
        """Close the channels created by set_selection, if any, and reset them to None."""
        for attr in ("u0", "offset", "range", "start", "end"):
            channel = getattr(self, attr)
            if channel is not None:
                channel.close()
                setattr(self, attr, None)

    def set_selection(self, sel):
        """Select a wire (or garage / foil) and open its parameter channels.

        Channels opened by a previous call are closed first, so repeated
        selections do not accumulate open channels.

        Args:
            sel: one of WireScanner.Selection.

        Raises:
            Exception: if sel is not in WireScanner.Selection.
        """
        if sel not in WireScanner.Selection:
            raise Exception("Invalid Wire Scan selection: " + str(sel))
        self._close_selection_channels()
        self.selection = sel
        self.u0 = Channel(self.prefix + ":" + self.selection + "_U0_SP")
        self.offset = Channel(self.prefix + ":" + self.selection + "_OFF_SP")
        self.range = Channel(self.prefix + ":" + self.selection + "_RANGE_SP")
        self.start = Channel(self.prefix + ":" + self.selection + "_START_SP")
        self.end = Channel(self.prefix + ":" + self.selection + "_END_SP")
        self.wire_sel.put(WireScanner.Selection.index(sel))

        #Setting parameters
        if self.scan_range is not None:
            # X wires use scan_range[0:2], Y wires use scan_range[2:4];
            # garage and foil selections get no range written.
            if sel in ["W1X", "W2X"]:
                self.start.write(float(self.scan_range[0]))
                self.end.write(float(self.scan_range[1]))
            elif sel in ["W1Y", "W2Y"]:
                self.start.write(float(self.scan_range[2]))
                self.end.write(float(self.scan_range[3]))

    def abort(self):
        """Process ':ABORT.PROC' without waiting for completion."""
        caputq(self.prefix + ":ABORT.PROC", 1)

    def init(self, wait=False):
        """Process ':INIT.PROC'; if wait is true, block until "At start" is reported."""
        #if self.selection is not None:
        #    self.wire_sel.put(WireScanner.Selection.index(self.selection))
        caputq(self.prefix + ":INIT.PROC", 1)
        if wait:
            self.wait_in_selection()

    def park(self, wait=False):
        """Process ':GARAGE_SEL.PROC' then ':INIT.PROC'; optionally wait for "At start"."""
        caputq(self.prefix + ":GARAGE_SEL.PROC", 1)
        caputq(self.prefix + ":INIT.PROC", 1)
        if wait:
            self.wait_in_selection()

    def wait_in_selection(self):
        """Wait until the device cache reads "At start" (the text set by on_status_change)."""
        time.sleep(0.5)  #Some time for the status change
        self.waitValue("At start", 60000)

    def scan(self):
        """Refresh the cached number of cycles and trigger ':SCAN_WIRE'."""
        self.cycles = self.nb_cycles.get()
        caputq(self.prefix + ":SCAN_WIRE", 1)

    def get_sel_wire_pos(self, pos_motor=None):
        """Return the position of the currently selected wire.

        Args:
            pos_motor: motor position; when None the ':ENC_1_BS' readback
                channel is read instead.

        Returns:
            The entry of get_wire_pos matching the current selection, or
            NaN when the selection is not one of the four wires.
        """
        if pos_motor is None:
            pos_motor = self.motor_bs_readback.get()
        wire_pos = self.get_wire_pos(pos_motor)
        if self.selection == WireScanner.W1X:
            return wire_pos[0]
        if self.selection == WireScanner.W1Y:
            return wire_pos[1]
        if self.selection == WireScanner.W2X:
            return wire_pos[2]
        if self.selection == WireScanner.W2Y:
            return wire_pos[3]
        return float('nan')

    def doClose(self):
        """Close the base-class channels, the motor and every scanner channel."""
        WireScanInfo.doClose(self)
        self.motor.close()
        self.motor_bs_readback.close()
        self.wire_velocity.close()
        self.motor_velocity.close()
        self.travel_velocity.close()
        self.wire_sel.close()
        self._close_selection_channels()

    # Disabled code, kept as an unused string literal:
    """
    def get_cicle_time(self):
        range = abs(self.start.get() -self.end.get())
        speed = self.motor_velocity.get()
        return (range / speed)

    def get_total_time(self):
        return self.get_cicle_time() * self.cycles
    """