added authentication for user to web generator
CI for csaxs_bec / test (push) Successful in 2m0s
CI for csaxs_bec / test (pull_request) Successful in 1m59s

This commit is contained in:
x12sa
2026-04-15 14:08:12 +02:00
parent a93b737be1
commit b6e372ea62
2 changed files with 156 additions and 6 deletions
@@ -1329,6 +1329,10 @@ class Flomni(
self.align = XrayEyeAlign(self.client, self)
self.set_client(client)
def set_web_password(self, password: str) -> None:
"""Set the web password for the current BEC account."""
self._webpage_gen.set_web_password(password)
def start_x_ray_eye_alignment(self, keep_shutter_open=False):
if self.OMNYTools.yesno(
@@ -43,11 +43,23 @@ Integration (inside Flomni.__init__, after self._progress_proxy.reset()):
Interactive helpers (optional, in the iPython session):
-------------------------------------------------------
flomni._webpage_gen.status() # print current status + local URL
flomni._webpage_gen.verbosity = 2 # VERBOSE: one-line summary per cycle
flomni._webpage_gen.verbosity = 3 # DEBUG: full JSON per cycle
flomni._webpage_gen.stop() # release lock, stop local server
flomni._webpage_gen.start() # restart after stop()
flomni._webpage_gen.status() # print current status + local URL
flomni._webpage_gen.verbosity = 2 # VERBOSE: one-line summary per cycle
flomni._webpage_gen.verbosity = 3 # DEBUG: full JSON per cycle
flomni._webpage_gen.stop() # release lock, stop local server
flomni._webpage_gen.start() # restart after stop()
flomni.webpage_gen.set_web_password("pw") # set web password for current e-account
Session authentication:
-----------------------
Two htpasswd files are used on the remote server:
users.htpasswd -- static, managed manually with htpasswd -B
session.htpasswd -- managed by the generator; one entry for the current e-account
At _launch(), the generator queries session_query.php to find which account was
previously active. If the account has changed the session.htpasswd is cleared
immediately (old user loses access) and uploaded. The new user calls
set_web_password() to set their own password and gain access.
"""
import datetime
@@ -268,6 +280,10 @@ class HttpUploader:
if files:
self._dispatch(self._upload_files, files, False)
def upload_file_async(self, path: Path) -> None:
"""Upload a single specific file, bypassing suffix whitelist and mtime check."""
self._dispatch(self._upload_files, [Path(path)], True)
def cleanup_ptycho_images_async(self) -> None:
"""Ask the server to delete all S*_*.png / S*_*.jpg files (background)."""
self._dispatch(self._do_cleanup)
@@ -419,7 +435,7 @@ class LocalHttpServer:
except OSError as exc:
raise RuntimeError(
f"LocalHttpServer: cannot bind port {self._port}: {exc}"
) from excs
) from exc
self._thread = threading.Thread(
target=self._server.serve_forever,
name="LocalHttpServer",
@@ -473,6 +489,14 @@ class WebpageGeneratorBase:
self._local_port = local_port
self._local_server = None # created fresh each _launch()
# Derive companion URLs from upload_url
if upload_url is not None:
self._session_query_url = upload_url.replace("upload.php", "session_query.php")
self._set_password_url = upload_url.replace("upload.php", "set_password.php")
else:
self._session_query_url = None
self._set_password_url = None
self._thread = None
self._stop_event = threading.Event()
self._last_active_time = None # epoch of last tomo/queue activity
@@ -521,6 +545,11 @@ class WebpageGeneratorBase:
self._copy_logo()
(self._output_dir / "status.html").write_text(_render_html(_PHONE_NUMBERS))
# Check whether the active BEC account matches the session user on the
# remote server. If not, clear session.htpasswd so the old user loses
# web access immediately. The new user then calls set_web_password().
self._check_session_user()
# Start local HTTP server (always on; a fresh instance per _launch).
if self._local_server is not None and self._local_server.is_alive():
self._local_server.stop()
@@ -575,6 +604,11 @@ class WebpageGeneratorBase:
lock = self._read_lock()
running = self._thread is not None and self._thread.is_alive()
local = self._local_server.url if (self._local_server and self._local_server.is_alive()) else "stopped"
remote_user = self._get_remote_session_user() if self._session_query_url else "n/a"
try:
active_account = self._bec.active_account
except Exception:
active_account = "unknown"
print(
f"WebpageGenerator\n"
f" This session running : {running}\n"
@@ -584,6 +618,9 @@ class WebpageGeneratorBase:
f" Local URL : {local}\n"
f" Cycle interval : {self._cycle_interval}s\n"
f" Upload URL : {self._uploader._url if self._uploader else 'disabled'}\n"
f" Session query URL : {self._session_query_url or 'disabled'}\n"
f" BEC active account : {active_account}\n"
f" Server session user : {remote_user}\n"
f" Verbosity : {self._verbosity}\n"
)
@@ -615,6 +652,115 @@ class WebpageGeneratorBase:
self._log(VERBOSITY_NORMAL,
f"Failed to copy logo: {exc}", level="warning")
# ------------------------------------------------------------------
# Session authentication
# ------------------------------------------------------------------
def _get_remote_session_user(self) -> "str | None":
"""GET session_query.php and return the username in session.htpasswd, or None."""
if self._session_query_url is None:
return None
try:
import requests as _requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
r = _requests.get(self._session_query_url, timeout=10, verify=False)
if r.status_code == 200:
return r.json().get("user") # str or None
self._log(VERBOSITY_VERBOSE,
f"session_query: HTTP {r.status_code}", level="warning")
except Exception as exc:
self._log(VERBOSITY_VERBOSE,
f"session_query failed: {exc}", level="warning")
return None
def _check_session_user(self) -> None:
"""Compare BEC active account with the server session user.
- Match: silent, nothing to do.
- No remote user: current user has no web auth yet — print reminder.
- Mismatch: clear session.htpasswd immediately (old user loses access),
then print reminder that current user has no web auth yet.
"""
if self._session_query_url is None:
return # no remote server configured
try:
current_account = self._bec.active_account # e.g. 'p23092'
except Exception as exc:
self._log(VERBOSITY_VERBOSE,
f"session check: cannot read active_account: {exc}", level="warning")
return
remote_user = self._get_remote_session_user()
if remote_user == current_account:
return # all good, stay silent
if remote_user is not None:
# Different user still has web access — remove it immediately.
print(f"WebpageGenerator: account changed "
f"'{remote_user}' -> '{current_account}' "
f"— removing web access for '{remote_user}'")
self._clear_session_htpasswd()
# Either no previous user or a different user — either way the
# current account has no web authentication yet.
_BOLD_RED = "\033[1;31m"
_RESET = "\033[0m"
print(f"{_BOLD_RED}***\nWebpageGenerator: '{current_account}' has no web authentication yet.{_RESET}")
print(f"{_BOLD_RED} To set a password run: flomni.set_web_password(\"your_password\")\n***{_RESET}")
def _clear_session_htpasswd(self) -> None:
"""Write an empty session.htpasswd locally and upload it."""
session_path = self._output_dir / "session.htpasswd"
session_path.write_text("")
if self._uploader is not None:
self._uploader.upload_file_async(session_path)
def set_web_password(self, password: str) -> None:
"""Set the web password for the current BEC account.
Sends the plaintext password to set_password.php on the server
(IP-restricted, HTTPS). PHP generates the bcrypt hash and writes
session.htpasswd directly — no Python bcrypt package needed.
The username is taken from bec.active_account (e.g. 'p23092').
Example::
flomni.webpage_gen.set_web_password("my_secret_password")
"""
if self._set_password_url is None:
print("set_web_password: no upload URL configured — cannot reach server")
return
try:
account = self._bec.active_account
except Exception as exc:
print(f"set_web_password: cannot determine active account: {exc}")
return
try:
import requests as _requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
r = _requests.post(
self._set_password_url,
data={"username": account, "password": password},
timeout=15,
verify=False,
)
if r.status_code == 200:
resp = r.json()
if resp.get("ok"):
print(f"set_web_password: password set for '{account}' on server")
else:
print(f"set_web_password: server error: {resp.get('error', '?')}")
else:
print(f"set_web_password: HTTP {r.status_code}: {r.text[:120]}")
except Exception as exc:
print(f"set_web_password: request failed: {exc}")
# ------------------------------------------------------------------
# Singleton lock
# ------------------------------------------------------------------