diff --git a/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py b/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py index 9eb65f5..4276371 100644 --- a/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py +++ b/csaxs_bec/bec_ipython_client/plugins/flomni/flomni.py @@ -21,6 +21,14 @@ from csaxs_bec.bec_ipython_client.plugins.omny.omny_general_tools import ( TomoIDManager, ) +from csaxs_bec.bec_ipython_client.plugins.flomni.webpage_generator import ( + WebpageGenerator, + VERBOSITY_SILENT, # 0 — no output + VERBOSITY_NORMAL, # 1 — startup / stop messages only (default) + VERBOSITY_VERBOSE, # 2 — one-line summary per cycle + VERBOSITY_DEBUG, # 3 — full JSON payload per cycle + ) + logger = bec_logger.logger if builtins.__dict__.get("bec") is not None: @@ -1303,6 +1311,12 @@ class Flomni( self.corr_angle_y_2 = [] self._progress_proxy = _ProgressProxy(self.client) self._progress_proxy.reset() + self._webpage_gen = WebpageGenerator( + bec_client=client, + output_dir="~/data/raw/webpage/", # adjust to your staging path + verbosity=VERBOSITY_NORMAL, + ) + self._webpage_gen.start() self.OMNYTools = OMNYTools(self.client) self.reconstructor = PtychoReconstructor(self.ptycho_reconstruct_foldername) self.tomo_id_manager = TomoIDManager() @@ -2249,8 +2263,8 @@ class Flomni( + ' 888 888 "Y88888P" 888 888 888 Y888 8888888 \n' ) padding = 20 - fovxy = f"{self.fovx:.2f}/{self.fovy:.2f}" - stitching = f"{self.stitch_x:.2f}/{self.stitch_y:.2f}" + fovxy = f"{self.fovx:.1f}/{self.fovy:.1f}" + stitching = f"{self.stitch_x:.0f}/{self.stitch_y:.0f}" dataset_id = str(self.client.queue.next_dataset_number) account = bec.active_account content = [ @@ -2267,7 +2281,7 @@ class Flomni( f"{'Exposure time:':<{padding}}{self.tomo_countingtime:>{padding}.2f}\n", f"{'Fermat spiral step size:':<{padding}}{self.tomo_shellstep:>{padding}.2f}\n", f"{'FOV:':<{padding}}{fovxy:>{padding}}\n", - f"{'Stitching:':<{padding}}{stitching:>{padding}.0f}\n", + f"{'Stitching:':<{padding}}{stitching:>{padding}}\n", f"{'Number of individual sub-tomograms:':<{padding}}{8:>{padding}}\n", f"{'Angular step within sub-tomogram:':<{padding}}{self.tomo_angle_stepsize:>{padding}.2f}\n", ] diff --git a/csaxs_bec/bec_ipython_client/plugins/flomni/webpage_generator.py b/csaxs_bec/bec_ipython_client/plugins/flomni/webpage_generator.py new file mode 100644 index 0000000..ee26725 --- /dev/null +++ b/csaxs_bec/bec_ipython_client/plugins/flomni/webpage_generator.py @@ -0,0 +1,892 @@ +""" +webpage_generator.py +==================== +Background thread that reads the flOMNI tomo progress from the BEC global +variable store and writes a self-contained status.json + status.html to a +configurable output directory. A separate upload process can copy those +files to the web host. + +Usage (inside Flomni.__init__, after self._progress_proxy.reset()): +-------------------------------------------------------------------- + self._webpage_gen = WebpageGenerator( + bec_client=client, + output_dir="~/data/raw/webpage/", + ) + self._webpage_gen.start() + +Interactive commands (optional, in the iPython session): +--------------------------------------------------------- + flomni._webpage_gen.status() # print current status + flomni._webpage_gen.verbosity = 2 # switch to VERBOSE mid-session + flomni._webpage_gen.stop() # release lock, let another session take over + flomni._webpage_gen.start() # restart after stop() +""" + +import datetime +import json +import os +import socket +import threading +import time +from pathlib import Path + +from bec_lib import bec_logger + +logger = bec_logger.logger + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +# BEC global-var key used as a distributed singleton lock +_LOCK_VAR_KEY = "webpage_generator_lock" + +# Heartbeat must be refreshed at least this often (seconds) or the lock +# is considered stale and another session may take over. +_LOCK_STALE_AFTER_S = 45 + +# How long between generator cycles (seconds) +_CYCLE_INTERVAL_S = 15 + +# If the tomo progress heartbeat has not been updated for this long we +# consider the tomo loop no longer actively running. +_TOMO_HEARTBEAT_STALE_S = 90 + +# After finishing normally, stay in IDLE_SHORT for this long before +# switching to IDLE_LONG (which triggers the audio warning). +_IDLE_SHORT_WINDOW_S = 300 # 5 minutes + +# Verbosity levels +VERBOSITY_SILENT = 0 # no output at all +VERBOSITY_NORMAL = 1 # startup/stop messages only +VERBOSITY_VERBOSE = 2 # each cycle summary +VERBOSITY_DEBUG = 3 # full detail each cycle + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _now_iso() -> str: + return datetime.datetime.now().isoformat(timespec="seconds") + + +def _epoch() -> float: + return time.time() + + +def _heartbeat_age_s(iso_str) -> float: + """Return seconds since the ISO-format heartbeat string, or infinity.""" + if iso_str is None: + return float("inf") + try: + ts = datetime.datetime.fromisoformat(iso_str) + return (datetime.datetime.now() - ts).total_seconds() + except Exception: + return float("inf") + + +def _format_duration(seconds) -> str: + if seconds is None: + return "N/A" + try: + seconds = int(float(seconds)) + except (TypeError, ValueError): + return "N/A" + h, remainder = divmod(seconds, 3600) + m, s = divmod(remainder, 60) + if h > 0: + return f"{h}h {m:02d}m {s:02d}s" + if m > 0: + return f"{m}m {s:02d}s" + return f"{s}s" + + +def _check_account_match(bec_client) -> bool: + """Return True if the BEC active account matches the system user.""" + try: + active = bec_client.active_account # e.g. "p23092" + system_user = os.getenv("USER") or os.getlogin() # e.g. "e23092" + return active[1:] == system_user[1:] + except Exception: + return True # don't block on unknown accounts + + +# --------------------------------------------------------------------------- +# Status derivation +# --------------------------------------------------------------------------- + +def _derive_status(progress: dict, queue_has_active_scan: bool, idle_since) -> str: + """ + Derive a simple status string from available signals. + + Returns one of: + "scanning" - tomo heartbeat is fresh (tomo loop actively running) + "running" - a scan is active but outside the tomo heartbeat window + (alignment, other tasks, or brief inter-scan gap) + "idle_short" - recently finished, within IDLE_SHORT_WINDOW_S + "idle_long" - idle longer than IDLE_SHORT_WINDOW_S (trigger warning) + "unknown" - cannot determine yet + """ + hb_age = _heartbeat_age_s(progress.get("heartbeat")) + tomo_active = hb_age < _TOMO_HEARTBEAT_STALE_S + + if tomo_active: + return "scanning" + + if queue_has_active_scan: + return "running" + + if idle_since is not None: + idle_s = _epoch() - idle_since + return "idle_short" if idle_s < _IDLE_SHORT_WINDOW_S else "idle_long" + + return "unknown" + + +# --------------------------------------------------------------------------- +# Main generator class +# --------------------------------------------------------------------------- + +class WebpageGenerator: + """ + Singleton-safe background thread that generates the experiment status + page by reading BEC global variables. + + Parameters + ---------- + bec_client : BECClient + The active BEC client instance (``bec`` in the iPython session). + output_dir : str | Path + Directory where ``status.json`` and ``status.html`` are written. + Created if it does not exist. + cycle_interval : float + Seconds between update cycles. Default: 15 s. + verbosity : int + VERBOSITY_SILENT / VERBOSITY_NORMAL / VERBOSITY_VERBOSE / VERBOSITY_DEBUG. + Default: VERBOSITY_NORMAL. + """ + + def __init__( + self, + bec_client, + output_dir: str = "~/data/raw/webpage/", + cycle_interval: float = _CYCLE_INTERVAL_S, + verbosity: int = VERBOSITY_NORMAL, + ): + self._bec = bec_client + self._output_dir = Path(output_dir).expanduser().resolve() + self._cycle_interval = cycle_interval + self._verbosity = verbosity + + self._thread = None + self._stop_event = threading.Event() + + # Rolling state kept between cycles + self._idle_since = None + self._owner_id = f"{socket.gethostname()}:{os.getpid()}" + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def start(self) -> bool: + """ + Start the generator thread if this session wins the singleton lock. + Returns True if started, False if another session already owns it. + """ + if not _check_account_match(self._bec): + self._log( + VERBOSITY_NORMAL, + "WebpageGenerator: BEC account does not match system user. " + "Not starting to avoid writing data to the wrong account.", + level="warning", + ) + return False + + if self._thread is not None and self._thread.is_alive(): + self._log(VERBOSITY_NORMAL, "WebpageGenerator already running in this session.") + return True + + if not self._acquire_lock(): + return False + + self._output_dir.mkdir(parents=True, exist_ok=True) + self._stop_event.clear() + self._thread = threading.Thread( + target=self._run, + name="WebpageGenerator", + daemon=True, + ) + self._thread.start() + self._log( + VERBOSITY_NORMAL, + f"WebpageGenerator started (owner: {self._owner_id}, " + f"output: {self._output_dir}, interval: {self._cycle_interval}s)", + ) + return True + + def stop(self) -> None: + """Stop the generator thread and release the singleton lock.""" + self._stop_event.set() + if self._thread is not None: + self._thread.join(timeout=self._cycle_interval + 5) + self._release_lock() + self._log(VERBOSITY_NORMAL, "WebpageGenerator stopped.") + + @property + def verbosity(self) -> int: + return self._verbosity + + @verbosity.setter + def verbosity(self, val: int) -> None: + self._verbosity = val + self._log(VERBOSITY_NORMAL, f"WebpageGenerator verbosity set to {val}.") + + def status(self) -> None: + """Print a human-readable status summary to the console.""" + lock = self._read_lock() + running = self._thread is not None and self._thread.is_alive() + print( + f"WebpageGenerator\n" + f" This session running : {running}\n" + f" Lock owner : {lock.get('owner_id', 'none')}\n" + f" Lock heartbeat : {lock.get('heartbeat', 'never')}\n" + f" Output dir : {self._output_dir}\n" + f" Cycle interval : {self._cycle_interval}s\n" + f" Verbosity : {self._verbosity}\n" + ) + + # ------------------------------------------------------------------ + # Singleton lock helpers + # ------------------------------------------------------------------ + + def _acquire_lock(self) -> bool: + lock = self._read_lock() + if lock: + age = _heartbeat_age_s(lock.get("heartbeat")) + if age < _LOCK_STALE_AFTER_S: + self._log( + VERBOSITY_NORMAL, + f"WebpageGenerator already owned by " + f"'{lock.get('owner_id')}' " + f"(heartbeat {age:.0f}s ago). Not starting.", + ) + return False + self._log( + VERBOSITY_NORMAL, + f"Stale lock found (owner: '{lock.get('owner_id')}', " + f"{age:.0f}s ago). Taking over.", + ) + + self._write_lock() + return True + + def _write_lock(self) -> None: + self._bec.set_global_var( + _LOCK_VAR_KEY, + { + "owner_id": self._owner_id, + "heartbeat": _now_iso(), + "pid": os.getpid(), + "hostname": socket.gethostname(), + }, + ) + + def _read_lock(self) -> dict: + val = self._bec.get_global_var(_LOCK_VAR_KEY) + return val if isinstance(val, dict) else {} + + def _release_lock(self) -> None: + lock = self._read_lock() + if lock.get("owner_id") == self._owner_id: + self._bec.delete_global_var(_LOCK_VAR_KEY) + + # ------------------------------------------------------------------ + # Main loop + # ------------------------------------------------------------------ + + def _run(self) -> None: + while not self._stop_event.is_set(): + cycle_start = _epoch() + try: + self._cycle() + except Exception as exc: + self._log( + VERBOSITY_NORMAL, + f"WebpageGenerator cycle error: {exc}", + level="warning", + ) + # Refresh the singleton heartbeat + try: + self._write_lock() + except Exception: + pass + # Sleep for the remainder of the interval + elapsed = _epoch() - cycle_start + sleep_time = max(0.0, self._cycle_interval - elapsed) + self._stop_event.wait(sleep_time) + + def _cycle(self) -> None: + """One generator cycle: read state -> derive status -> write outputs.""" + + # --- Read progress from global var (readable from any session) ------- + progress = self._bec.get_global_var("tomo_progress") or {} + + # --- Read queue status ----------------------------------------------- + # NOTE: queue status is always 'RUNNING' while BEC is alive. + # An actual scan is executing only when info is non-empty AND + # active_request_block is set on the first entry. + try: + queue_info = self._bec.queue.queue_storage.current_scan_queue + primary = queue_info.get("primary") + queue_status = primary.status if primary is not None else "unknown" + queue_has_active_scan = ( + primary is not None + and len(primary.info) > 0 + and primary.info[0].active_request_block is not None + ) + except Exception: + queue_status = "unknown" + queue_has_active_scan = False + + # --- Track idle onset ------------------------------------------------ + # Use both the tomo heartbeat and the queue active-scan flag. + # This handles the brief COMPLETED gap between individual scans + # while a tomo is still running. + hb_age = _heartbeat_age_s(progress.get("heartbeat")) + tomo_active = hb_age < _TOMO_HEARTBEAT_STALE_S + + if tomo_active or queue_has_active_scan: + self._idle_since = None + elif self._idle_since is None: + self._idle_since = _epoch() + + # --- Derive experiment status ---------------------------------------- + exp_status = _derive_status(progress, queue_has_active_scan, self._idle_since) + + # --- Build payload --------------------------------------------------- + idle_for_s = None if self._idle_since is None else (_epoch() - self._idle_since) + + payload = { + "generated_at": _now_iso(), + "generated_at_epoch": _epoch(), + "experiment_status": exp_status, + "queue_status": queue_status, + "queue_has_active_scan": queue_has_active_scan, + "idle_for_s": idle_for_s, + "idle_for_human": _format_duration(idle_for_s), + "progress": { + "tomo_type": progress.get("tomo_type", "N/A"), + "projection": progress.get("projection", 0), + "total_projections": progress.get("total_projections", 0), + "subtomo": progress.get("subtomo", 0), + "subtomo_projection": progress.get("subtomo_projection", 0), + "subtomo_total_projections": progress.get("subtomo_total_projections", 1), + "angle": progress.get("angle", 0), + "tomo_start_time": progress.get("tomo_start_time"), + "estimated_remaining_s": progress.get("estimated_remaining_time"), + "estimated_remaining_human": _format_duration( + progress.get("estimated_remaining_time") + ), + "heartbeat": progress.get("heartbeat"), + "heartbeat_age_s": round(hb_age, 1) if hb_age != float("inf") else None, + }, + "generator": { + "owner_id": self._owner_id, + "cycle_interval_s": self._cycle_interval, + }, + } + + # --- Write outputs --------------------------------------------------- + json_path = self._output_dir / "status.json" + json_path.write_text(json.dumps(payload, indent=2, default=str)) + + html_path = self._output_dir / "status.html" + html_path.write_text(_render_html()) + + # --- Console feedback ------------------------------------------------ + self._log( + VERBOSITY_VERBOSE, + f"[{_now_iso()}] status={exp_status} active_scan={queue_has_active_scan} " + f"proj={payload['progress']['projection']}/" + f"{payload['progress']['total_projections']} " + f"hb_age={payload['progress']['heartbeat_age_s']}s " + f"idle={_format_duration(idle_for_s)}", + ) + self._log( + VERBOSITY_DEBUG, + f" full payload:\n{json.dumps(payload, indent=4, default=str)}", + ) + + # ------------------------------------------------------------------ + # Logging helper + # ------------------------------------------------------------------ + + def _log(self, min_verbosity: int, msg: str, level: str = "info") -> None: + if self._verbosity < min_verbosity: + return + if level == "warning": + logger.warning(msg) + elif level == "error": + logger.error(msg) + else: + print(msg) + + +# --------------------------------------------------------------------------- +# HTML template (static shell - the page fetches status.json on load/refresh) +# --------------------------------------------------------------------------- + +def _render_html() -> str: + """Return the full HTML for the status page.""" + return r""" + +
+ + +Click to enable the page. Audio warnings require a user interaction to activate.
+ +