added first control box
- added python display daemon - on control boxes, the uplink is the leftmost plug by default
This commit is contained in:
234
display.py
Normal file
234
display.py
Normal file
@ -0,0 +1,234 @@
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import serial
|
||||
import socket
|
||||
import threading
|
||||
|
||||
ESC = 0x1b
|
||||
ESCESC = bytes((ESC, ESC))
|
||||
|
||||
MODE_GRAPH = 0x20
|
||||
MODE_CONSOLE = 0x21
|
||||
|
||||
SET_POS = 0x30
|
||||
SET_FONT = 0x31
|
||||
SET_COLOR = 0x32
|
||||
|
||||
CLEAR = 0x40
|
||||
LINES = 0x41
|
||||
RECT = 0x42
|
||||
ICON = 0x43
|
||||
TEXT = 0x44
|
||||
COPYRECT = 0x45
|
||||
PLOT = 0x46
|
||||
|
||||
TOUCH = 0x50
|
||||
TOUCH_MODE = 0x51
|
||||
|
||||
SAVE_ATTRS = 0xa0
|
||||
SEL_ATTRS = 0xc0
|
||||
|
||||
IDENT = 0xf3
|
||||
|
||||
FONT_GEO = [(6, 8), (8, 16), (20, 40)]
|
||||
|
||||
TOPGAP = 8 # upper part of display is not useable
|
||||
HEIGHT = 120
|
||||
WIDTH = 480
|
||||
|
||||
|
||||
def xy(x, y):
|
||||
x = min(480, int(x))
|
||||
return bytes([(min(127, y + TOPGAP) << 1) + (x >> 8), x % 256])
|
||||
|
||||
|
||||
class Display:
|
||||
fg = 15
|
||||
bg = 0
|
||||
touch = 0
|
||||
thread = None
|
||||
bar = None
|
||||
blinkchar = ' '
|
||||
menu = None
|
||||
|
||||
def __init__(self, dev='/dev/ttyS1', timeout=0.5):
|
||||
self.event = threading.Event()
|
||||
self.term = serial.Serial(dev, baudrate=115200, timeout=timeout)
|
||||
self.gethost(False)
|
||||
self.reset()
|
||||
|
||||
def reset(self):
|
||||
self.send(MODE_GRAPH)
|
||||
self.font(2)
|
||||
self.send(CLEAR, self.bg)
|
||||
|
||||
def start(self):
|
||||
if self.thread is None or not self.thread.is_alive():
|
||||
self.thread = threading.Thread(target=self.gethostthread)
|
||||
self.thread.daemon = True
|
||||
self.thread.start()
|
||||
self.event.wait(1) # wait for thread to be started
|
||||
print('refresh')
|
||||
self.refresh()
|
||||
|
||||
def color(self, fg=15, bg=0):
|
||||
self.fg = fg
|
||||
self.bg = bg
|
||||
|
||||
def send(self, code, *args):
|
||||
out = [code]
|
||||
for arg in args:
|
||||
if isinstance(arg, bytes):
|
||||
out.extend(list(arg))
|
||||
elif isinstance(arg, str):
|
||||
out.append(arg.encode('latin-1'))
|
||||
else:
|
||||
out.append(arg)
|
||||
self.term.write(bytes([ESC,ESC,len(out)] + out))
|
||||
|
||||
def version(self):
|
||||
self.term.write(bytes([ESC,ESC,1,0xf3]))
|
||||
reply = self.term.read(4)
|
||||
assert reply[0:2] == ESCESC
|
||||
return self.term.read(reply[2]-1)
|
||||
|
||||
def font(self, size):
|
||||
self.fontsize = size
|
||||
self.send(SET_FONT, size)
|
||||
self.colwid, self.rowhei = FONT_GEO[self.fontsize]
|
||||
self.nrows = HEIGHT // self.rowhei
|
||||
self.ncols = WIDTH // self.colwid
|
||||
self.textbuffer = [" " * self.ncols] * self.nrows
|
||||
self.send(SET_COLOR, self.bg, self.bg, self.fg, self.fg)
|
||||
|
||||
def text(self, text, row=0, left=0, right=None, font=2):
|
||||
if font != self.fontsize:
|
||||
self.font(font)
|
||||
if right is None:
|
||||
right = self.ncols
|
||||
if right < 0:
|
||||
right += self.ncols
|
||||
if left < 0:
|
||||
left += self.ncols
|
||||
for line in text.split('\n'):
|
||||
padded = line + " " * (right - left - len(line))
|
||||
self.send(SET_POS, xy(left * self.colwid, TOPGAP + row * self.rowhei))
|
||||
self.send(TEXT, padded.encode('latin-1'))
|
||||
|
||||
def show(self, *args):
|
||||
self.send(CLEAR, self.bg)
|
||||
if len(args) == 1:
|
||||
self.text(args[0], 1)
|
||||
else:
|
||||
for row, line in enumerate(args):
|
||||
if row < 3:
|
||||
self.text(line, row)
|
||||
|
||||
def gethost(self, truehostname):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
try:
|
||||
s.connect(('8.8.8.8', 80)) # 8.8.8.8: google DNS
|
||||
self.hostip = s.getsockname()[0]
|
||||
except Exception as e:
|
||||
self.hostip = ''
|
||||
if self.hostip and truehostname:
|
||||
self.hostname = socket.gethostbyaddr(self.hostip)[0]
|
||||
else:
|
||||
self.hostname = socket.gethostname()
|
||||
|
||||
def gethostthread(self):
|
||||
self.gethost(True)
|
||||
self.event.set()
|
||||
time.sleep(1)
|
||||
while True:
|
||||
self.event.clear()
|
||||
self.gethost(True)
|
||||
self.event.wait(1)
|
||||
|
||||
def set_touch(self, on):
|
||||
self.send(TOUCH_MODE, on)
|
||||
self.touch = on
|
||||
|
||||
def gettouch(self):
|
||||
if not self.touch:
|
||||
self.set_touch(1)
|
||||
while True:
|
||||
ch2 = self.term.read(2)
|
||||
while ch2 != ESCESC:
|
||||
if len(ch2) < 2:
|
||||
return
|
||||
ch2 = ch2[1:2] + self.term.read(1)
|
||||
length = self.term.read(1)
|
||||
if not length:
|
||||
return
|
||||
data = self.term.read(length[0])
|
||||
if len(data) < length[0]:
|
||||
return
|
||||
if data[0] == TOUCH and len(data) == 3:
|
||||
x = (data[1] % 2) * 256 + data[2]
|
||||
print(x)
|
||||
return x
|
||||
print('skipped', data)
|
||||
|
||||
def menu_reboot(self, x):
|
||||
if x is None:
|
||||
return
|
||||
if x < 120:
|
||||
self.show('reboot ...')
|
||||
os.system('reboot now')
|
||||
else:
|
||||
self.std_display()
|
||||
|
||||
def menu_shutdown(self, x):
|
||||
if x is None:
|
||||
return
|
||||
if x < 120:
|
||||
self.show('shutdown ...')
|
||||
os.system('shutdown now')
|
||||
else:
|
||||
self.std_display()
|
||||
|
||||
def menu_main(self, x):
|
||||
if x is None:
|
||||
return
|
||||
print(x)
|
||||
if x < 160:
|
||||
self.std_display()
|
||||
elif x < 320:
|
||||
self.menu = self.menu_reboot
|
||||
self.show('reboot?', '( OK ) (cancel)')
|
||||
else:
|
||||
self.menu = self.menu_shutdown
|
||||
self.show('shutdown?', '( OK ) (cancel)')
|
||||
|
||||
def std_display(self):
|
||||
self.menu = self.net_display
|
||||
self.net_display(None)
|
||||
|
||||
def net_display(self, x):
|
||||
if x is None:
|
||||
hostip = self.hostip or 'no network.'
|
||||
self.text(hostip.replace('.', self.blinkchar, 1), 0, 0, 15)
|
||||
self.text(self.hostname, 1)
|
||||
self.event.set()
|
||||
self.blinkchar = ' ' if self.blinkchar == '.' else '.'
|
||||
else:
|
||||
self.menu = self.menu_main
|
||||
self.show('(cancel)(reboot)(shdown)')
|
||||
|
||||
def refresh(self):
|
||||
func = self.menu or self.net_display
|
||||
func(self.gettouch())
|
||||
|
||||
def bootmode(self):
|
||||
self.send(0xf0, 0xcb, 0xef, 0x20, 0x18)
|
||||
|
||||
|
||||
d = Display()
|
||||
|
||||
if len(sys.argv) == 1:
|
||||
d.start()
|
||||
while True:
|
||||
d.refresh()
|
||||
|
Reference in New Issue
Block a user