diff --git a/site_ansto/instrument/TEST_SICS/fakeFermi/SIM_FERMI.py b/site_ansto/instrument/TEST_SICS/fakeFermi/SIM_FERMI.py new file mode 100755 index 00000000..0fd3a0bb --- /dev/null +++ b/site_ansto/instrument/TEST_SICS/fakeFermi/SIM_FERMI.py @@ -0,0 +1,281 @@ +#!/usr/bin/python + +# MODBUS +# ADU : Application Data Unit +# MBAP : MODBUS Application Protocol header +# PDU : Protocol Data Unit +# FC : Function Code +# TID : Transaction Identifier (2 Bytes) +# PID : Protocol Identifier, 0 for MODBUS, (2 Bytes) +# len : Number of bytes following the len field, (2 Bytes) +# UID : Unit Identifier (1 Byte) +# +# MODBUS TCP/IP ADU +# ADU = [MBAP][PDU] +# MBAP = [TID][PID][len][UID] +# PDU = [FC][Data] + +import binascii +from struct import * +from collections import namedtuple +from twisted.internet import reactor, protocol +from twisted.protocols.basic import LineReceiver + +# Coil addresses per command +CSTART = 0 +CSTOP = 1 +CIDLE = 2 +CRESET = 3 + +# System Status Flags +_OK = 1 << 0 +_UP_TO_SPEED = 1 << 1 +_RUN = 1 << 2 +_ALARM = 1 << 3 +_LEV = 1 << 4 +_PHLOCK = 1 << 5 +_MD = 1 << 6 +_AVC_ON = 1 << 7 + +# Interlock Status Flags +_DSP_WD_FAIL = 1 << 0 +_OSC_FAIL = 1 << 1 +_POS_ALARM = 1 << 2 +_ESTOP = 1 << 3 +_UPS_FAIL = 1 << 4 +_EXT_FLT = 1 << 5 +_CC_WD_FAIL = 1 << 6 +_OVERSPEED = 1 << 7 +_VAC_FAIL = 1 << 8 +_OVERTEMP = 1 << 9 +_REFLOSS = 1 << 10 +_SPD_SENSOR_LOSS = 1 << 11 +_COOLING_LOSS = 1 << 12 +_DSP_SHUTDOWN = 1 << 13 +_CC_SHUTDOWN_REQ = 1 << 14 +_TEST_MODE = 1 << 15 + +# Mode values +_RPM = 0 +_PHASE = 1 +# Direction +_CW = 0 +_CCW = 1 +_POSDIR = _CCW +# Some descriptive names for default register values per chopper. +# The values are indexed by UID. +RVAL = { + 'mode': {1: _RPM, 2: _PHASE, 3: _PHASE}, + 'dir' : {1: _POSDIR, 2: _POSDIR, 3: _POSDIR} +} +class Fermi_Prot(LineReceiver): + + def __init__(self): + self.mbhlen = 7 + self.fcbyte = 7 + self.datstart=8 + self.MBAP = namedtuple('MBAP','TID PID len UID') + self.MBFN={1:self.rcoils,3:self.rhregs,5:self.wcoil,16:self.wmregs} + self.setRawMode() + self.RegVal = {} + self.FermiCoil = {} + for uid in [1,2,3]: + self.FermiCoil[uid] = [0,0,0,0] + mode = RVAL['mode'][uid] + dir = RVAL['dir'][uid] + sys_stat = _AVC_ON|_OK + self.RegInfo = { + 10: ['U16', 'System Status Information'], + 12: ['U16', 'Interlock Status Information'], + 14: ['U16', 'Rotational Speed'], + 16: ['F32', 'Encoder Angle (MB350PC/R ONLY)'], + 18: ['U32', 'Phase Veto Count'], + 20: ['U32', 'Read and Clear Phase Veto Count'], + 22: ['U32', 'Read and Clear Phase Veto Count' ], + 24: ['F32', 'Read PHASE ACCURACY'], + 26: ['F32', 'Read PHASE REPEATABILITY'], + 28: ['F32', 'Read PHASE OK'], + 30: ['U32', 'Set VETO WINDOW 100ns'], + 32: ['U32', 'Set VETO WINDOW 50ns'], + 34: ['U32', 'Set MOTOR CONTROL MODE'], + 1000: ['U32', 'Set ROTATIONAL SPEED SETPOINT'], + 1002: ['F32', 'Set HOMING ANGLE SETPOINT (MB350PC/R ONLY)'], + 1004: ['F32', 'Set MOTOR KP'], + 1006: ['F32', 'Set MOTOR KI'], + 1008: ['F32', 'Set MOTOR KPhase'], + 1010: ['F32', 'Set REFERENCE DELAY'], + 1012: ['F32', 'Set REFERENCE PERIOD'], + 1014: ['U32', 'Set SYNC SOURCE'], + 1016: ['U32', 'Set MOTOR DIRECTION'] + } + self.RegVal[uid] = { + 10: sys_stat, + 12: 0, + 14: 1000, + 16: 0, + 18: 0, + 20: 0, + 22: 0, + 24: 0, + 26: 0, + 28: 0, + 30: 0, + 32: 0, + 34: mode, + 1000: 0, + 1002: 0, + 1004: 0, + 1006: 0, + 1008: 0, + 1010: 0, + 1012: 400000, + 1014: 0, + 1016: dir + } + + def getFR(self, SA,QR): + uid = self.mbap.UID + type = self.RegInfo[SA][0] + if (type == 'U32' or type == 'F32'): + NE = QR/2 + else: + NE = QR + EA = 2*NE + SA + print 'NE = %d EA = %d ' % (NE,EA) + data = [] + for a in range(SA, EA, 2): + print 'reg %d = %s' % (a, self.RegInfo[a][1]) + data += [self.RegVal[uid][a]] + return data + + def setFR(self, SA, QR, data): + uid = self.mbap.UID + type = self.RegInfo[SA][0] + if (type == 'U32' or type == 'F32'): + NE = QR/2 + fs = '>%dI' + else: + NE = QR + fs = '>%dH' + EA = 2*NE + SA + print 'NE = %d EA = %d ' % (NE,EA) + t = unpack(fs % NE,data) + print 'setFR(): t = ', map(hex, t) + i = 0 + for a in range(SA, EA, 2): + n = t[i] + print 'setFR() a = ', a, ' i = ', i, 'setFR() n = ', n + print 'set reg %d = %s' % (a, self.RegInfo[a][1]), ' to ', hex(n) + self.RegVal[uid][a] = n + i += 1 + return + + def rawDataReceived(self, ADU): +# print "Received ADU " + binascii.hexlify(ADU) +# print "Received ADU({}) {!r}".format(len(ADU),ADU) + print "Received ADU ", ADU.encode('hex') + self.ADU = ADU + dl = len(ADU) + self.mbap = self.MBAP._make(unpack('>3HB',ADU[:self.mbhlen])) + self.fcode = unpack('>B', ADU[self.fcbyte])[0] + print self.mbap, 'fcode = ', self.fcode + self.MBFN[self.fcode]() + +# if (dl == self.mbhlen + self.mbap.len - 1): +# print self.MBAP._make(unpack('> + + def connectionMade(self): + print "Connection made" + + def getPDU(self): + return self.ADU[self.datstart:] + + def rcoils(self): + # FC = Function Code + # SA = Start Address, QC = Quantity of Coils + # BC = Byte Count, CS = Coil Status (each bit = status of one coil) + # Request: FC<1B>, SA<2B>, QC<2B> + # Response: FC<1B>, BC<1B>, CS + # n = number of eights in QC plus 1 if there is a remainder. + # ie n = QC // 8 + (1 if QC % 8 else 0) + uid = self.mbap.UID + PDU = self.getPDU() + (SA, QC) = unpack('>2H',PDU) + print 'rcoils:SA=%d, QC=%d' % (SA,QC) + hdr = self.mbap._replace(len = 4) + BC = 1 # ByteCount + #TODO Coil values should be an array of bytes with each bit representing a coil + if (self.FermiCoil[uid][SA] == 0xFF00): + data = 1 + else: + data = 0 + resp = pack('>3HB', *hdr) + pack('>2B%dB' % BC, self.fcode, BC, data) + print 'resp = ', resp.encode('hex') + self.sendLine(resp) + + def wcoil(self): + uid = self.mbap.UID + PDU = self.getPDU() + (OA, OV) = unpack('>2H', self.ADU[self.datstart:]) + print 'wcoil:OA=%d, OV=%X' % (OA,OV) + self.FermiCoil[uid][OA] = OV + if (OA == CSTART): + # Set RotSpeed value + self.RegVal[uid][14] = self.RegVal[uid][1000] + if (self.RegVal[uid][34] == _PHASE): + self.RegVal[uid][10] |= (_UP_TO_SPEED|_RUN|_LEV|_PHLOCK) + else: + self.RegVal[uid][10] |= (_UP_TO_SPEED|_RUN|_LEV) + elif (OA == CSTOP): + self.RegVal[uid][14] = 0 + if (self.RegVal[uid][34] == _PHASE): + self.RegVal[uid][10] &= ~(_UP_TO_SPEED|_RUN|_LEV|_PHLOCK) + else: + self.RegVal[uid][10] &= ~(_UP_TO_SPEED|_RUN|_LEV) + elif (OA == CIDLE): + print "Set IDLE" + elif (OA == CRESET): + print "RESET" + print 'resp = ', self.ADU.encode('hex') + self.sendLine(self.ADU) + + def rhregs(self): + uid = self.mbap.UID + PDU = self.getPDU() + (SA, QR) = unpack('>2H', self.ADU[self.datstart:]) + print 'rhregs:SA=%d, QR=%d' % (SA,QR) + regval = self.getFR(SA,QR) + print 'rhregs:data = ', regval + hdr = self.mbap._replace(len = 3 + 2*QR) + print 'hdr = ', hdr + type = self.RegInfo[SA][0] + if (type == 'U32' or type == 'F32'): + NE = QR/2 + resp = pack('>3HB', *hdr) + pack('>2B%dI' % NE, self.fcode, 2*QR, *regval) + else: + resp = pack('>3HB', *hdr) + pack('>2B%dH' % QR, self.fcode, 2*QR, *regval) + print 'resp = ', resp.encode('hex') + self.sendLine(resp) + + def wmregs(self): + PDU = self.getPDU() + (SA, QR, BC) = unpack('>2HB',PDU[:5]) + data = PDU[5:] + print 'wmregs:SA=%d, QR=%d, BC=%d' % (SA,QR,BC), 'data = ', map(hex, unpack('>%dH' % QR, data)) + self.setFR(SA, QR, data) + hdr = self.mbap._replace(len = 7) + resp = pack('>3HB', *hdr) + pack('>B2H', self.fcode, SA, QR) + print 'resp = ', resp.encode('hex') + self.sendLine(resp) + + + +### +def main(): + factory = protocol.ServerFactory() + factory.protocol = Fermi_Prot + reactor.listenTCP(61502,factory) + reactor.run() + +if __name__ == '__main__': + main()