#!/usr/bin/env python3 import sys import os import time import platform import re from select import select from subprocess import check_output, STDOUT, PIPE, Popen IMAGES_HOST = 'l_samenv@linse-c' IMAGES_DIR = 'boxes/%s_images' IMAGES_PAT = '*.lz4' EMMC = {'rpi', 'ionopimax'} # write over USB, needs rpiboot. else SD card is assumed def prt(text): print(time.strftime('%H:%M:%S'), text) def cmd(command, verbose=False): if verbose: print(f'> {command}') return check_output(command.split(), stderr=STDOUT).decode('latin-1') class HostSystem: def __init__(self, kind): self.extdisks = {} self.kind = kind self.imgdir = IMAGES_DIR % self.kind def list_external(self): print('--- external disks:') for disk, info in self.extdisks.items(): print(disk, info) class Linux(HostSystem): rescue = """ Remark: not all cables might work. Example on Markus Mac: - does not work: black USB C to USB C cable - works: white USB A to USB C cable plugged into docking station. Please unpower the box with compute module, start this script again and put power immediately.""" def get_disks(self): return [line for line in cmd('lsblk -d').split('\n') if line] def rpiboot(self): lines = [] with Popen(['sudo', './rpiboot', '-d', 'mass-storage-gadget64'], stdout=PIPE) as p: tmo = 1 t = 0 while select([p.stdout], [], [], tmo)[0]: rawline = p.stdout.readline() if not rawline: return tmo != 1 line = rawline.decode('latin-1').strip() print(line) if lines is None: pass # print(line) elif 'Waiting' in line: tmo = 30 lines.append(line) t = time.time() elif tmo == 1: lines.append(line) else: print('\n'.join(lines)) print(line) lines = None else: p.terminate() return False def find_raspi(self): input('make sure device is not connected [confirm]') prevset = set(self.get_disks()) input('now connect [confirm]') disks = self.wait_disks(prevset, 5) if not disks: print('can not yet detect disk, try to run rpiboot') print('if is blocked at "Waiting for BCM...", turn off and on power of target rpi') # cmd('sudo ./rpiboot -d mass-storage-gadget64') self.rpiboot() disks = self.wait_disks(prevset, 15) if not disks: raise RuntimeError('raspi disk does not appear') return disks def wait_disks(self, prevset, delay): for i in range(int(delay)): disks = self.get_disks() diskset = set(disks) if diskset != prevset: print('\n'.join(disks)) if i: print('') return diskset - prevset print('.', end='') time.sleep(1) return None def write_image(self): proposed = [v.split()[0] for v in self.find_raspi()] self.list_external() if len(proposed) == 1: dev = proposed[0] elif len(proposed) > 1: if self.kind in EMMC: print('several potential devices:', proposed) return dev = input('select device to write to {" ".join(proposed)}: ') else: print('not found') return images = check_output(['sudo', '-u', 'l_samenv', 'ssh', IMAGES_HOST, f'cd {self.imgdir} ; ls {IMAGES_PAT}'] ).decode('latin-1').split('\n') file = 'image' for file in sorted(images): print(file) print('') image = input(f'select image to write to {dev} [{file}]: ') or file prt(f'unmount {dev} ...') print('you may be prompted for your account password for sudo') os.system(f'sudo umount /dev/{dev}') print('') prt('get, uncompress and write... (takes about 15 min)') os.system(f'ssh {IMAGES_HOST} "dd if={self.imgdir}/{image} bs=512k" | lz4 -d | sudo dd of=/dev/{dev} bs=512k') prt('unmount bootfs') for _ in range(3): if os.path.exists('/Volumes/bootfs'): prt('unmount bootfs') os.system('sudo umount bootfs') break time.sleep(1.0) prt('done') os.system(f'sudo umount /dev/{dev}') class Darwin(HostSystem): rescue = """ Remark: not all cables might work. Example on Markus Mac: - does not work: black USB C to USB C cable - works: white USB A to USB C cable plugged into docking station. Please unpower the box with compute module, start this script again and put power immediately.""" def rpiboot(self): if os.path.exists('/Volumes/bootfs'): prt('unmount bootfs') os.system('diskutil unmount bootfs') lines = [] with Popen(['rpiboot'], stdout=PIPE) as p: tmo = 1 t = 0 while select([p.stdout], [], [], tmo)[0]: rawline = p.stdout.readline() if not rawline: return tmo != 1 line = rawline.decode('latin-1').strip() if lines is None: print(line) elif 'Waiting' in line: tmo = 2 lines.append(line) t = time.time() elif tmo == 1: lines.append(line) else: print('\n'.join(lines)) print(line) lines = None else: p.terminate() return False def find_raspi(self): # collect info about disks dlist = '\n' + cmd('/usr/sbin/diskutil list') self.extdisks.clear() proposed = [] noname = re.compile(r' +0: +\*\d+\.?\d* GB +disk') for dinfo in dlist.split('\n/dev/'): if not dinfo: continue disk, info = dinfo.split(maxsplit=1) if 'external' in info.split('\n')[0]: self.extdisks[disk] = info if self.kind in EMMC: if 'Windows_FAT_32 bootfs' in info and 'Linux' in info: proposed.append(disk) elif noname.search(info): proposed.append(disk) elif self.kind == 'ionopi': if 'Apple' not in info: proposed.append(disk) return proposed def write_image(self): if self.kind in EMMC: print("\nmake sure microUSB 'SLAVE' is connected to this computer") print("and microUSB 'POWER' is connected to a USB power supply\n") prt('try to find CM (compute module on PoE device) ...') else: pass proposed = self.find_raspi() if not proposed: if self.kind in EMMC: # we may install rpiboot here if not available # git clone --depth=1 https://github.com/raspberrypi/usbboot # cd usbboot # make # ./rpiboot print('not found - put rpi in bootloader mode') if not self.rpiboot() and not self.rpiboot(): print(self.rescue) return for _ in range(10): proposed = self.find_raspi() if proposed: break time.sleep(0.1) else: self.list_external() print('can not find CM') return self.list_external() if len(proposed) == 1: dev = proposed[0] elif len(proposed) > 1: if self.kind in EMMC: print('several potential devices for CM:', proposed) return dev = input('select device to write to {" ".join(proposed)}: ') else: print('none of above disks seems to be the sd card') return images = check_output( ['sudo', '-u', 'zolliker', 'ssh', IMAGES_HOST, f'cd {self.imgdir} ; ls {IMAGES_PAT}']).decode( 'latin-1').split('\n') file = 'image' for file in sorted(images): print(file) print('') image = input(f'select image to write to {dev} [{file}]: ') or file prt(f'unmount {dev} ...') print('you may be prompted for your account password for sudo') os.system(f'sudo diskutil unmountDisk /dev/{dev}') print('') prt('get, uncompress and write... (takes about 15 min)') os.system(f'ssh {IMAGES_HOST} "dd if={self.imgdir}/{image} bs=512k" | lz4 -d | sudo dd of=/dev/{dev} bs=512k') prt('unmount bootfs') for _ in range(3): if os.path.exists('/Volumes/bootfs'): prt('unmount bootfs') os.system('diskutil unmount bootfs') break time.sleep(1.0) prt('done') os.system(f'diskutil unmountDisk /dev/{dev}') if len(sys.argv) == 2: cls = locals()[platform.system()] writer = cls(sys.argv[1]) writer.write_image() else: print(""" Usage: > write_to rpi # write to compute module 4 in a dual-eth raspi" > write_to ionopimax # write to compute module 3 using PoE board for a iono pi max" > write_to ionopi # write to sd card for ionopi """)