diff --git a/csaxs_bec/bec_ipython_client/plugins/LamNI/LamNI_webpage_generator.py b/csaxs_bec/bec_ipython_client/plugins/LamNI/LamNI_webpage_generator.py new file mode 100644 index 0000000..a77e77c --- /dev/null +++ b/csaxs_bec/bec_ipython_client/plugins/LamNI/LamNI_webpage_generator.py @@ -0,0 +1,89 @@ +""" +LamNI/webpage_generator.py +=========================== +LamNI-specific webpage generator subclass. + +Integration (inside the LamNI __init__ / startup): +--------------------------------------------------- + from csaxs_bec.bec_ipython_client.plugins.LamNI.webpage_generator import ( + LamniWebpageGenerator, + ) + self._webpage_gen = LamniWebpageGenerator( + bec_client=client, + output_dir="~/data/raw/webpage/", + ) + self._webpage_gen.start() + +Or use the factory (auto-selects by session name "lamni"): +---------------------------------------------------------- + from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import ( + make_webpage_generator, + ) + self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/") + self._webpage_gen.start() + +Interactive helpers: +-------------------- + lamni._webpage_gen.status() + lamni._webpage_gen.verbosity = 2 + lamni._webpage_gen.stop() + lamni._webpage_gen.start() +""" + +from pathlib import Path + +from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import ( + WebpageGeneratorBase, + _safe_get, + _safe_float, + _gvar, +) + + +class LamniWebpageGenerator(WebpageGeneratorBase): + """ + LamNI-specific webpage generator. + Logo: LamNI.png from the same directory as this file. + + Override _collect_setup_data() to add LamNI-specific temperatures, + sample name, and measurement settings. + """ + + # TODO: fill in LamNI-specific device paths + # label -> dotpath under device_manager.devices + _TEMP_MAP = { + # "Sample": "lamni_temphum.temperature_sample", + # "OSA": "lamni_temphum.temperature_osa", + } + + def _logo_path(self): + return Path(__file__).parent / "LamNI.png" + + def _collect_setup_data(self) -> dict: + # ── LamNI-specific data goes here ───────────────────────────── + # Uncomment and adapt when device names are known: + # + # dm = self._bec.device_manager + # sample_name = _safe_get(dm, "lamni_samples.sample_names.sample0") or "N/A" + # temperatures = { + # label: _safe_float(_safe_get(dm, path)) + # for label, path in self._TEMP_MAP.items() + # } + # settings = { + # "Sample name": sample_name, + # "FOV x / y": ..., + # "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"), + # "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"), + # } + # return { + # "type": "lamni", + # "sample_name": sample_name, + # "temperatures": temperatures, + # "settings": settings, + # } + + # Placeholder — returns minimal info until implemented + return { + "type": "lamni", + # LamNI-specific data here + } diff --git a/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py b/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py index 16f2637..c1d7a4c 100644 --- a/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py +++ b/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py @@ -21,13 +21,13 @@ from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import ( TomoIDManager, ) -from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import ( - WebpageGenerator, - VERBOSITY_SILENT, # 0 — no output - VERBOSITY_NORMAL, # 1 — startup / stop messages only (default) - VERBOSITY_VERBOSE, # 2 — one-line summary per cycle - VERBOSITY_DEBUG, # 3 — full JSON payload per cycle - ) +# from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import ( +# FlomniWebpageGenerator, +# VERBOSITY_SILENT, # 0 — no output +# VERBOSITY_NORMAL, # 1 — startup / stop messages only (default) +# VERBOSITY_VERBOSE, # 2 — one-line summary per cycle +# VERBOSITY_DEBUG, # 3 — full JSON payload per cycle +# ) logger = bec_logger.logger @@ -1311,12 +1311,15 @@ class Flomni( self.corr_angle_y_2 = [] self._progress_proxy = _ProgressProxy(self.client) self._progress_proxy.reset() - self._webpage_gen = WebpageGenerator( - bec_client=client, - output_dir="~/data/raw/webpage/", # adjust to your staging path - verbosity=VERBOSITY_NORMAL, + from csaxs_bec.bec_ipython_client.plugins.flomni.flomni_webpage_generator import ( + FlomniWebpageGenerator, ) - self._webpage_gen.start() + self._webpage_gen = FlomniWebpageGenerator( + bec_client=client, + output_dir="~/data/raw/webpage/", + ) + self._webpage_gen.start() + self.OMNYTools = OMNYTools(self.client) self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername) self.tomo_id_manager = TomoIDManager() diff --git a/csaxs_bec/bec_ipython_client/plugins/flomni/flomni_webpage_generator.py b/csaxs_bec/bec_ipython_client/plugins/flomni/flomni_webpage_generator.py new file mode 100644 index 0000000..6fdccab --- /dev/null +++ b/csaxs_bec/bec_ipython_client/plugins/flomni/flomni_webpage_generator.py @@ -0,0 +1,1226 @@ +""" +flomni/webpage_generator.py +============================ +Background thread that reads tomo progress from the BEC global variable store +and writes status.json (every cycle) + status.html (once at startup) to a +staging directory. A separate upload process copies those files to the web host. + +Architecture +------------ +WebpageGeneratorBase -- all common logic: queue, progress, idle detection, + reconstruction queue, HTML, audio, phone numbers, + outdated-page warning. Import this in subclasses. +FlomniWebpageGenerator -- flomni-specific: temperatures, sample name, settings, + flOMNI logo. +make_webpage_generator() -- factory; selects the right class by session name. + Lazy-imports LamNI / omny subclasses to avoid + circular dependencies. + +Integration (inside Flomni.__init__, after self._progress_proxy.reset()): +-------------------------------------------------------------------------- + from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import ( + FlomniWebpageGenerator, + ) + self._webpage_gen = FlomniWebpageGenerator( + bec_client=client, + output_dir="~/data/raw/webpage/", + ) + self._webpage_gen.start() + +Interactive helpers (optional, in the iPython session): +------------------------------------------------------- + flomni._webpage_gen.status() # print current status + flomni._webpage_gen.verbosity = 2 # VERBOSE: one-line summary per cycle + flomni._webpage_gen.verbosity = 3 # DEBUG: full JSON per cycle + flomni._webpage_gen.stop() # release lock + flomni._webpage_gen.start() # restart after stop() +""" + +import datetime +import json +import os +import shutil +import socket +import threading +import time +from pathlib import Path + +from bec_lib import bec_logger + +logger = bec_logger.logger + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +_LOCK_VAR_KEY = "webpage_generator_lock" +_LOCK_STALE_AFTER_S = 45 +_CYCLE_INTERVAL_S = 15 +_TOMO_HEARTBEAT_STALE_S = 90 +_IDLE_SHORT_WINDOW_S = 300 # 5 min → switch to idle_long + audio warning + +VERBOSITY_SILENT = 0 +VERBOSITY_NORMAL = 1 +VERBOSITY_VERBOSE = 2 +VERBOSITY_DEBUG = 3 + +_PHONE_NUMBERS = [ + ("Beamline", "+41 56 310 5845"), + ("Beamline mobile", "+41 56 310 5844"), + ("Local contact", "+41 79 343 9230"), + ("Control room", "+41 56 310 5503"), +] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _now_iso() -> str: + return datetime.datetime.now().isoformat(timespec="seconds") + +def _epoch() -> float: + return time.time() + +def _heartbeat_age_s(iso_str) -> float: + if iso_str is None: + return float("inf") + try: + ts = datetime.datetime.fromisoformat(iso_str) + return (datetime.datetime.now() - ts).total_seconds() + except Exception: + return float("inf") + +def _format_duration(seconds) -> str: + if seconds is None: + return "N/A" + try: + seconds = int(float(seconds)) + except (TypeError, ValueError): + return "N/A" + h, remainder = divmod(seconds, 3600) + m, s = divmod(remainder, 60) + if h > 0: + return f"{h}h {m:02d}m {s:02d}s" + if m > 0: + return f"{m}m {s:02d}s" + return f"{s}s" + +def _check_account_match(bec_client) -> bool: + try: + active = bec_client.active_account + system_user = os.getenv("USER") or os.getlogin() + return active[1:] == system_user[1:] + except Exception: + return True + +def _safe_get(device_manager, dotpath: str): + """Navigate a dotted device path and call .get(), returning None on any error.""" + try: + obj = device_manager.devices + for part in dotpath.split("."): + obj = getattr(obj, part) + return obj.get() + except Exception: + return None + +def _safe_float(val, ndigits: int = 2): + try: + return round(float(val), ndigits) + except (TypeError, ValueError): + return None + +def _gvar(bec_client, key, fmt=None, suffix=""): + val = bec_client.get_global_var(key) + if fmt is None: + return val + if val is None: + return "N/A" + try: + return f"{val:{fmt}}{suffix}" + except Exception: + return str(val) + + +# --------------------------------------------------------------------------- +# Status derivation +# --------------------------------------------------------------------------- + +def _derive_status(progress: dict, queue_has_active_scan: bool, idle_since) -> str: + """ + Returns one of: + scanning -- tomo heartbeat fresh + running -- queue has active scan, outside tomo heartbeat window + idle_short -- idle < _IDLE_SHORT_WINDOW_S + idle_long -- idle >= _IDLE_SHORT_WINDOW_S (triggers audio warning) + unknown -- no information yet + """ + hb_age = _heartbeat_age_s(progress.get("heartbeat")) + if hb_age < _TOMO_HEARTBEAT_STALE_S: + return "scanning" + if queue_has_active_scan: + return "running" + if idle_since is not None: + return "idle_short" if (_epoch() - idle_since) < _IDLE_SHORT_WINDOW_S else "idle_long" + return "unknown" + + +# --------------------------------------------------------------------------- +# Base generator +# --------------------------------------------------------------------------- + +class WebpageGeneratorBase: + """ + Common webpage generator. Subclass and override: + _collect_setup_data() -- return dict of instrument-specific data + _logo_path() -- return Path to logo PNG, or None for text fallback + """ + + def __init__( + self, + bec_client, + output_dir: str = "~/data/raw/webpage/", + cycle_interval: float = _CYCLE_INTERVAL_S, + verbosity: int = VERBOSITY_NORMAL, + ): + self._bec = bec_client + self._output_dir = Path(output_dir).expanduser().resolve() + self._cycle_interval = cycle_interval + self._verbosity = verbosity + + self._thread = None + self._stop_event = threading.Event() + self._idle_since = None + self._last_queue_id = None # tracks queue history changes between cycles + self._owner_id = f"{socket.gethostname()}:{os.getpid()}" + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def start(self) -> bool: + """Start the generator thread if this session wins the singleton lock.""" + if not _check_account_match(self._bec): + self._log(VERBOSITY_NORMAL, + "WebpageGenerator: BEC account does not match system user. " + "Not starting.", level="warning") + return False + + if self._thread is not None and self._thread.is_alive(): + self._log(VERBOSITY_NORMAL, "WebpageGenerator already running in this session.") + return True + + if not self._acquire_lock(): + return False + + self._output_dir.mkdir(parents=True, exist_ok=True) + + # Copy logo once at startup — HTML is also written once here, + # not every cycle, since it is a static shell that only loads status.json. + self._copy_logo() + (self._output_dir / "status.html").write_text(_render_html(_PHONE_NUMBERS)) + + self._stop_event.clear() + self._thread = threading.Thread( + target=self._run, name="WebpageGenerator", daemon=True + ) + self._thread.start() + self._log(VERBOSITY_NORMAL, + f"WebpageGenerator started owner={self._owner_id} " + f"output={self._output_dir} interval={self._cycle_interval}s") + return True + + def stop(self) -> None: + """Stop the generator thread and release the singleton lock.""" + self._stop_event.set() + if self._thread is not None: + self._thread.join(timeout=self._cycle_interval + 5) + self._release_lock() + self._log(VERBOSITY_NORMAL, "WebpageGenerator stopped.") + + @property + def verbosity(self) -> int: + return self._verbosity + + @verbosity.setter + def verbosity(self, val: int) -> None: + self._verbosity = val + self._log(VERBOSITY_NORMAL, f"WebpageGenerator verbosity -> {val}") + + def status(self) -> None: + """Print a human-readable status summary to the console.""" + lock = self._read_lock() + running = self._thread is not None and self._thread.is_alive() + print( + f"WebpageGenerator\n" + f" This session running : {running}\n" + f" Lock owner : {lock.get('owner_id', 'none')}\n" + f" Lock heartbeat : {lock.get('heartbeat', 'never')}\n" + f" Output dir : {self._output_dir}\n" + f" Cycle interval : {self._cycle_interval}s\n" + f" Verbosity : {self._verbosity}\n" + ) + + # ------------------------------------------------------------------ + # Logo + # ------------------------------------------------------------------ + + def _logo_path(self): + """ + Return a Path to the logo PNG for this setup, or None for text fallback. + Override in subclasses. + """ + return None + + def _copy_logo(self) -> None: + """Copy the setup logo to output_dir/logo.png if available.""" + src = self._logo_path() + if src is None: + return + src = Path(src) + if not src.exists(): + self._log(VERBOSITY_VERBOSE, + f"Logo not found at {src}, using text fallback.", level="warning") + return + try: + shutil.copy2(src, self._output_dir / "logo.png") + self._log(VERBOSITY_VERBOSE, f"Logo copied from {src}") + except Exception as exc: + self._log(VERBOSITY_NORMAL, + f"Failed to copy logo: {exc}", level="warning") + + # ------------------------------------------------------------------ + # Singleton lock + # ------------------------------------------------------------------ + + def _acquire_lock(self) -> bool: + lock = self._read_lock() + if lock: + age = _heartbeat_age_s(lock.get("heartbeat")) + if age < _LOCK_STALE_AFTER_S: + self._log(VERBOSITY_NORMAL, + f"WebpageGenerator already owned by '{lock.get('owner_id')}' " + f"({age:.0f}s ago). Not starting.") + return False + self._log(VERBOSITY_NORMAL, + f"Stale lock (owner: '{lock.get('owner_id')}', {age:.0f}s ago). Taking over.") + self._write_lock() + return True + + def _write_lock(self) -> None: + self._bec.set_global_var(_LOCK_VAR_KEY, { + "owner_id": self._owner_id, + "heartbeat": _now_iso(), + "pid": os.getpid(), + "hostname": socket.gethostname(), + }) + + def _read_lock(self) -> dict: + val = self._bec.get_global_var(_LOCK_VAR_KEY) + return val if isinstance(val, dict) else {} + + def _release_lock(self) -> None: + lock = self._read_lock() + if lock.get("owner_id") == self._owner_id: + self._bec.delete_global_var(_LOCK_VAR_KEY) + + # ------------------------------------------------------------------ + # Main loop + # ------------------------------------------------------------------ + + def _run(self) -> None: + while not self._stop_event.is_set(): + t0 = _epoch() + try: + self._cycle() + except Exception as exc: + self._log(VERBOSITY_NORMAL, + f"WebpageGenerator cycle error: {exc}", level="warning") + try: + self._write_lock() + except Exception: + pass + self._stop_event.wait(max(0.0, self._cycle_interval - (_epoch() - t0))) + + def _cycle(self) -> None: + """One generator cycle: read state -> derive status -> write status.json.""" + + # ── Progress ───────────────────────────────────────────────── + progress = self._bec.get_global_var("tomo_progress") or {} + + # ── Queue ──────────────────────────────────────────────────── + # queue_status is always 'RUNNING' while BEC is alive. + # A scan is actually executing only when info is non-empty AND + # active_request_block is set on the first entry. + try: + qi = self._bec.queue.queue_storage.current_scan_queue + primary = qi.get("primary") + queue_status = primary.status if primary is not None else "unknown" + queue_has_active_scan = ( + primary is not None + and len(primary.info) > 0 + and primary.info[0].active_request_block is not None + ) + except Exception: + queue_status = "unknown" + queue_has_active_scan = False + + # ── Idle tracking ──────────────────────────────────────────── + hb_age = _heartbeat_age_s(progress.get("heartbeat")) + tomo_active = hb_age < _TOMO_HEARTBEAT_STALE_S + + # Detect scans that start and finish entirely between two polls + # by watching for a new queue_id at the top of history. + try: + history = self._bec.queue.queue_storage.queue_history + latest_queue_id = history[0].info.queue_id if history else None + except Exception: + latest_queue_id = None + + history_changed = ( + latest_queue_id is not None + and latest_queue_id != self._last_queue_id + ) + self._last_queue_id = latest_queue_id + + if tomo_active or queue_has_active_scan or history_changed: + self._idle_since = None + elif self._idle_since is None: + self._idle_since = _epoch() + + exp_status = _derive_status(progress, queue_has_active_scan, self._idle_since) + idle_for_s = None if self._idle_since is None else (_epoch() - self._idle_since) + + # ── Reconstruction queue ────────────────────────────────────── + recon = self._collect_recon_data() + + # ── Setup-specific data (subclass hook) ─────────────────────── + setup = {} + try: + setup = self._collect_setup_data() + except Exception as exc: + self._log(VERBOSITY_VERBOSE, f"setup data error: {exc}", level="warning") + + # ── Payload ─────────────────────────────────────────────────── + payload = { + "generated_at": _now_iso(), + "generated_at_epoch": _epoch(), + "experiment_status": exp_status, + "queue_status": queue_status, + "queue_has_active_scan": queue_has_active_scan, + "idle_for_s": idle_for_s, + "idle_for_human": _format_duration(idle_for_s), + "progress": { + "tomo_type": progress.get("tomo_type", "N/A"), + "projection": progress.get("projection", 0), + "total_projections": progress.get("total_projections", 0), + "subtomo": progress.get("subtomo", 0), + "subtomo_projection": progress.get("subtomo_projection", 0), + "subtomo_total_projections": progress.get("subtomo_total_projections", 1), + "angle": progress.get("angle", 0), + "tomo_start_time": progress.get("tomo_start_time"), + "estimated_remaining_s": progress.get("estimated_remaining_time"), + "estimated_remaining_human": _format_duration( + progress.get("estimated_remaining_time") + ), + "tomo_heartbeat": progress.get("heartbeat"), + "tomo_heartbeat_age_s": round(hb_age, 1) if hb_age != float("inf") else None, + }, + "recon": recon, + "setup": setup, + "generator": { + "owner_id": self._owner_id, + "cycle_interval_s": self._cycle_interval, + }, + } + + # ── Write status.json only (HTML is static, written once at start) ── + (self._output_dir / "status.json").write_text( + json.dumps(payload, indent=2, default=str) + ) + + # ── Console feedback ────────────────────────────────────────── + self._log(VERBOSITY_VERBOSE, + f"[{_now_iso()}] {exp_status:<12} active={queue_has_active_scan} " + f"proj={payload['progress']['projection']}/" + f"{payload['progress']['total_projections']} " + f"hb={payload['progress']['tomo_heartbeat_age_s']}s " + f"idle={_format_duration(idle_for_s)}") + self._log(VERBOSITY_DEBUG, + f" payload:\n{json.dumps(payload, indent=4, default=str)}") + + # ------------------------------------------------------------------ + # Reconstruction queue + # ------------------------------------------------------------------ + + def _collect_recon_data(self) -> dict: + foldername = ( + self._bec.get_global_var("ptycho_reconstruct_foldername") or "ptycho_reconstruct" + ) + base = Path("~/data/raw/logs/reconstruction_queue").expanduser() + queue_dir = base / foldername + failed_dir = queue_dir / "failed" + + waiting = failed = 0 + try: + if queue_dir.exists(): + waiting = len(list(queue_dir.glob("*.dat"))) + except Exception: + pass + try: + if failed_dir.exists(): + failed = len(list(failed_dir.glob("*.dat"))) + except Exception: + pass + + return { + "foldername": foldername, + "folder_path": str(queue_dir), + "waiting": waiting, + "failed": failed, + } + + # ------------------------------------------------------------------ + # Subclass hooks + # ------------------------------------------------------------------ + + def _collect_setup_data(self) -> dict: + """ + Override in subclasses to return a dict of setup-specific data. + The dict is embedded in the JSON payload as 'setup' and rendered + by the HTML page as the 'Instrument details' section. + + Expected keys (all optional): + type (str) -- setup identifier, e.g. "flomni" + sample_name (str) -- current sample name + temperatures (dict) -- label -> float (degrees C) or None + settings (dict) -- label -> formatted string + """ + return {} + + # ------------------------------------------------------------------ + # Logging + # ------------------------------------------------------------------ + + def _log(self, min_verbosity: int, msg: str, level: str = "info") -> None: + if self._verbosity < min_verbosity: + return + if level == "warning": + logger.warning(msg) + elif level == "error": + logger.error(msg) + else: + print(msg) + + +# --------------------------------------------------------------------------- +# flOMNI subclass +# --------------------------------------------------------------------------- + +class FlomniWebpageGenerator(WebpageGeneratorBase): + """ + flOMNI-specific webpage generator. + Adds: temperatures, sample name, measurement settings from global vars. + Logo: flOMNI.png from the same directory as this file. + """ + + # label -> dotpath under device_manager.devices + _TEMP_MAP = { + "Heater": "flomni_temphum.temperature_heater", + "Heater setpoint": "flomni_temphum.temperature_heaterset_rb", + "OSA": "flomni_temphum.temperature_osa", + "Mirror": "flomni_temphum.temperature_mirror", + } + + def _logo_path(self): + return Path(__file__).parent / "flOMNI.png" + + def _collect_setup_data(self) -> dict: + dm = self._bec.device_manager + + # ── Sample name ─────────────────────────────────────────────── + sample_name = _safe_get(dm, "flomni_samples.sample_names.sample0") or "N/A" + + # ── Temperatures ────────────────────────────────────────────── + temperatures = { + label: _safe_float(_safe_get(dm, path)) + for label, path in self._TEMP_MAP.items() + } + + # ── Settings from global vars ───────────────────────────────── + g = self._bec + + def _fmt2(v): + try: + return f"{float(v):.2f}" + except Exception: + return "N/A" + + fovx = g.get_global_var("fovx") + fovy = g.get_global_var("fovy") + stx = g.get_global_var("stitch_x") + sty = g.get_global_var("stitch_y") + + settings = { + "Sample name": sample_name, + "FOV x / y": f"{_fmt2(fovx)} / {_fmt2(fovy)} \u00b5m", + "Step size": _gvar(g, "tomo_shellstep", ".2f", " \u00b5m"), + "Exposure time": _gvar(g, "tomo_countingtime", ".3f", " s"), + "Angle step": _gvar(g, "tomo_angle_stepsize", ".2f", "\u00b0"), + "Stitch x / y": f"{_fmt2(stx)} / {_fmt2(sty)} \u00b5m", + } + + return { + "type": "flomni", + "sample_name": sample_name, + "temperatures": temperatures, + "settings": settings, + } + + +# --------------------------------------------------------------------------- +# Factory +# --------------------------------------------------------------------------- + +def make_webpage_generator(bec_client, **kwargs): + """ + Select the appropriate WebpageGenerator subclass based on the BEC session + name (bec._ip.prompts.session_name) and return a ready-to-start instance. + + Usage (generic startup script): + from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import ( + make_webpage_generator, + ) + gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/") + gen.start() + """ + try: + session = bec_client._ip.prompts.session_name + except Exception: + session = "unknown" + + if session == "flomni": + return FlomniWebpageGenerator(bec_client, **kwargs) + + elif session == "lamni": + from csaxs_bec.bec_ipython_client.plugins.LamNI.webpage_generator import ( + LamniWebpageGenerator, + ) + return LamniWebpageGenerator(bec_client, **kwargs) + + elif session == "omny": + from csaxs_bec.bec_ipython_client.plugins.omny.webpage_generator import ( + OmnyWebpageGenerator, + ) + return OmnyWebpageGenerator(bec_client, **kwargs) + + else: + return WebpageGeneratorBase(bec_client, **kwargs) + + +# --------------------------------------------------------------------------- +# HTML template (written once at start(), not every cycle) +# Three themes: dark (default), light, auto (follows OS preference). +# Theme is stored in localStorage and applied via data-theme on . +# --------------------------------------------------------------------------- + +def _render_html(phone_numbers: list) -> str: + phones_html = "\n".join( + f'
' + f'{label}' + f'{num}
' + for label, num in phone_numbers + ) + + return f""" + + + + + Experiment Status + + + +
+ +
+ ⚠ Status page data is outdated — generator may have stopped +
+ +
+
+ + + · STATUS +
+
+
+ Theme + + + +
+
updating…
+
+
+ + +
+
-
+
Loading…
+
+ +
+
Tomography progress
+
+ + + + + + +
+ 0% + OVERALL +
+
+
+
Projection-
+
Sub-tomo-
+
Angle-
+
Tomo type-
+
ETA-
+
Started-
+
+
+
Sub-tomo progress-
+
+
+
+ + +
+
Reconstruction queue
+
+
Waiting-
+
Failed-
+
Queue name-
+
+
+
+ +
+
Ptychography reconstructions
+
Reconstruction images will appear here
+
+ + + + + +
+
+
+ Audio warnings: disabled +
+
+ + + +
+
+ + +
+
Contacts
+
+{phones_html} +
+
+ + + +
+ + + +""" diff --git a/csaxs_bec/bec_ipython_client/plugins/flomni/webpage_generator.py b/csaxs_bec/bec_ipython_client/plugins/flomni/webpage_generator.py deleted file mode 100644 index ee26725..0000000 --- a/csaxs_bec/bec_ipython_client/plugins/flomni/webpage_generator.py +++ /dev/null @@ -1,892 +0,0 @@ -""" -webpage_generator.py -==================== -Background thread that reads the flOMNI tomo progress from the BEC global -variable store and writes a self-contained status.json + status.html to a -configurable output directory. A separate upload process can copy those -files to the web host. - -Usage (inside Flomni.__init__, after self._progress_proxy.reset()): --------------------------------------------------------------------- - self._webpage_gen = WebpageGenerator( - bec_client=client, - output_dir="~/data/raw/webpage/", - ) - self._webpage_gen.start() - -Interactive commands (optional, in the iPython session): ---------------------------------------------------------- - flomni._webpage_gen.status() # print current status - flomni._webpage_gen.verbosity = 2 # switch to VERBOSE mid-session - flomni._webpage_gen.stop() # release lock, let another session take over - flomni._webpage_gen.start() # restart after stop() -""" - -import datetime -import json -import os -import socket -import threading -import time -from pathlib import Path - -from bec_lib import bec_logger - -logger = bec_logger.logger - -# --------------------------------------------------------------------------- -# Constants -# --------------------------------------------------------------------------- - -# BEC global-var key used as a distributed singleton lock -_LOCK_VAR_KEY = "webpage_generator_lock" - -# Heartbeat must be refreshed at least this often (seconds) or the lock -# is considered stale and another session may take over. -_LOCK_STALE_AFTER_S = 45 - -# How long between generator cycles (seconds) -_CYCLE_INTERVAL_S = 15 - -# If the tomo progress heartbeat has not been updated for this long we -# consider the tomo loop no longer actively running. -_TOMO_HEARTBEAT_STALE_S = 90 - -# After finishing normally, stay in IDLE_SHORT for this long before -# switching to IDLE_LONG (which triggers the audio warning). -_IDLE_SHORT_WINDOW_S = 300 # 5 minutes - -# Verbosity levels -VERBOSITY_SILENT = 0 # no output at all -VERBOSITY_NORMAL = 1 # startup/stop messages only -VERBOSITY_VERBOSE = 2 # each cycle summary -VERBOSITY_DEBUG = 3 # full detail each cycle - - -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- - -def _now_iso() -> str: - return datetime.datetime.now().isoformat(timespec="seconds") - - -def _epoch() -> float: - return time.time() - - -def _heartbeat_age_s(iso_str) -> float: - """Return seconds since the ISO-format heartbeat string, or infinity.""" - if iso_str is None: - return float("inf") - try: - ts = datetime.datetime.fromisoformat(iso_str) - return (datetime.datetime.now() - ts).total_seconds() - except Exception: - return float("inf") - - -def _format_duration(seconds) -> str: - if seconds is None: - return "N/A" - try: - seconds = int(float(seconds)) - except (TypeError, ValueError): - return "N/A" - h, remainder = divmod(seconds, 3600) - m, s = divmod(remainder, 60) - if h > 0: - return f"{h}h {m:02d}m {s:02d}s" - if m > 0: - return f"{m}m {s:02d}s" - return f"{s}s" - - -def _check_account_match(bec_client) -> bool: - """Return True if the BEC active account matches the system user.""" - try: - active = bec_client.active_account # e.g. "p23092" - system_user = os.getenv("USER") or os.getlogin() # e.g. "e23092" - return active[1:] == system_user[1:] - except Exception: - return True # don't block on unknown accounts - - -# --------------------------------------------------------------------------- -# Status derivation -# --------------------------------------------------------------------------- - -def _derive_status(progress: dict, queue_has_active_scan: bool, idle_since) -> str: - """ - Derive a simple status string from available signals. - - Returns one of: - "scanning" - tomo heartbeat is fresh (tomo loop actively running) - "running" - a scan is active but outside the tomo heartbeat window - (alignment, other tasks, or brief inter-scan gap) - "idle_short" - recently finished, within IDLE_SHORT_WINDOW_S - "idle_long" - idle longer than IDLE_SHORT_WINDOW_S (trigger warning) - "unknown" - cannot determine yet - """ - hb_age = _heartbeat_age_s(progress.get("heartbeat")) - tomo_active = hb_age < _TOMO_HEARTBEAT_STALE_S - - if tomo_active: - return "scanning" - - if queue_has_active_scan: - return "running" - - if idle_since is not None: - idle_s = _epoch() - idle_since - return "idle_short" if idle_s < _IDLE_SHORT_WINDOW_S else "idle_long" - - return "unknown" - - -# --------------------------------------------------------------------------- -# Main generator class -# --------------------------------------------------------------------------- - -class WebpageGenerator: - """ - Singleton-safe background thread that generates the experiment status - page by reading BEC global variables. - - Parameters - ---------- - bec_client : BECClient - The active BEC client instance (``bec`` in the iPython session). - output_dir : str | Path - Directory where ``status.json`` and ``status.html`` are written. - Created if it does not exist. - cycle_interval : float - Seconds between update cycles. Default: 15 s. - verbosity : int - VERBOSITY_SILENT / VERBOSITY_NORMAL / VERBOSITY_VERBOSE / VERBOSITY_DEBUG. - Default: VERBOSITY_NORMAL. - """ - - def __init__( - self, - bec_client, - output_dir: str = "~/data/raw/webpage/", - cycle_interval: float = _CYCLE_INTERVAL_S, - verbosity: int = VERBOSITY_NORMAL, - ): - self._bec = bec_client - self._output_dir = Path(output_dir).expanduser().resolve() - self._cycle_interval = cycle_interval - self._verbosity = verbosity - - self._thread = None - self._stop_event = threading.Event() - - # Rolling state kept between cycles - self._idle_since = None - self._owner_id = f"{socket.gethostname()}:{os.getpid()}" - - # ------------------------------------------------------------------ - # Public API - # ------------------------------------------------------------------ - - def start(self) -> bool: - """ - Start the generator thread if this session wins the singleton lock. - Returns True if started, False if another session already owns it. - """ - if not _check_account_match(self._bec): - self._log( - VERBOSITY_NORMAL, - "WebpageGenerator: BEC account does not match system user. " - "Not starting to avoid writing data to the wrong account.", - level="warning", - ) - return False - - if self._thread is not None and self._thread.is_alive(): - self._log(VERBOSITY_NORMAL, "WebpageGenerator already running in this session.") - return True - - if not self._acquire_lock(): - return False - - self._output_dir.mkdir(parents=True, exist_ok=True) - self._stop_event.clear() - self._thread = threading.Thread( - target=self._run, - name="WebpageGenerator", - daemon=True, - ) - self._thread.start() - self._log( - VERBOSITY_NORMAL, - f"WebpageGenerator started (owner: {self._owner_id}, " - f"output: {self._output_dir}, interval: {self._cycle_interval}s)", - ) - return True - - def stop(self) -> None: - """Stop the generator thread and release the singleton lock.""" - self._stop_event.set() - if self._thread is not None: - self._thread.join(timeout=self._cycle_interval + 5) - self._release_lock() - self._log(VERBOSITY_NORMAL, "WebpageGenerator stopped.") - - @property - def verbosity(self) -> int: - return self._verbosity - - @verbosity.setter - def verbosity(self, val: int) -> None: - self._verbosity = val - self._log(VERBOSITY_NORMAL, f"WebpageGenerator verbosity set to {val}.") - - def status(self) -> None: - """Print a human-readable status summary to the console.""" - lock = self._read_lock() - running = self._thread is not None and self._thread.is_alive() - print( - f"WebpageGenerator\n" - f" This session running : {running}\n" - f" Lock owner : {lock.get('owner_id', 'none')}\n" - f" Lock heartbeat : {lock.get('heartbeat', 'never')}\n" - f" Output dir : {self._output_dir}\n" - f" Cycle interval : {self._cycle_interval}s\n" - f" Verbosity : {self._verbosity}\n" - ) - - # ------------------------------------------------------------------ - # Singleton lock helpers - # ------------------------------------------------------------------ - - def _acquire_lock(self) -> bool: - lock = self._read_lock() - if lock: - age = _heartbeat_age_s(lock.get("heartbeat")) - if age < _LOCK_STALE_AFTER_S: - self._log( - VERBOSITY_NORMAL, - f"WebpageGenerator already owned by " - f"'{lock.get('owner_id')}' " - f"(heartbeat {age:.0f}s ago). Not starting.", - ) - return False - self._log( - VERBOSITY_NORMAL, - f"Stale lock found (owner: '{lock.get('owner_id')}', " - f"{age:.0f}s ago). Taking over.", - ) - - self._write_lock() - return True - - def _write_lock(self) -> None: - self._bec.set_global_var( - _LOCK_VAR_KEY, - { - "owner_id": self._owner_id, - "heartbeat": _now_iso(), - "pid": os.getpid(), - "hostname": socket.gethostname(), - }, - ) - - def _read_lock(self) -> dict: - val = self._bec.get_global_var(_LOCK_VAR_KEY) - return val if isinstance(val, dict) else {} - - def _release_lock(self) -> None: - lock = self._read_lock() - if lock.get("owner_id") == self._owner_id: - self._bec.delete_global_var(_LOCK_VAR_KEY) - - # ------------------------------------------------------------------ - # Main loop - # ------------------------------------------------------------------ - - def _run(self) -> None: - while not self._stop_event.is_set(): - cycle_start = _epoch() - try: - self._cycle() - except Exception as exc: - self._log( - VERBOSITY_NORMAL, - f"WebpageGenerator cycle error: {exc}", - level="warning", - ) - # Refresh the singleton heartbeat - try: - self._write_lock() - except Exception: - pass - # Sleep for the remainder of the interval - elapsed = _epoch() - cycle_start - sleep_time = max(0.0, self._cycle_interval - elapsed) - self._stop_event.wait(sleep_time) - - def _cycle(self) -> None: - """One generator cycle: read state -> derive status -> write outputs.""" - - # --- Read progress from global var (readable from any session) ------- - progress = self._bec.get_global_var("tomo_progress") or {} - - # --- Read queue status ----------------------------------------------- - # NOTE: queue status is always 'RUNNING' while BEC is alive. - # An actual scan is executing only when info is non-empty AND - # active_request_block is set on the first entry. - try: - queue_info = self._bec.queue.queue_storage.current_scan_queue - primary = queue_info.get("primary") - queue_status = primary.status if primary is not None else "unknown" - queue_has_active_scan = ( - primary is not None - and len(primary.info) > 0 - and primary.info[0].active_request_block is not None - ) - except Exception: - queue_status = "unknown" - queue_has_active_scan = False - - # --- Track idle onset ------------------------------------------------ - # Use both the tomo heartbeat and the queue active-scan flag. - # This handles the brief COMPLETED gap between individual scans - # while a tomo is still running. - hb_age = _heartbeat_age_s(progress.get("heartbeat")) - tomo_active = hb_age < _TOMO_HEARTBEAT_STALE_S - - if tomo_active or queue_has_active_scan: - self._idle_since = None - elif self._idle_since is None: - self._idle_since = _epoch() - - # --- Derive experiment status ---------------------------------------- - exp_status = _derive_status(progress, queue_has_active_scan, self._idle_since) - - # --- Build payload --------------------------------------------------- - idle_for_s = None if self._idle_since is None else (_epoch() - self._idle_since) - - payload = { - "generated_at": _now_iso(), - "generated_at_epoch": _epoch(), - "experiment_status": exp_status, - "queue_status": queue_status, - "queue_has_active_scan": queue_has_active_scan, - "idle_for_s": idle_for_s, - "idle_for_human": _format_duration(idle_for_s), - "progress": { - "tomo_type": progress.get("tomo_type", "N/A"), - "projection": progress.get("projection", 0), - "total_projections": progress.get("total_projections", 0), - "subtomo": progress.get("subtomo", 0), - "subtomo_projection": progress.get("subtomo_projection", 0), - "subtomo_total_projections": progress.get("subtomo_total_projections", 1), - "angle": progress.get("angle", 0), - "tomo_start_time": progress.get("tomo_start_time"), - "estimated_remaining_s": progress.get("estimated_remaining_time"), - "estimated_remaining_human": _format_duration( - progress.get("estimated_remaining_time") - ), - "heartbeat": progress.get("heartbeat"), - "heartbeat_age_s": round(hb_age, 1) if hb_age != float("inf") else None, - }, - "generator": { - "owner_id": self._owner_id, - "cycle_interval_s": self._cycle_interval, - }, - } - - # --- Write outputs --------------------------------------------------- - json_path = self._output_dir / "status.json" - json_path.write_text(json.dumps(payload, indent=2, default=str)) - - html_path = self._output_dir / "status.html" - html_path.write_text(_render_html()) - - # --- Console feedback ------------------------------------------------ - self._log( - VERBOSITY_VERBOSE, - f"[{_now_iso()}] status={exp_status} active_scan={queue_has_active_scan} " - f"proj={payload['progress']['projection']}/" - f"{payload['progress']['total_projections']} " - f"hb_age={payload['progress']['heartbeat_age_s']}s " - f"idle={_format_duration(idle_for_s)}", - ) - self._log( - VERBOSITY_DEBUG, - f" full payload:\n{json.dumps(payload, indent=4, default=str)}", - ) - - # ------------------------------------------------------------------ - # Logging helper - # ------------------------------------------------------------------ - - def _log(self, min_verbosity: int, msg: str, level: str = "info") -> None: - if self._verbosity < min_verbosity: - return - if level == "warning": - logger.warning(msg) - elif level == "error": - logger.error(msg) - else: - print(msg) - - -# --------------------------------------------------------------------------- -# HTML template (static shell - the page fetches status.json on load/refresh) -# --------------------------------------------------------------------------- - -def _render_html() -> str: - """Return the full HTML for the status page.""" - return r""" - - - - - flOMNI - Experiment Status - - - - -
-
-

