Currently it lets you set coils and registers for three controllers. It will also set the up_to_speed, lev_complete, and run flags for the system status. TODO Phase control and locking.
282 lines
8.3 KiB
Python
Executable File
282 lines
8.3 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
# MODBUS
|
|
# ADU : Application Data Unit
|
|
# MBAP : MODBUS Application Protocol header
|
|
# PDU : Protocol Data Unit
|
|
# FC : Function Code
|
|
# TID : Transaction Identifier (2 Bytes)
|
|
# PID : Protocol Identifier, 0 for MODBUS, (2 Bytes)
|
|
# len : Number of bytes following the len field, (2 Bytes)
|
|
# UID : Unit Identifier (1 Byte)
|
|
#
|
|
# MODBUS TCP/IP ADU
|
|
# ADU = [MBAP][PDU]
|
|
# MBAP = [TID][PID][len][UID]
|
|
# PDU = [FC][Data]
|
|
|
|
import binascii
|
|
from struct import *
|
|
from collections import namedtuple
|
|
from twisted.internet import reactor, protocol
|
|
from twisted.protocols.basic import LineReceiver
|
|
|
|
# Coil addresses per command
|
|
CSTART = 0
|
|
CSTOP = 1
|
|
CIDLE = 2
|
|
CRESET = 3
|
|
|
|
# System Status Flags
|
|
_OK = 1 << 0
|
|
_UP_TO_SPEED = 1 << 1
|
|
_RUN = 1 << 2
|
|
_ALARM = 1 << 3
|
|
_LEV = 1 << 4
|
|
_PHLOCK = 1 << 5
|
|
_MD = 1 << 6
|
|
_AVC_ON = 1 << 7
|
|
|
|
# Interlock Status Flags
|
|
_DSP_WD_FAIL = 1 << 0
|
|
_OSC_FAIL = 1 << 1
|
|
_POS_ALARM = 1 << 2
|
|
_ESTOP = 1 << 3
|
|
_UPS_FAIL = 1 << 4
|
|
_EXT_FLT = 1 << 5
|
|
_CC_WD_FAIL = 1 << 6
|
|
_OVERSPEED = 1 << 7
|
|
_VAC_FAIL = 1 << 8
|
|
_OVERTEMP = 1 << 9
|
|
_REFLOSS = 1 << 10
|
|
_SPD_SENSOR_LOSS = 1 << 11
|
|
_COOLING_LOSS = 1 << 12
|
|
_DSP_SHUTDOWN = 1 << 13
|
|
_CC_SHUTDOWN_REQ = 1 << 14
|
|
_TEST_MODE = 1 << 15
|
|
|
|
# Mode values
|
|
_RPM = 0
|
|
_PHASE = 1
|
|
# Direction
|
|
_CW = 0
|
|
_CCW = 1
|
|
_POSDIR = _CCW
|
|
# Some descriptive names for default register values per chopper.
|
|
# The values are indexed by UID.
|
|
RVAL = {
|
|
'mode': {1: _RPM, 2: _PHASE, 3: _PHASE},
|
|
'dir' : {1: _POSDIR, 2: _POSDIR, 3: _POSDIR}
|
|
}
|
|
class Fermi_Prot(LineReceiver):
|
|
|
|
def __init__(self):
|
|
self.mbhlen = 7
|
|
self.fcbyte = 7
|
|
self.datstart=8
|
|
self.MBAP = namedtuple('MBAP','TID PID len UID')
|
|
self.MBFN={1:self.rcoils,3:self.rhregs,5:self.wcoil,16:self.wmregs}
|
|
self.setRawMode()
|
|
self.RegVal = {}
|
|
self.FermiCoil = {}
|
|
for uid in [1,2,3]:
|
|
self.FermiCoil[uid] = [0,0,0,0]
|
|
mode = RVAL['mode'][uid]
|
|
dir = RVAL['dir'][uid]
|
|
sys_stat = _AVC_ON|_OK
|
|
self.RegInfo = {
|
|
10: ['U16', 'System Status Information'],
|
|
12: ['U16', 'Interlock Status Information'],
|
|
14: ['U16', 'Rotational Speed'],
|
|
16: ['F32', 'Encoder Angle (MB350PC/R ONLY)'],
|
|
18: ['U32', 'Phase Veto Count'],
|
|
20: ['U32', 'Read and Clear Phase Veto Count'],
|
|
22: ['U32', 'Read and Clear Phase Veto Count' ],
|
|
24: ['F32', 'Read PHASE ACCURACY'],
|
|
26: ['F32', 'Read PHASE REPEATABILITY'],
|
|
28: ['F32', 'Read PHASE OK'],
|
|
30: ['U32', 'Set VETO WINDOW 100ns'],
|
|
32: ['U32', 'Set VETO WINDOW 50ns'],
|
|
34: ['U32', 'Set MOTOR CONTROL MODE'],
|
|
1000: ['U32', 'Set ROTATIONAL SPEED SETPOINT'],
|
|
1002: ['F32', 'Set HOMING ANGLE SETPOINT (MB350PC/R ONLY)'],
|
|
1004: ['F32', 'Set MOTOR KP'],
|
|
1006: ['F32', 'Set MOTOR KI'],
|
|
1008: ['F32', 'Set MOTOR KPhase'],
|
|
1010: ['F32', 'Set REFERENCE DELAY'],
|
|
1012: ['F32', 'Set REFERENCE PERIOD'],
|
|
1014: ['U32', 'Set SYNC SOURCE'],
|
|
1016: ['U32', 'Set MOTOR DIRECTION']
|
|
}
|
|
self.RegVal[uid] = {
|
|
10: sys_stat,
|
|
12: 0,
|
|
14: 1000,
|
|
16: 0,
|
|
18: 0,
|
|
20: 0,
|
|
22: 0,
|
|
24: 0,
|
|
26: 0,
|
|
28: 0,
|
|
30: 0,
|
|
32: 0,
|
|
34: mode,
|
|
1000: 0,
|
|
1002: 0,
|
|
1004: 0,
|
|
1006: 0,
|
|
1008: 0,
|
|
1010: 0,
|
|
1012: 400000,
|
|
1014: 0,
|
|
1016: dir
|
|
}
|
|
|
|
def getFR(self, SA,QR):
|
|
uid = self.mbap.UID
|
|
type = self.RegInfo[SA][0]
|
|
if (type == 'U32' or type == 'F32'):
|
|
NE = QR/2
|
|
else:
|
|
NE = QR
|
|
EA = 2*NE + SA
|
|
print 'NE = %d EA = %d ' % (NE,EA)
|
|
data = []
|
|
for a in range(SA, EA, 2):
|
|
print 'reg %d = %s' % (a, self.RegInfo[a][1])
|
|
data += [self.RegVal[uid][a]]
|
|
return data
|
|
|
|
def setFR(self, SA, QR, data):
|
|
uid = self.mbap.UID
|
|
type = self.RegInfo[SA][0]
|
|
if (type == 'U32' or type == 'F32'):
|
|
NE = QR/2
|
|
fs = '>%dI'
|
|
else:
|
|
NE = QR
|
|
fs = '>%dH'
|
|
EA = 2*NE + SA
|
|
print 'NE = %d EA = %d ' % (NE,EA)
|
|
t = unpack(fs % NE,data)
|
|
print 'setFR(): t = ', map(hex, t)
|
|
i = 0
|
|
for a in range(SA, EA, 2):
|
|
n = t[i]
|
|
print 'setFR() a = ', a, ' i = ', i, 'setFR() n = ', n
|
|
print 'set reg %d = %s' % (a, self.RegInfo[a][1]), ' to ', hex(n)
|
|
self.RegVal[uid][a] = n
|
|
i += 1
|
|
return
|
|
|
|
def rawDataReceived(self, ADU):
|
|
# print "Received ADU " + binascii.hexlify(ADU)
|
|
# print "Received ADU({}) {!r}".format(len(ADU),ADU)
|
|
print "Received ADU ", ADU.encode('hex')
|
|
self.ADU = ADU
|
|
dl = len(ADU)
|
|
self.mbap = self.MBAP._make(unpack('>3HB',ADU[:self.mbhlen]))
|
|
self.fcode = unpack('>B', ADU[self.fcbyte])[0]
|
|
print self.mbap, 'fcode = ', self.fcode
|
|
self.MBFN[self.fcode]()
|
|
|
|
# if (dl == self.mbhlen + self.mbap.len - 1):
|
|
# print self.MBAP._make(unpack('>
|
|
|
|
def connectionMade(self):
|
|
print "Connection made"
|
|
|
|
def getPDU(self):
|
|
return self.ADU[self.datstart:]
|
|
|
|
def rcoils(self):
|
|
# FC = Function Code
|
|
# SA = Start Address, QC = Quantity of Coils
|
|
# BC = Byte Count, CS = Coil Status (each bit = status of one coil)
|
|
# Request: FC<1B>, SA<2B>, QC<2B>
|
|
# Response: FC<1B>, BC<1B>, CS<n*B>
|
|
# n = number of eights in QC plus 1 if there is a remainder.
|
|
# ie n = QC // 8 + (1 if QC % 8 else 0)
|
|
uid = self.mbap.UID
|
|
PDU = self.getPDU()
|
|
(SA, QC) = unpack('>2H',PDU)
|
|
print 'rcoils:SA=%d, QC=%d' % (SA,QC)
|
|
hdr = self.mbap._replace(len = 4)
|
|
BC = 1 # ByteCount
|
|
#TODO Coil values should be an array of bytes with each bit representing a coil
|
|
if (self.FermiCoil[uid][SA] == 0xFF00):
|
|
data = 1
|
|
else:
|
|
data = 0
|
|
resp = pack('>3HB', *hdr) + pack('>2B%dB' % BC, self.fcode, BC, data)
|
|
print 'resp = ', resp.encode('hex')
|
|
self.sendLine(resp)
|
|
|
|
def wcoil(self):
|
|
uid = self.mbap.UID
|
|
PDU = self.getPDU()
|
|
(OA, OV) = unpack('>2H', self.ADU[self.datstart:])
|
|
print 'wcoil:OA=%d, OV=%X' % (OA,OV)
|
|
self.FermiCoil[uid][OA] = OV
|
|
if (OA == CSTART):
|
|
# Set RotSpeed value
|
|
self.RegVal[uid][14] = self.RegVal[uid][1000]
|
|
if (self.RegVal[uid][34] == _PHASE):
|
|
self.RegVal[uid][10] |= (_UP_TO_SPEED|_RUN|_LEV|_PHLOCK)
|
|
else:
|
|
self.RegVal[uid][10] |= (_UP_TO_SPEED|_RUN|_LEV)
|
|
elif (OA == CSTOP):
|
|
self.RegVal[uid][14] = 0
|
|
if (self.RegVal[uid][34] == _PHASE):
|
|
self.RegVal[uid][10] &= ~(_UP_TO_SPEED|_RUN|_LEV|_PHLOCK)
|
|
else:
|
|
self.RegVal[uid][10] &= ~(_UP_TO_SPEED|_RUN|_LEV)
|
|
elif (OA == CIDLE):
|
|
print "Set IDLE"
|
|
elif (OA == CRESET):
|
|
print "RESET"
|
|
print 'resp = ', self.ADU.encode('hex')
|
|
self.sendLine(self.ADU)
|
|
|
|
def rhregs(self):
|
|
uid = self.mbap.UID
|
|
PDU = self.getPDU()
|
|
(SA, QR) = unpack('>2H', self.ADU[self.datstart:])
|
|
print 'rhregs:SA=%d, QR=%d' % (SA,QR)
|
|
regval = self.getFR(SA,QR)
|
|
print 'rhregs:data = ', regval
|
|
hdr = self.mbap._replace(len = 3 + 2*QR)
|
|
print 'hdr = ', hdr
|
|
type = self.RegInfo[SA][0]
|
|
if (type == 'U32' or type == 'F32'):
|
|
NE = QR/2
|
|
resp = pack('>3HB', *hdr) + pack('>2B%dI' % NE, self.fcode, 2*QR, *regval)
|
|
else:
|
|
resp = pack('>3HB', *hdr) + pack('>2B%dH' % QR, self.fcode, 2*QR, *regval)
|
|
print 'resp = ', resp.encode('hex')
|
|
self.sendLine(resp)
|
|
|
|
def wmregs(self):
|
|
PDU = self.getPDU()
|
|
(SA, QR, BC) = unpack('>2HB',PDU[:5])
|
|
data = PDU[5:]
|
|
print 'wmregs:SA=%d, QR=%d, BC=%d' % (SA,QR,BC), 'data = ', map(hex, unpack('>%dH' % QR, data))
|
|
self.setFR(SA, QR, data)
|
|
hdr = self.mbap._replace(len = 7)
|
|
resp = pack('>3HB', *hdr) + pack('>B2H', self.fcode, SA, QR)
|
|
print 'resp = ', resp.encode('hex')
|
|
self.sendLine(resp)
|
|
|
|
|
|
|
|
###
|
|
def main():
|
|
factory = protocol.ServerFactory()
|
|
factory.protocol = Fermi_Prot
|
|
reactor.listenTCP(61502,factory)
|
|
reactor.run()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|