add files and simulated modes
This commit is contained in:
@@ -1,4 +1,10 @@
|
||||
#!/usr/bin/env python
|
||||
# *-----------------------------------------------------------------------*
|
||||
# | |
|
||||
# | Copyright (c) 2022 by Paul Scherrer Institute (http://www.psi.ch) |
|
||||
# | Based on Zac great first implementation |
|
||||
# | Author Thierry Zamofing (thierry.zamofing@psi.ch) |
|
||||
# *-----------------------------------------------------------------------*
|
||||
|
||||
import logging
|
||||
_log=logging.getLogger(__name__)
|
||||
@@ -39,7 +45,11 @@ all = 0xff
|
||||
|
||||
class IlluminationControl(object):
|
||||
|
||||
def __init__(self,hostname: str,port: int):
|
||||
def __init__(self,hostname: str="129.129.221.92",port: int=1003):
|
||||
if hostname is None: #simulated mode
|
||||
self._sim={'stat':0}
|
||||
_log.info('simulated mode:{}'.format(self._sim))
|
||||
return
|
||||
self._hostname = hostname
|
||||
self._port = port
|
||||
self.connect()
|
||||
@@ -59,14 +69,15 @@ class IlluminationControl(object):
|
||||
|
||||
def is_on(self, led_flags: int) -> bool:
|
||||
"""check status for light state, see CONSTANTS STATUS_BITS_*"""
|
||||
try:
|
||||
s = self.status()[0]
|
||||
except:
|
||||
print("error reading status")
|
||||
return False
|
||||
s = self.status()
|
||||
return bool(s & led_flags)
|
||||
|
||||
def send_command(self, cmd: bytes):
|
||||
try:
|
||||
s=self._socket
|
||||
except AttributeError: #simulated mode
|
||||
_log.info('simulated mode:{}'.format(cmd))
|
||||
return
|
||||
try:
|
||||
self._socket.sendall(cmd)
|
||||
except (BrokenPipeError, OSError) as e:
|
||||
@@ -75,9 +86,14 @@ class IlluminationControl(object):
|
||||
try:
|
||||
self._socket.sendall(cmd)
|
||||
except Exception as e:
|
||||
raise IlluminationControlException("unrecoverable socket error: {}".format(e.args))
|
||||
_log.error("unrecoverable socket error: {}".format(e.args))
|
||||
|
||||
def status(self):
|
||||
try:
|
||||
s=self._socket
|
||||
except AttributeError: #simulated mode
|
||||
_log.info('simulated mode')
|
||||
return self._sim['stat']
|
||||
self.send_command(bytes((stat,all)))
|
||||
resp = self._socket.recv(1024)
|
||||
return resp[0]&0x1f
|
||||
|
||||
Reference in New Issue
Block a user