flOMNI Status Page

-

Click to enable the page. Audio warnings require a user interaction to activate.

- -
-
- -
- -
- -
updating…
-
- -
-
-
-
Loading…
-
- -
-
- - - - - - -
- 0% - OVERALL -
-
- -
-
- Projection - - -
-
- Sub-tomo - - -
-
- Angle - - -
-
- Tomo type - - -
-
- ETA - - -
-
- Started - - -
-
- -
-
- Sub-tomo progress - - -
-
-
-
-
-
- -
-
-
- Audio warnings: initialising… -
-
- - -
-
- - - -
- - - - -""" diff --git a/csaxs_bec/bec_ipython_client/plugins/omny/omny_webpage_generator.py b/csaxs_bec/bec_ipython_client/plugins/omny/omny_webpage_generator.py new file mode 100644 index 0000000..1a04a77 --- /dev/null +++ b/csaxs_bec/bec_ipython_client/plugins/omny/omny_webpage_generator.py @@ -0,0 +1,96 @@ +""" +omny/webpage_generator.py +========================== +OMNY-specific webpage generator subclass. + +Integration (inside the OMNY __init__ / startup): +-------------------------------------------------- + from csaxs_bec.bec_ipython_client.plugins.omny.webpage_generator import ( + OmnyWebpageGenerator, + ) + self._webpage_gen = OmnyWebpageGenerator( + bec_client=client, + output_dir="~/data/raw/webpage/", + ) + self._webpage_gen.start() + +Or use the factory (auto-selects by session name "omny"): +--------------------------------------------------------- + from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import ( + make_webpage_generator, + ) + self._webpage_gen = make_webpage_generator(bec, output_dir="~/data/raw/webpage/") + self._webpage_gen.start() + +Interactive helpers: +-------------------- + omny._webpage_gen.status() + omny._webpage_gen.verbosity = 2 + omny._webpage_gen.stop() + omny._webpage_gen.start() +""" + +from pathlib import Path + +from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import ( + WebpageGeneratorBase, + _safe_get, + _safe_float, + _gvar, +) + + +class OmnyWebpageGenerator(WebpageGeneratorBase): + """ + OMNY-specific webpage generator. + Logo: OMNY.png from the same directory as this file. + + Override _collect_setup_data() to add OMNY-specific temperatures, + sample name, and measurement settings. + + The old OMNY spec webpage showed: + - Cryo temperatures (XOMNY-TEMP-CRYO-A/B) + - Per-channel temperatures (XOMNY-TEMP1..48) + - Dewar pressure / LN2 flow + - Interferometer strengths (OINTERF) + Map these to BEC device paths below once available. + """ + + # TODO: fill in OMNY-specific device paths + # label -> dotpath under device_manager.devices + _TEMP_MAP = { + # "Sample (cryo A)": "omny_temp.cryo_a", + # "Cryo head (B)": "omny_temp.cryo_b", + } + + def _logo_path(self): + return Path(__file__).parent / "OMNY.png" + + def _collect_setup_data(self) -> dict: + # ── OMNY-specific data goes here ────────────────────────────── + # Uncomment and adapt when device names are known: + # + # dm = self._bec.device_manager + # sample_name = _safe_get(dm, "omny_samples.sample_names.sample0") or "N/A" + # temperatures = { + # label: _safe_float(_safe_get(dm, path)) + # for label, path in self._TEMP_MAP.items() + # } + # settings = { + # "Sample name": sample_name, + # "FOV x / y": ..., + # "Exposure time": _gvar(self._bec, "tomo_countingtime", ".3f", " s"), + # "Angle step": _gvar(self._bec, "tomo_angle_stepsize", ".2f", "\u00b0"), + # } + # return { + # "type": "omny", + # "sample_name": sample_name, + # "temperatures": temperatures, + # "settings": settings, + # } + + # Placeholder — returns minimal info until implemented + return { + "type": "omny", + # OMNY-specific data here + }