frappy/secop/bytesio.py
Markus Zolliker 49ad153605 trinamic driver and bytesio module
Change-Id: Id634e7514fecab6fd6bc3edf81e25ad41c2bb12f
2021-06-08 14:58:19 +02:00

91 lines
3.9 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""byte oriented stream communication"""
import time
import re
from secop.lib.asynconn import AsynConn, ConnectionClosed
from secop.modules import Property, Command
from secop.stringio import BaseIO
from secop.datatypes import BLOBType, IntRange, ArrayOf, TupleOf, StringType
from secop.errors import CommunicationFailedError, CommunicationSilentError
HEX_CODE = re.compile(r'[0-9a-fA-F][0-9a-fA-F]$')
class BytesIO(BaseIO):
identification = Property(
"""identification
a list of tuples with commands and expected responses, to be sent on connect.
commands and responses are whitespace separated items
an item is either:
- a two digit hexadecimal number (byte value)
- a character
- ?? indicating ignored bytes in responses
""", datatype=ArrayOf(TupleOf(StringType(), StringType())), default=[], export=False)
def connectStart(self):
if not self.is_connected:
uri = self.uri
self._conn = AsynConn(uri, b'')
self.is_connected = True
for command, match in self.identification:
cmdbytes = bytes([int(c, 16) if HEX_CODE.match(c) else ord(c) for c in command.split()])
replypat = [int(c, 16) if HEX_CODE.match(c.replace('??', '-1')) else ord(c) for c in command.split()]
reply = self.communicate(cmdbytes, len(replypat))
if any(b != c and c != -1 for b, c in zip(reply, replypat)):
self.closeConnection()
raise CommunicationFailedError('bad response: %r does not match %r' % (command, match))
@Command((BLOBType(), IntRange(0)), result=BLOBType())
def communicate(self, command, nbytes):
"""send a command and receive nbytes as reply"""
if not self.is_connected:
self.read_is_connected() # try to reconnect
if not self._conn:
raise CommunicationSilentError('can not connect to %r' % self.uri)
try:
with self._lock:
# read garbage and wait before send
try:
if self.wait_before:
time.sleep(self.wait_before)
garbage = self._conn.flush_recv()
if garbage:
self.log.debug('garbage: %r', garbage)
self._conn.send(command)
self.log.debug('send: %r', command)
reply = self._conn.readbytes(nbytes, self.timeout)
except ConnectionClosed:
self.closeConnection()
raise CommunicationFailedError('disconnected')
self.log.debug('recv: %r', reply)
return reply
except Exception as e:
if str(e) == self._last_error:
raise CommunicationSilentError(str(e))
self._last_error = str(e)
self.log.error(self._last_error)
raise