Compare commits

..

10 Commits

Author SHA1 Message Date
x12sa
f786e34a0e fix thread stop
All checks were successful
CI for csaxs_bec / test (pull_request) Successful in 1m56s
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m58s
2026-03-30 14:41:44 +02:00
x12sa
cceedc947a local server added
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 2m0s
2026-03-30 13:07:46 +02:00
x12sa
80de9724d4 removed linear progress bar 2026-03-30 13:07:46 +02:00
x12sa
2ac02e0623 http error message cooldown to 10 mins 2026-03-30 13:07:46 +02:00
x12sa
3c2a0aa484 error message cooldown 2026-03-30 13:07:46 +02:00
x12sa
27f4eca4ae change url and fix in https certificate ignore 2026-03-30 13:07:46 +02:00
x12sa
f2771bd4b6 https without certificate possible 2026-03-30 13:07:46 +02:00
x12sa
546ebf8a58 mod audio gen 2026-03-30 13:07:46 +02:00
d3f1d31bb8 fix(file_writer): Fix file_writer format method.
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 2s
CI for csaxs_bec / test (push) Successful in 2m25s
2026-03-30 12:34:07 +02:00
6d404cad12 fix(omny/shutter): MonitorSignal wrapper with auto_monitor
All checks were successful
Read the Docs Deploy Trigger / trigger-rtd-webhook (push) Successful in 1s
CI for csaxs_bec / test (push) Successful in 1m57s
2026-03-27 22:41:57 +01:00
16 changed files with 725 additions and 1267 deletions

View File

@@ -70,7 +70,7 @@ DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
"rio_device": "galilrioesxbox", "rio_device": "galilrioesxbox",
"description": "Beam Position Monitor 4 current amplifier", "description": "Beam Position Monitor 4 current amplifier",
"channels": { "channels": {
"gain_lsb": rio_optics.analog_in.ch0, # Pin 10 -> Galil ch0 "gain_lsb": 0, # Pin 10 -> Galil ch0
"gain_mid": 1, # Pin 11 -> Galil ch1 "gain_mid": 1, # Pin 11 -> Galil ch1
"gain_msb": 2, # Pin 12 -> Galil ch2 "gain_msb": 2, # Pin 12 -> Galil ch2
"coupling": 3, # Pin 13 -> Galil ch3 "coupling": 3, # Pin 13 -> Galil ch3

View File

@@ -1317,7 +1317,9 @@ class Flomni(
self._webpage_gen = FlomniWebpageGenerator( self._webpage_gen = FlomniWebpageGenerator(
bec_client=client, bec_client=client,
output_dir="~/data/raw/webpage/", output_dir="~/data/raw/webpage/",
upload_url="http://s1090968537.online.de/upload.php", # optional #upload_url="http://s1090968537.online.de/upload.php", # optional
upload_url="https://v1p0zyg2w9n2k9c1.myfritz.net/upload.php",
local_port=8080
) )
self._webpage_gen.start() self._webpage_gen.start()

View File

@@ -5,10 +5,14 @@ Background thread that reads tomo progress from the BEC global variable store
and writes status.json (every cycle) + status.html (once at startup) to a and writes status.json (every cycle) + status.html (once at startup) to a
staging directory. An optional HttpUploader sends those files to a web host staging directory. An optional HttpUploader sends those files to a web host
after every cycle, running in a separate daemon thread so uploads never block after every cycle, running in a separate daemon thread so uploads never block
the generator cycle. the generator cycle. A built-in LocalHttpServer always serves the output
directory locally (default port 8080) so the page can be accessed on the
lab network without any extra setup.
Architecture Architecture
------------ ------------
LocalHttpServer -- built-in HTTP server; serves output_dir on port 8080.
Always started at _launch(); URL printed to console.
HttpUploader -- non-blocking HTTP uploader (fire-and-forget thread). HttpUploader -- non-blocking HTTP uploader (fire-and-forget thread).
Tracks file mtimes; only uploads changed files. Tracks file mtimes; only uploads changed files.
Sends a cleanup request to the server when the Sends a cleanup request to the server when the
@@ -31,19 +35,24 @@ Integration (inside Flomni.__init__, after self._progress_proxy.reset()):
bec_client=client, bec_client=client,
output_dir="~/data/raw/webpage/", output_dir="~/data/raw/webpage/",
upload_url="http://omny.online/upload.php", # optional upload_url="http://omny.online/upload.php", # optional
local_port=8080, # optional, default 8080
) )
self._webpage_gen.start() self._webpage_gen.start()
# On start(), the console prints:
# ➜ Status page: http://hostname:8080/status.html
Interactive helpers (optional, in the iPython session): Interactive helpers (optional, in the iPython session):
------------------------------------------------------- -------------------------------------------------------
flomni._webpage_gen.status() # print current status flomni._webpage_gen.status() # print current status + local URL
flomni._webpage_gen.verbosity = 2 # VERBOSE: one-line summary per cycle flomni._webpage_gen.verbosity = 2 # VERBOSE: one-line summary per cycle
flomni._webpage_gen.verbosity = 3 # DEBUG: full JSON per cycle flomni._webpage_gen.verbosity = 3 # DEBUG: full JSON per cycle
flomni._webpage_gen.stop() # release lock flomni._webpage_gen.stop() # release lock, stop local server
flomni._webpage_gen.start() # restart after stop() flomni._webpage_gen.start() # restart after stop()
""" """
import datetime import datetime
import functools
import http.server
import json import json
import os import os
import shutil import shutil
@@ -235,6 +244,16 @@ class HttpUploader:
self._uploaded: dict[str, float] = {} # abs path -> mtime at last upload self._uploaded: dict[str, float] = {} # abs path -> mtime at last upload
self._lock = threading.Lock() self._lock = threading.Lock()
self._busy = False # True while an upload thread is running self._busy = False # True while an upload thread is running
self._warn_at: dict[str, float] = {} # key -> epoch of last warning
_WARN_COOLDOWN_S = 600 # only repeat the same warning once per minute
def _warn(self, key: str, msg: str) -> None:
"""Log a warning at most once per _WARN_COOLDOWN_S for a given key."""
now = _epoch()
if now - self._warn_at.get(key, 0) >= self._WARN_COOLDOWN_S:
self._warn_at[key] = now
logger.warning(msg)
# ── Public API ────────────────────────────────────────────────────────── # ── Public API ──────────────────────────────────────────────────────────
@@ -294,8 +313,10 @@ class HttpUploader:
def _upload_files(self, files: list, force: bool = False) -> None: def _upload_files(self, files: list, force: bool = False) -> None:
try: try:
import requests as _requests import requests as _requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except ImportError: except ImportError:
logger.warning("HttpUploader: 'requests' library not installed") self._warn("no_requests", "HttpUploader: 'requests' library not installed")
return return
for path in files: for path in files:
@@ -312,21 +333,26 @@ class HttpUploader:
self._url, self._url,
files={"file": (path.name, f)}, files={"file": (path.name, f)},
timeout=self._timeout, timeout=self._timeout,
verify=False, # accept self-signed / untrusted certs
) )
if r.status_code == 200: if r.status_code == 200:
self._uploaded[str(path)] = mtime self._uploaded[str(path)] = mtime
self._warn_at.pop(f"upload_{path.name}", None) # clear on success
logger.debug(f"HttpUploader: OK {path.name}") logger.debug(f"HttpUploader: OK {path.name}")
else: else:
logger.warning( self._warn(
f"upload_{path.name}",
f"HttpUploader: {path.name} -> HTTP {r.status_code}: " f"HttpUploader: {path.name} -> HTTP {r.status_code}: "
f"{r.text[:120]}" f"{r.text[:120]}"
) )
except Exception as exc: except Exception as exc:
logger.warning(f"HttpUploader: {path.name} failed: {exc}") self._warn(f"upload_{path.name}", f"HttpUploader: {path.name} failed: {exc}")
def _do_cleanup(self) -> None: def _do_cleanup(self) -> None:
try: try:
import requests as _requests import requests as _requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except ImportError: except ImportError:
return return
try: try:
@@ -334,15 +360,86 @@ class HttpUploader:
self._url, self._url,
data={"action": "cleanup"}, data={"action": "cleanup"},
timeout=self._timeout, timeout=self._timeout,
verify=False, # accept self-signed / untrusted certs
) )
logger.info(f"HttpUploader cleanup: {r.text[:120]}") logger.info(f"HttpUploader cleanup: {r.text[:120]}")
self._warn_at.pop("cleanup", None) # clear on success
# Forget mtime records for ptycho files so they get re-uploaded # Forget mtime records for ptycho files so they get re-uploaded
with self._lock: with self._lock:
to_remove = [k for k in self._uploaded if "/S" in k or "\\S" in k] to_remove = [k for k in self._uploaded if "/S" in k or "\\S" in k]
for k in to_remove: for k in to_remove:
self._uploaded.pop(k, None) self._uploaded.pop(k, None)
except Exception as exc: except Exception as exc:
logger.warning(f"HttpUploader cleanup failed: {exc}") self._warn("cleanup", f"HttpUploader cleanup failed: {exc}")
# ---------------------------------------------------------------------------
# Local HTTP server (serves output_dir over http://hostname:port/)
# ---------------------------------------------------------------------------
class LocalHttpServer:
"""
Serves the generator's output directory over plain HTTP in a daemon thread.
Uses Python's built-in http.server — no extra dependencies.
Request logging is suppressed so the BEC console stays clean.
The server survives stop()/start() cycles: _launch() creates a fresh
instance each time start() is called.
Usage:
srv = LocalHttpServer(output_dir, port=8080)
srv.start()
print(srv.url) # http://hostname:8080/status.html
srv.stop()
"""
def __init__(self, directory: Path, port: int = 8080):
self._directory = Path(directory)
self._port = port
self._server = None
self._thread = None
# ── silence the per-request log lines in the iPython console ──────────
class _QuietHandler(http.server.SimpleHTTPRequestHandler):
def log_message(self, *args):
pass
def handle_error(self, request, client_address):
pass # suppress BrokenPipeError and other per-connection noise
def start(self) -> None:
Handler = functools.partial(
self._QuietHandler,
directory=str(self._directory),
)
try:
self._server = http.server.HTTPServer(("", self._port), Handler)
except OSError as exc:
raise RuntimeError(
f"LocalHttpServer: cannot bind port {self._port}: {exc}"
) from exc
self._thread = threading.Thread(
target=self._server.serve_forever,
name="LocalHttpServer",
daemon=True,
)
self._thread.start()
def stop(self) -> None:
if self._server is not None:
self._server.shutdown() # blocks until serve_forever() returns
self._server = None
def is_alive(self) -> bool:
return self._thread is not None and self._thread.is_alive()
@property
def port(self) -> int:
return self._port
@property
def url(self) -> str:
"""Best-guess URL for printing. Uses the machine's hostname."""
return f"http://{socket.gethostname()}:{self._port}/status.html"
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -363,12 +460,15 @@ class WebpageGeneratorBase:
cycle_interval: float = _CYCLE_INTERVAL_S, cycle_interval: float = _CYCLE_INTERVAL_S,
verbosity: int = VERBOSITY_NORMAL, verbosity: int = VERBOSITY_NORMAL,
upload_url: str = None, upload_url: str = None,
local_port: int = 8080,
): ):
self._bec = bec_client self._bec = bec_client
self._output_dir = Path(output_dir).expanduser().resolve() self._output_dir = Path(output_dir).expanduser().resolve()
self._cycle_interval = cycle_interval self._cycle_interval = cycle_interval
self._verbosity = verbosity self._verbosity = verbosity
self._uploader = HttpUploader(upload_url) if upload_url else None self._uploader = HttpUploader(upload_url) if upload_url else None
self._local_port = local_port
self._local_server = None # created fresh each _launch()
self._thread = None self._thread = None
self._stop_event = threading.Event() self._stop_event = threading.Event()
@@ -418,6 +518,17 @@ class WebpageGeneratorBase:
self._copy_logo() self._copy_logo()
(self._output_dir / "status.html").write_text(_render_html(_PHONE_NUMBERS)) (self._output_dir / "status.html").write_text(_render_html(_PHONE_NUMBERS))
# Start local HTTP server (always on; a fresh instance per _launch).
if self._local_server is not None and self._local_server.is_alive():
self._local_server.stop()
self._local_server = LocalHttpServer(self._output_dir, self._local_port)
try:
self._local_server.start()
local_url_msg = f" local={self._local_server.url}"
except RuntimeError as exc:
local_url_msg = f" local=ERROR({exc})"
self._log(VERBOSITY_NORMAL, str(exc), level="warning")
# Upload static files (html + logo) once at startup # Upload static files (html + logo) once at startup
if self._uploader is not None: if self._uploader is not None:
self._uploader.upload_dir_async(self._output_dir) self._uploader.upload_dir_async(self._output_dir)
@@ -430,13 +541,20 @@ class WebpageGeneratorBase:
self._log(VERBOSITY_NORMAL, self._log(VERBOSITY_NORMAL,
f"WebpageGenerator started owner={self._owner_id} " f"WebpageGenerator started owner={self._owner_id} "
f"output={self._output_dir} interval={self._cycle_interval}s" f"output={self._output_dir} interval={self._cycle_interval}s"
+ (f" upload={self._uploader._url}" if self._uploader else " upload=disabled")) + (f" upload={self._uploader._url}" if self._uploader else " upload=disabled")
+ f"\n ➜ Status page:{local_url_msg}")
def stop(self) -> None: def stop(self) -> None:
"""Stop the generator thread and release the singleton lock.""" """Stop the generator thread, local HTTP server, and release the singleton lock."""
self._stop_event.set() self._stop_event.set()
if self._thread is not None: if self._thread is not None:
self._thread.join(timeout=self._cycle_interval + 5) self._thread.join(timeout=self._cycle_interval + 5)
# Always clear the reference so start() gets a clean slate,
# even if the thread did not exit within the join timeout.
self._thread = None
if self._local_server is not None:
self._local_server.stop()
self._local_server = None
self._release_lock() self._release_lock()
self._log(VERBOSITY_NORMAL, "WebpageGenerator stopped.") self._log(VERBOSITY_NORMAL, "WebpageGenerator stopped.")
@@ -453,12 +571,14 @@ class WebpageGeneratorBase:
"""Print a human-readable status summary to the console.""" """Print a human-readable status summary to the console."""
lock = self._read_lock() lock = self._read_lock()
running = self._thread is not None and self._thread.is_alive() running = self._thread is not None and self._thread.is_alive()
local = self._local_server.url if (self._local_server and self._local_server.is_alive()) else "stopped"
print( print(
f"WebpageGenerator\n" f"WebpageGenerator\n"
f" This session running : {running}\n" f" This session running : {running}\n"
f" Lock owner : {lock.get('owner_id', 'none')}\n" f" Lock owner : {lock.get('owner_id', 'none')}\n"
f" Lock heartbeat : {lock.get('heartbeat', 'never')}\n" f" Lock heartbeat : {lock.get('heartbeat', 'never')}\n"
f" Output dir : {self._output_dir}\n" f" Output dir : {self._output_dir}\n"
f" Local URL : {local}\n"
f" Cycle interval : {self._cycle_interval}s\n" f" Cycle interval : {self._cycle_interval}s\n"
f" Upload URL : {self._uploader._url if self._uploader else 'disabled'}\n" f" Upload URL : {self._uploader._url if self._uploader else 'disabled'}\n"
f" Verbosity : {self._verbosity}\n" f" Verbosity : {self._verbosity}\n"
@@ -1245,21 +1365,6 @@ def _render_html(phone_numbers: list) -> str:
}} }}
.info-item .value {{ font-size: 0.9rem; font-weight: 600; color: var(--text); }} .info-item .value {{ font-size: 0.9rem; font-weight: 600; color: var(--text); }}
.bar-wrap {{ grid-column: 1 / -1; display: flex; flex-direction: column; gap: 0.35rem; }}
.bar-label {{
display: flex; justify-content: space-between;
font-family: var(--mono); font-size: 0.6rem; color: var(--text-dim);
letter-spacing: 0.06em; text-transform: uppercase;
}}
.bar-track {{ height: 5px; background: var(--surface2); border-radius: 99px; overflow: hidden; }}
.bar-fill {{
height: 100%;
background: var(--ring-blend);
background: color-mix(in srgb, var(--status-color) 65%, var(--surface2));
border-radius: 99px;
transition: width 0.8s cubic-bezier(.4,0,.2,1), background 0.6s;
}}
/* ── Recon card ── */ /* ── Recon card ── */
.recon-stats {{ display: flex; gap: 2rem; flex-wrap: wrap; }} .recon-stats {{ display: flex; gap: 2rem; flex-wrap: wrap; }}
.recon-stat {{ display: flex; flex-direction: column; gap: 0.15rem; }} .recon-stat {{ display: flex; flex-direction: column; gap: 0.15rem; }}
@@ -1502,10 +1607,7 @@ def _render_html(phone_numbers: list) -> str:
<div class="info-item"><span class="label">ETA</span><span class="value" id="pi-eta">-</span></div> <div class="info-item"><span class="label">ETA</span><span class="value" id="pi-eta">-</span></div>
<div class="info-item"><span class="label">Started</span><span class="value" id="pi-start">-</span></div> <div class="info-item"><span class="label">Started</span><span class="value" id="pi-start">-</span></div>
</div> </div>
<div class="bar-wrap">
<div class="bar-label"><span>Sub-tomo progress</span><span id="bar-sub-label">-</span></div>
<div class="bar-track"><div class="bar-fill" id="bar-sub-fill" style="width:0%"></div></div>
</div>
</div> </div>
@@ -1667,90 +1769,80 @@ function initDrag() {{
}} }}
initDrag(); initDrag();
// ── Audio ───────────────────────────────────────────────────────────────── // ── Audio ─────────────────────────────────────────────────────────────────
// iOS (all browsers on iPhone use WebKit) requires: // iOS (all browsers on iPhone use WebKit) strict rules:
// 1. AudioContext created inside a user gesture. // 1. AudioContext must be created inside a user gesture handler.
// 2. A real (even silent) BufferSource started synchronously in the gesture. // 2. A real BufferSource must be started SYNCHRONOUSLY in the gesture
// 3. ctx.resume() awaited before scheduling audible nodes. // .then() / microtasks run outside the gesture and are rejected.
// We combine all three: silent unlock buffer + resume promise + .then(beeps). // 3. ctx.resume() is called fire-and-forget; beeps are delayed 80ms by
// setTimeout so the engine has time to start before nodes are scheduled.
//
// unlockAudio() handles all of this and must be called at the TOP of any
// onclick handler that wants audio — before any other logic.
let audioCtx = null, audioEnabled = false; let audioCtx=null, audioEnabled=false;
let audioArmed = false, warningActive = false, warningTimer = null, lastStatus = null; let audioArmed=false, warningActive=false, warningTimer=null, lastStatus=null;
let staleActive = false, staleTimer = null, staleConfirmed = false; let staleActive=false, staleTimer=null, staleConfirmed=false;
function getCtx() {{ function getCtx(){{
if (!audioCtx) audioCtx = new (window.AudioContext || window.webkitAudioContext)(); if(!audioCtx) audioCtx=new(window.AudioContext||window.webkitAudioContext)();
return audioCtx; return audioCtx;
}} }}
function unlockAudio() {{ function unlockAudio(){{
// Must be called synchronously inside a user gesture handler. // Synchronous silent 1-sample buffer — the only reliable iOS unlock.
// Plays a 1-sample silent buffer — the most reliable iOS unlock method. // Must be called synchronously at the start of a user gesture handler.
const ctx = getCtx(); const ctx=getCtx();
const buf = ctx.createBuffer(1, 1, ctx.sampleRate); const buf=ctx.createBuffer(1,1,ctx.sampleRate);
const src = ctx.createBufferSource(); const src=ctx.createBufferSource();
src.buffer = buf; src.buffer=buf; src.connect(ctx.destination); src.start(0);
src.connect(ctx.destination); if(ctx.state==='suspended') ctx.resume(); // fire-and-forget
src.start(0);
// resume() is async; return the promise so callers can chain.
return ctx.state === 'suspended' ? ctx.resume() : Promise.resolve();
}} }}
function beep(freq, dur, vol) {{ function beep(freq,dur,vol){{
try {{ try{{
const ctx = getCtx(); const ctx=getCtx(),o=ctx.createOscillator(),g=ctx.createGain();
const o = ctx.createOscillator();
const g = ctx.createGain();
o.connect(g); g.connect(ctx.destination); o.connect(g); g.connect(ctx.destination);
o.type = 'sine'; o.type='sine'; o.frequency.setValueAtTime(freq,ctx.currentTime);
o.frequency.setValueAtTime(freq, ctx.currentTime); g.gain.setValueAtTime(vol,ctx.currentTime);
g.gain.setValueAtTime(vol, ctx.currentTime); g.gain.exponentialRampToValueAtTime(0.001,ctx.currentTime+dur);
g.gain.exponentialRampToValueAtTime(0.001, ctx.currentTime + dur); o.start(); o.stop(ctx.currentTime+dur);
o.start(); o.stop(ctx.currentTime + dur); }}catch(e){{console.warn('Audio:',e);}}
}} catch(e) {{ console.warn('Audio beep:', e); }}
}} }}
function warningChime() {{ function warningChime(){{
beep(660, 0.3, 0.4); beep(660,0.3,0.4); setTimeout(()=>beep(440,0.5,0.4),350);
setTimeout(() => beep(440, 0.5, 0.4), 350); }}
function staleChime(){{
beep(1200,0.12,0.35);
setTimeout(()=>beep(1200,0.12,0.35),180);
setTimeout(()=>beep(1200,0.25,0.35),360);
}} }}
function staleChime() {{ function testSound(){{
beep(1200, 0.12, 0.35); // Gesture handler — unlock first, then delay beeps 80ms for resume().
setTimeout(() => beep(1200, 0.12, 0.35), 180); unlockAudio();
setTimeout(() => beep(1200, 0.25, 0.35), 360); setTimeout(()=>beep(880, 0.15,0.4), 80);
setTimeout(()=>beep(1100,0.15,0.4),260);
setTimeout(()=>beep(880, 0.3, 0.4),440);
}} }}
function testSound() {{ function toggleAudio(){{
// Called directly from onclick — gesture is active here. // Gesture handler — unlock first (synchronous), then do logic.
unlockAudio().then(() => {{ unlockAudio();
beep(880, 0.15, 0.4); audioEnabled=!audioEnabled;
setTimeout(() => beep(1100, 0.15, 0.4), 180); localStorage.setItem('audioEnabled',audioEnabled);
setTimeout(() => beep(880, 0.3, 0.4), 360); if(!audioEnabled){{
}}); stopWarning(); audioArmed=false; warningActive=false;
document.getElementById('btn-confirm').style.display='none';
stopStaleWarning(); staleActive=false; staleConfirmed=false;
document.getElementById('btn-confirm-stale').style.display='none';
}} else {{
if(lastStatus==='scanning' && !audioArmed) audioArmed=true;
}}
updateAudioUI();
}} }}
function toggleAudio() {{
// Called directly from onclick — gesture is active here.
unlockAudio().then(() => {{
audioEnabled = !audioEnabled;
localStorage.setItem('audioEnabled', audioEnabled);
if (!audioEnabled) {{
stopWarning();
audioArmed = false; warningActive = false;
document.getElementById('btn-confirm').style.display = 'none';
stopStaleWarning();
staleActive = false; staleConfirmed = false;
document.getElementById('btn-confirm-stale').style.display = 'none';
}} else {{
if (lastStatus === 'scanning' && !audioArmed) audioArmed = true;
}}
updateAudioUI();
}});
}}
function confirmWarning(){{ function confirmWarning(){{
stopWarning(); stopWarning();
warningActive=false; warningActive=false;
@@ -1759,9 +1851,10 @@ function confirmWarning(){{
}} }}
function startWarning(){{ function startWarning(){{
// Not a gesture handler — context already unlocked by Enable button click.
if(warningActive) return; if(warningActive) return;
warningActive=true; warningActive=true;
if(audioEnabled) warningChime(); // returns promise; chime plays after unlock if(audioEnabled) warningChime();
warningTimer=setInterval(()=>{{ if(audioEnabled) warningChime(); }},30000); warningTimer=setInterval(()=>{{ if(audioEnabled) warningChime(); }},30000);
document.getElementById('btn-confirm').style.display='inline-block'; document.getElementById('btn-confirm').style.display='inline-block';
updateAudioUI(); updateAudioUI();
@@ -1780,9 +1873,10 @@ function confirmStale(){{
}} }}
function startStaleWarning(){{ function startStaleWarning(){{
// Not a gesture handler — context already unlocked by Enable button click.
if(staleActive || staleConfirmed) return; if(staleActive || staleConfirmed) return;
staleActive=true; staleActive=true;
if(audioEnabled) staleChime(); // returns promise; chime plays after unlock if(audioEnabled) staleChime();
staleTimer=setInterval(()=>{{ if(audioEnabled) staleChime(); }},30000); staleTimer=setInterval(()=>{{ if(audioEnabled) staleChime(); }},30000);
document.getElementById('btn-confirm-stale').style.display='inline-block'; document.getElementById('btn-confirm-stale').style.display='inline-block';
updateAudioUI(); updateAudioUI();
@@ -1964,8 +2058,7 @@ function render(d){{
document.getElementById('pi-type').textContent=p.tomo_type||'-'; document.getElementById('pi-type').textContent=p.tomo_type||'-';
document.getElementById('pi-eta').textContent=p.estimated_remaining_human||'-'; document.getElementById('pi-eta').textContent=p.estimated_remaining_human||'-';
document.getElementById('pi-start').textContent=fmtTime(p.tomo_start_time); document.getElementById('pi-start').textContent=fmtTime(p.tomo_start_time);
document.getElementById('bar-sub-label').textContent=(p.subtomo_projection||0)+' / '+(p.subtomo_total_projections||0);
document.getElementById('bar-sub-fill').style.width=(sPct*100).toFixed(1)+'%';
if(d.recon){{ if(d.recon){{
document.getElementById('recon-waiting').textContent=d.recon.waiting; document.getElementById('recon-waiting').textContent=d.recon.waiting;
const fv=document.getElementById('recon-failed'); const fv=document.getElementById('recon-failed');
@@ -1990,7 +2083,7 @@ function render(d){{
async function poll(){{ async function poll(){{
try{{ try{{
const r=await fetch(STATUS_JSON+'?t='+Date.now()); const r=await fetch(STATUS_JSON, {{cache:'no-store'}});
if(!r.ok) throw new Error('HTTP '+r.status); if(!r.ok) throw new Error('HTTP '+r.status);
render(await r.json()); render(await r.json());
}}catch(e){{ }}catch(e){{
@@ -2011,4 +2104,4 @@ poll();
</script> </script>
</body> </body>
</html> </html>
""" """

View File

@@ -544,66 +544,6 @@ sl5trxt:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel # bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 5 bl_smar_stage: 5
sl5ch:
description: ESbox1 slit 5 center horizontal
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
deviceConfig:
left_slit: sl5trxi
right_slit: sl5trxo
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxi
- sl5trxo
sl5wh:
description: ESbox1 slit 5 width horizontal
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
deviceConfig:
left_slit: sl5trxi
right_slit: sl5trxo
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxi
- sl5trxo
sl5cv:
description: ESbox1 slit 5 center vertical
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
deviceConfig:
left_slit: sl5trxb
right_slit: sl5trxt
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxb
- sl5trxt
sl5wv:
description: ESbox1 slit 5 width vertical
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
deviceConfig:
left_slit: sl5trxb
right_slit: sl5trxt
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxb
- sl5trxt
xbimtrx: xbimtrx:
description: ESbox2 beam intensity monitor x movement description: ESbox2 beam intensity monitor x movement
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
@@ -882,37 +822,4 @@ dettrx:
onFailure: retry onFailure: retry
enabled: true enabled: true
readoutPriority: baseline readoutPriority: baseline
softwareTrigger: false softwareTrigger: false
####################
### Beamstop control for flight tube
####################
beamstop_control:
description: Gain control for beamstop flightube
deviceClass: csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl
deviceConfig:
gain_lsb: galilrioesft.digital_out.ch0 # Pin 10 -> Galil ch0
gain_mid: galilrioesft.digital_out.ch1 # Pin 11 -> Galil ch1
gain_msb: galilrioesft.digital_out.ch2 # Pin 12 -> Galil ch2
coupling: galilrioesft.digital_out.ch3 # Pin 13 -> Galil ch3
speed_mode: galilrioesft.digital_out.ch4 # Pin 14 -> Galil ch4
enabled: true
readoutPriority: baseline
onFailure: retry
needs:
- galilrioesft
galilrioesft:
description: Galil RIO for remote gain switching and slow reading FlightTube
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20

View File

@@ -199,25 +199,6 @@ xbpm1c4:
readOnly: true readOnly: true
softwareTrigger: false softwareTrigger: false
bpm1:
description: 'XBPM1 (frontend)'
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
left_top: xbpm1c1
right_top: xbpm1c2
right_bot: xbpm1c3
left_bot: xbpm1c4
onFailure: raise
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
needs:
- xbpm1c1
- xbpm1c2
- xbpm1c3
- xbpm1c4
############################################ ############################################
######### End of xbpm sub devices ########## ######### End of xbpm sub devices ##########
############################################ ############################################

View File

@@ -68,22 +68,18 @@ ccmx:
- cSAXS - cSAXS
- optics - optics
# TO BE REVIEWED, REMOVE VELOCITY WITH NEW CLASS! # ccm_energy:
ccm_energy: # readoutPriority: baseline
description: 'test' # deviceClass: ophyd_devices.devices.simple_positioner.PSIPositionerBase
deviceClass: ophyd_devices.devices.simple_positioner.PSISimplePositioner # prefix: "X12SA-OP-CCM1:"
deviceConfig: # override_suffixes:
prefix: 'X12SA-OP-CCM1:' # user_readback: "ENERGY-GET"
override_suffixes: # user_setpoint: "ENERGY-SET"
user_readback: "ENERGY-GET" # velocity: "ROTY:VELO"
user_setpoint: "ENERGY-SET" # deviceTags:
velocity: "ROTY.VELO" # - user motors
motor_done_move: "ROTY.DMOV" # enabled: true
onFailure: buffer # readOnly: false
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false

View File

@@ -1,24 +0,0 @@
galilrioesxbox:
description: Galil RIO for remote gain switching and slow reading ES XBox
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: raise
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
bpm1:
readoutPriority: baseline
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
blade_t: galilrioesxbox.analog_in.ch0
blade_r: galilrioesxbox.analog_in.ch1
blade_b: galilrioesxbox.analog_in.ch2
blade_l: galilrioesxbox.analog_in.ch3
enabled: true
readOnly: false
softwareTrigger: true
needs:
- galilrioesxbox

View File

@@ -13,6 +13,14 @@ from ophyd_devices import PSIDeviceBase
logger = bec_logger.logger logger = bec_logger.logger
class MonitorSignal(Signal):
"""A simple wrapper around ophyd Signal that automatically monitors the signal for changes."""
def __init__(self, *, name, auto_monitor=False, **kwargs):
super().__init__(name=name, **kwargs)
self.auto_monitor = auto_monitor
class OMNYFastShutter(PSIDeviceBase, Device): class OMNYFastShutter(PSIDeviceBase, Device):
""" """
Fast Shutter control for OMNY setup. If started with at the beamline, it will expose Fast Shutter control for OMNY setup. If started with at the beamline, it will expose
@@ -26,7 +34,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
SUB_VALUE = "value" SUB_VALUE = "value"
_default_sub = SUB_VALUE _default_sub = SUB_VALUE
shutter = Cpt(Signal, name="shutter") shutter = Cpt(MonitorSignal, name="shutter", auto_monitor=True)
# ----------------------------------------------------- # -----------------------------------------------------
# User-facing shutter control functions # User-facing shutter control functions
@@ -48,6 +56,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshopen(self): def fshopen(self):
"""Open the fast shutter.""" """Open the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config(): if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(1)
return self.device_manager.devices["fsh"].fshopen() return self.device_manager.devices["fsh"].fshopen()
else: else:
self.shutter.put(1) self.shutter.put(1)
@@ -55,6 +64,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshclose(self): def fshclose(self):
"""Close the fast shutter.""" """Close the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config(): if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(0)
return self.device_manager.devices["fsh"].fshclose() return self.device_manager.devices["fsh"].fshclose()
else: else:
self.shutter.put(0) self.shutter.put(0)

View File

@@ -1,172 +0,0 @@
"""Module for a BPM pseudo device that computes the position and intensity from the blade signals."""
from ophyd import Component as Cpt
from ophyd import Kind, Signal
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
class BPM(PSIPseudoDeviceBase):
"""BPM positioner pseudo device."""
# Blade signals, a,b,c,d
left_top = Cpt(
BECProcessedSignal,
name="left_top",
model_config=None,
kind=Kind.config,
doc="BPM left_top blade",
)
right_top = Cpt(
BECProcessedSignal,
name="right_top",
model_config=None,
kind=Kind.config,
doc="BPM right_top blade",
)
right_bot = Cpt(
BECProcessedSignal,
name="right_bot",
model_config=None,
kind=Kind.config,
doc="BPM right_bottom blade",
)
left_bot = Cpt(
BECProcessedSignal,
name="left_bot",
model_config=None,
kind=Kind.config,
doc="BPM left_bot blade",
)
# Virtual signals
pos_x = Cpt(
BECProcessedSignal,
name="pos_x",
model_config=None,
kind=Kind.config,
doc="BPM X position, -1 fully left, 1 fully right",
)
pos_y = Cpt(
BECProcessedSignal,
name="pos_y",
model_config=None,
kind=Kind.config,
doc="BPM Y position, -1 fully bottom, 1 fully top",
)
diagonal = Cpt(
BECProcessedSignal,
name="diagonal",
model_config=None,
kind=Kind.config,
doc="BPM diagonal, -1 fully diagonal left_top-right_bot, 1 fully diagonal right_top-left_bot",
)
intensity = Cpt(
BECProcessedSignal,
name="intensity",
model_config=None,
kind=Kind.config,
doc="BPM intensity",
)
def __init__(
self,
name,
left_top: str,
right_top: str,
right_bot: str,
left_bot: str,
device_manager=None,
scan_info=None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# Get all blade signal objects from utility method
signal_t = self.left_top.get_device_object_from_bec(
object_name=left_top, signal_name=self.name, device_manager=device_manager
)
signal_r = self.right_top.get_device_object_from_bec(
object_name=right_top, signal_name=self.name, device_manager=device_manager
)
signal_b = self.right_bot.get_device_object_from_bec(
object_name=right_bot, signal_name=self.name, device_manager=device_manager
)
signal_l = self.left_bot.get_device_object_from_bec(
object_name=left_bot, signal_name=self.name, device_manager=device_manager
)
# Set compute methods for blade signals and virtual signals
self.left_top.set_compute_method(self._compute_blade_signal, signal=signal_t)
self.right_top.set_compute_method(self._compute_blade_signal, signal=signal_r)
self.right_bot.set_compute_method(self._compute_blade_signal, signal=signal_b)
self.left_bot.set_compute_method(self._compute_blade_signal, signal=signal_l)
self.intensity.set_compute_method(
self._compute_intensity,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
left_bot=self.left_bot,
)
self.pos_x.set_compute_method(
self._compute_pos_x,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
self.pos_y.set_compute_method(
self._compute_pos_y,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
self.diagonal.set_compute_method(
self._compute_diagonal,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
def _compute_blade_signal(self, signal: Signal) -> float:
return signal.get()
def _compute_intensity(
self, left_top: Signal, right_top: Signal, right_bot: Signal, left_bot: Signal
) -> float:
intensity = left_top.get() + right_top.get() + right_bot.get() + left_bot.get()
return intensity
def _compute_pos_x(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
"""X position from -1 to 1, where -1 means beam fully on the left side, 1 means beam fully on the right side."""
sum_left = left_bot.get() + left_top.get()
sum_right = right_top.get() + right_bot.get()
sum_total = sum_left + sum_right
if sum_total == 0:
return 0.0
return (sum_right - sum_left) / sum_total
def _compute_pos_y(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
"""Y position from -1 to 1, where -1 means beam fully on the bottom side, 1 means beam fully on the top side."""
sum_top = left_top.get() + right_top.get()
sum_bot = right_bot.get() + left_bot.get()
sum_total = sum_top + sum_bot
if sum_total == 0:
return 0.0
return (sum_top - sum_bot) / sum_total
def _compute_diagonal(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
sum_diag1 = left_bot.get() + right_top.get()
sum_diag2 = left_top.get() + right_bot.get()
sum_total = sum_diag1 + sum_diag2
if sum_total == 0:
return 0.0
return (sum_diag1 - sum_diag2) / sum_total

View File

@@ -1,189 +0,0 @@
"""
Module for controlling the BPM amplifier settings, such as gain and coupling.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from ophyd import Component as Cpt
from ophyd import Kind
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
from ophyd import Signal
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
(0, 0, 0): int(1e3),
(0, 0, 1): int(1e4),
(0, 1, 0): int(1e5),
(0, 1, 1): int(1e6),
(1, 0, 0): int(1e7),
(1, 0, 1): int(1e8),
(1, 1, 0): int(1e9),
}
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
(0, 0, 0): int(1e5),
(0, 0, 1): int(1e6),
(0, 1, 0): int(1e7),
(0, 1, 1): int(1e8),
(1, 0, 0): int(1e9),
(1, 0, 1): int(1e10),
(1, 1, 0): int(1e11),
}
_GAIN_TO_BITS: dict[int, tuple] = {}
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
_GAIN_TO_BITS[_gain] = (*_bits, True)
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
_GAIN_TO_BITS[_gain] = (*_bits, False)
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
class BPMControl(PSIPseudoDeviceBase):
"""
BPM amplifier control pseudo device. It is responsible for controlling the
gain and coupling for the BPM amplifier. It relies on signals from a device
in BEC to be available. For cSAXS, these are most liikely to be from the
GalilRIO device that controls the BPM amplifier.
Args:
name (str): Name of the pseudo device.
gain_lsb (str): Name of the signal in BEC that controls the LSB
of the gain setting.
gain_mid (str): Name of the signal in BEC that controls the MID
bit of the gain setting.
gain_msb (str): Name of the signal in BEC that controls the MSB
of the gain setting.
coupling (str): Name of the signal in BEC that controls the coupling
setting.
speed_mode (str): Name of the signal in BEC that controls the speed mode
(low-noise vs high-speed) of the amplifier.
"""
USER_ACCESS = ["set_gain", "set_coupling"]
gain = Cpt(
BECProcessedSignal,
name="gain",
model_config=None,
kind=Kind.config,
doc="Gain of the amplifier",
)
coupling = Cpt(
BECProcessedSignal,
name="coupling",
model_config=None,
kind=Kind.config,
doc="Coupling of the amplifier",
)
speed = Cpt(
BECProcessedSignal,
name="speed",
model_config=None,
kind=Kind.config,
doc="Speed of the amplifier",
)
def __init__(
self,
name: str,
gain_lsb: str,
gain_mid: str,
gain_msb: str,
coupling: str,
speed_mode: str,
device_manager: DeviceManagerDS | None = None,
scan_info: ScanInfo | None = None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# First we get all signal objects from BEC using the utility method provided by the BECProcessedSignal class.
self._gain_lsb = self.gain.get_device_object_from_bec(
object_name=gain_lsb, signal_name=self.name, device_manager=device_manager
)
self._gain_mid = self.gain.get_device_object_from_bec(
object_name=gain_mid, signal_name=self.name, device_manager=device_manager
)
self._gain_msb = self.gain.get_device_object_from_bec(
object_name=gain_msb, signal_name=self.name, device_manager=device_manager
)
self._coupling = self.gain.get_device_object_from_bec(
object_name=coupling, signal_name=self.name, device_manager=device_manager
)
self._speed_mode = self.gain.get_device_object_from_bec(
object_name=speed_mode, signal_name=self.name, device_manager=device_manager
)
# Set the compute methods for the virtual signals.
self.gain.set_compute_method(
self._compute_gain,
msb=self._gain_msb,
mid=self._gain_mid,
lsb=self._gain_lsb,
speed_mode=self._speed_mode,
)
self.coupling.set_compute_method(self._compute_coupling, coupling=self._coupling)
self.speed.set_compute_method(self._compute_speed, speed=self._speed_mode)
def set_gain(
self,
gain: Literal[
1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000, 100000000000
],
) -> None:
"""
Set the gain of the amplifier.
Args:
gain (Literal): Must be one of 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000.
"""
gain_int = int(gain)
if gain_int not in VALID_GAINS:
raise ValueError(
f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}"
)
msb, mid, lsb, use_low_noise = _GAIN_TO_BITS[gain_int]
self._gain_msb.set(bool(msb)).wait(timeout=2)
self._gain_lsb.set(bool(lsb)).wait(timeout=2)
self._gain_mid.set(bool(mid)).wait(timeout=2)
self._speed_mode.set(bool(use_low_noise))
def set_coupling(self, coupling: Literal["AC", "DC"]) -> None:
"""
Set the coupling of the amplifier.
Args:
coupling (Literal): Must be either "AC" or "DC".
"""
if coupling not in ["AC", "DC"]:
raise ValueError(
f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'"
)
self._coupling.set(coupling == "DC").wait(timeout=2)
def _compute_gain(self, msb: Signal, mid: Signal, lsb: Signal, speed_mode: Signal) -> int:
"""Compute the gain based on the bits and speed mode."""
bits = (msb.get(), mid.get(), lsb.get())
speed_mode = speed_mode.get()
if speed_mode:
return _GAIN_BITS_LOW_NOISE.get(bits)
else:
return _GAIN_BITS_HIGH_SPEED.get(bits)
def _compute_coupling(self, coupling: Signal) -> str:
"""Compute the coupling based on the signal."""
return "DC" if coupling.get() else "AC"
def _compute_speed(self, speed: Signal) -> str:
"""Compute the speed based on the signal."""
return "low_speed" if speed.get() else "high_speed"

View File

@@ -1 +0,0 @@
# from ophyd

View File

@@ -1 +1 @@
from .csaxs_nexus import NeXus_format as cSAXS_NeXus_format from .csaxs_nexus import cSAXSNeXusFormat

View File

@@ -1,445 +1,472 @@
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Any
import numpy as np import numpy as np
from bec_server.file_writer.default_writer import DefaultFormat
if TYPE_CHECKING:
from bec_lib.devicemanager import DeviceManagerBase
from bec_server.file_writer.file_writer import HDF5Storage
def get_entry(data: dict, name: str, default=None) -> Any: class cSAXSNeXusFormat(DefaultFormat):
""" """
Get an entry from the scan data assuming a <device>.<device>.value structure. NeXus file format for cSAXS beamline. This format is based on the default NeXus format, but with some additional entries specific to the cSAXS beamline. The structure of the file is based on the NeXus standard, but with some additional groups and datasets specific to the cSAXS beamline.
Args:
data (dict): Scan data
name (str): Entry name
default (Any, optional): Default value. Defaults to None.
""" """
if isinstance(data.get(name), list) and isinstance(data.get(name)[0], dict):
return [sub_data.get(name, {}).get("value", default) for sub_data in data.get(name)]
return data.get(name, {}).get(name, {}).get("value", default) def format(self) -> None:
"""
Prepare the NeXus file format.
Override this method in file writer plugins to customize the HDF5 file format.
The class provides access to the following attributes:
- self.storage: The HDF5Storage object.
- self.data: The data dictionary.
- self.file_references: The file references dictionary, which has the link to external data.
- self.device_manager: The DeviceManagerBase object.
- self.get_entry(name, default=None): Helper method to get an entry from the data dictionary.
def NeXus_format( See also: :class:`bec_server.file_writer.file_writer.HDF5Storage`.
storage: HDF5Storage, data: dict, file_references: dict, device_manager: DeviceManagerBase
) -> HDF5Storage:
"""
Prepare the NeXus file format.
Args: """
storage (HDF5Storage): HDF5 storage. Pseudo hdf5 file container that will be written to disk later.
data (dict): scan data
file_references (dict): File references. Can be used to add external files to the HDF5 file. The path is given relative to the HDF5 file.
device_manager (DeviceManagerBase): Device manager. Can be used to check if devices are available.
Returns: # entry = self.storage.create_group("entry")
HDF5Storage: Updated HDF5 storage
"""
# /entry
entry = storage.create_group("entry")
entry.attrs["NX_class"] = "NXentry"
entry.attrs["definition"] = "NXsas"
entry.attrs["start_time"] = data.get("start_time")
entry.attrs["end_time"] = data.get("end_time")
entry.attrs["version"] = 1.0
# /entry/collection # # /entry/control
collection = entry.create_group("collection") # control = entry.create_group("control")
collection.attrs["NX_class"] = "NXcollection" # control.attrs["NX_class"] = "NXmonitor"
bec_collection = collection.create_group("bec") # control.create_dataset(name="mode", data="monitor")
# /entry/control # #########
control = entry.create_group("control") # # EXAMPLE for soft link
control.attrs["NX_class"] = "NXmonitor" # #########
control.create_dataset(name="mode", data="monitor") # # /entry/data
control.create_dataset(name="integral", data=get_entry(data, "bpm4i")) # if "eiger_4" in self.device_manager.devices:
# entry.create_soft_link(name="data", target="/entry/instrument/eiger_4")
# /entry/data # ########
main_data = entry.create_group("data") # # EXAMPLE for external link
main_data.attrs["NX_class"] = "NXdata" # ########
if "eiger_4" in device_manager.devices: # # control = entry.create_group("sample")
main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data") # # control.create_ext_link("data", self.file_references["eiger9m"]["path"], "EG9M/data")
elif "eiger9m" in device_manager.devices:
main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data")
elif "pilatus_2" in device_manager.devices:
main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data")
# /entry/sample # # /entry/sample
control = entry.create_group("sample") # control = entry.create_group("sample")
control.attrs["NX_class"] = "NXsample" # control.attrs["NX_class"] = "NXsample"
control.create_dataset(name="name", data=get_entry(data, "samplename")) # control.create_dataset(name="name", data=self.data.get("samplename"))
control.create_dataset(name="description", data=data.get("sample_description")) # control.create_dataset(name="description", data=self.data.get("sample_description"))
x_translation = control.create_dataset(name="x_translation", data=get_entry(data, "samx"))
x_translation.attrs["units"] = "mm"
y_translation = control.create_dataset(name="y_translation", data=get_entry(data, "samy"))
y_translation.attrs["units"] = "mm"
temperature_log = control.create_dataset(name="temperature_log", data=get_entry(data, "temp"))
temperature_log.attrs["units"] = "K"
# /entry/instrument # # /entry/instrument
instrument = entry.create_group("instrument") # instrument = entry.create_group("instrument")
instrument.attrs["NX_class"] = "NXinstrument" # instrument.attrs["NX_class"] = "NXinstrument"
instrument.create_dataset(name="name", data="cSAXS beamline")
source = instrument.create_group("source") # source = instrument.create_group("source")
source.attrs["NX_class"] = "NXsource" # source.attrs["NX_class"] = "NXsource"
source.create_dataset(name="type", data="Synchrotron X-ray Source") # source.create_dataset(name="type", data="Synchrotron X-ray Source")
source.create_dataset(name="name", data="Swiss Light Source") # source.create_dataset(name="name", data="Swiss Light Source")
source.create_dataset(name="probe", data="x-ray") # source.create_dataset(name="probe", data="x-ray")
distance = source.create_dataset(
name="distance", data=-33800 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
sigma_x = source.create_dataset(name="sigma_x", data=0.202)
sigma_x.attrs["units"] = "mm"
sigma_y = source.create_dataset(name="sigma_y", data=0.018)
sigma_y.attrs["units"] = "mm"
divergence_x = source.create_dataset(name="divergence_x", data=0.000135)
divergence_x.attrs["units"] = "radians"
divergence_y = source.create_dataset(name="divergence_y", data=0.000025)
divergence_y.attrs["units"] = "radians"
current = source.create_dataset(name="current", data=get_entry(data, "curr"))
current.attrs["units"] = "mA"
insertion_device = instrument.create_group("insertion_device") # # /entry
insertion_device.attrs["NX_class"] = "NXinsertion_device" # entry = self.storage.create_group("entry")
source.create_dataset(name="type", data="undulator") # entry.attrs["NX_class"] = "NXentry"
gap = source.create_dataset(name="gap", data=get_entry(data, "idgap")) # entry.attrs["definition"] = "NXsas"
gap.attrs["units"] = "mm" # entry.attrs["start_time"] = self.data.get("start_time")
k = source.create_dataset(name="k", data=2.46) # entry.attrs["end_time"] = self.data.get("end_time")
k.attrs["units"] = "NX_DIMENSIONLESS" # entry.attrs["version"] = 1.0
length = source.create_dataset(name="length", data=1820)
length.attrs["units"] = "mm"
slit_0 = instrument.create_group("slit_0") # # /entry/control
slit_0.attrs["NX_class"] = "NXslit" # control = entry.create_group("control")
source.create_dataset(name="material", data="OFHC Cu") # control.attrs["NX_class"] = "NXmonitor"
source.create_dataset(name="description", data="Horizontal secondary source slit") # control.create_dataset(name="mode", data="monitor")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl0wh")) # control.create_dataset(name="integral", data=self.get_entry("bpm4i"))
x_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl0ch"))
x_translation.attrs["units"] = "mm"
distance = source.create_dataset(
name="distance", data=-21700 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
slit_1 = instrument.create_group("slit_1") # # /entry/data
slit_1.attrs["NX_class"] = "NXslit" # main_data = entry.create_group("data")
source.create_dataset(name="material", data="OFHC Cu") # main_data.attrs["NX_class"] = "NXdata"
source.create_dataset(name="description", data="Horizontal secondary source slit") # if "eiger_4" in self.device_manager.devices:
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl1wh")) # main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data")
x_gap.attrs["units"] = "mm" # elif "eiger9m" in self.device_manager.devices:
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl1wv")) # main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data")
y_gap.attrs["units"] = "mm" # elif "pilatus_2" in self.device_manager.devices:
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl1ch")) # main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data")
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl1ch"))
height.attrs["units"] = "mm"
distance = source.create_dataset(
name="distance", data=-7800 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
mono = instrument.create_group("monochromator") # # /entry/sample
mono.attrs["NX_class"] = "NXmonochromator" # control = entry.create_group("sample")
mokev = data.get("mokev", {}) # control.attrs["NX_class"] = "NXsample"
if mokev: # control.create_dataset(name="name", data=self.get_entry("samplename"))
if isinstance(mokev, list): # control.create_dataset(name="description", data=self.data.get("sample_description"))
mokev = mokev[0] # x_translation = control.create_dataset(name="x_translation", data=self.get_entry("samx"))
wavelength = mono.create_dataset( # x_translation.attrs["units"] = "mm"
name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9) # y_translation = control.create_dataset(name="y_translation", data=self.get_entry("samy"))
) # y_translation.attrs["units"] = "mm"
wavelength.attrs["units"] = "Angstrom" # temperature_log = control.create_dataset(
energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value")) # name="temperature_log", data=self.get_entry("temp")
energy.attrs["units"] = "keV" # )
mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.") # temperature_log.attrs["units"] = "K"
distance = mono.create_dataset(
name="distance", data=-5220 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
crystal_1 = mono.create_group("crystal_1") # # /entry/instrument
crystal_1.attrs["NX_class"] = "NXcrystal" # instrument = entry.create_group("instrument")
crystal_1.create_dataset(name="usage", data="Bragg") # instrument.attrs["NX_class"] = "NXinstrument"
crystal_1.create_dataset(name="order_no", data="1") # instrument.create_dataset(name="name", data="cSAXS beamline")
crystal_1.create_dataset(name="reflection", data="[1 1 1]")
bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=get_entry(data, "moth1"))
bragg_angle.attrs["units"] = "degrees"
crystal_2 = mono.create_group("crystal_2") # source = instrument.create_group("source")
crystal_2.attrs["NX_class"] = "NXcrystal" # source.attrs["NX_class"] = "NXsource"
crystal_2.create_dataset(name="usage", data="Bragg") # source.create_dataset(name="type", data="Synchrotron X-ray Source")
crystal_2.create_dataset(name="order_no", data="2") # source.create_dataset(name="name", data="Swiss Light Source")
crystal_2.create_dataset(name="reflection", data="[1 1 1]") # source.create_dataset(name="probe", data="x-ray")
bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=get_entry(data, "moth1")) # distance = source.create_dataset(
bragg_angle.attrs["units"] = "degrees" # name="distance", data=-33800 - np.asarray(self.get_entry("samz", 0))
bend_x = crystal_2.create_dataset(name="bend_x", data=get_entry(data, "mobd")) # )
bend_x.attrs["units"] = "degrees" # distance.attrs["units"] = "mm"
# sigma_x = source.create_dataset(name="sigma_x", data=0.202)
# sigma_x.attrs["units"] = "mm"
# sigma_y = source.create_dataset(name="sigma_y", data=0.018)
# sigma_y.attrs["units"] = "mm"
# divergence_x = source.create_dataset(name="divergence_x", data=0.000135)
# divergence_x.attrs["units"] = "radians"
# divergence_y = source.create_dataset(name="divergence_y", data=0.000025)
# divergence_y.attrs["units"] = "radians"
# current = source.create_dataset(name="current", data=self.get_entry("curr"))
# current.attrs["units"] = "mA"
xbpm4 = instrument.create_group("XBPM4") # insertion_device = instrument.create_group("insertion_device")
xbpm4.attrs["NX_class"] = "NXdetector" # insertion_device.attrs["NX_class"] = "NXinsertion_device"
xbpm4_sum = xbpm4.create_group("XBPM4_sum") # source.create_dataset(name="type", data="undulator")
xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=get_entry(data, "bpm4s")) # gap = source.create_dataset(name="gap", data=self.get_entry("idgap"))
xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS" # gap.attrs["units"] = "mm"
xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.") # k = source.create_dataset(name="k", data=2.46)
xbpm4_x = xbpm4.create_group("XBPM4_x") # k.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=get_entry(data, "bpm4x")) # length = source.create_dataset(name="length", data=1820)
xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS" # length.attrs["units"] = "mm"
xbpm4_x.create_dataset(
name="description", data="Normalized difference of counts between left and right quadrants."
)
xbpm4_y = xbpm4.create_group("XBPM4_y")
xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=get_entry(data, "bpm4y"))
xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_y.create_dataset(
name="description", data="Normalized difference of counts between high and low quadrants."
)
xbpm4_skew = xbpm4.create_group("XBPM4_skew")
xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=get_entry(data, "bpm4z"))
xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm4_skew.create_dataset(
name="description", data="Normalized difference of counts between diagonal quadrants."
)
mirror = instrument.create_group("mirror") # slit_0 = instrument.create_group("slit_0")
mirror.attrs["NX_class"] = "NXmirror" # slit_0.attrs["NX_class"] = "NXslit"
mirror.create_dataset(name="type", data="single") # source.create_dataset(name="material", data="OFHC Cu")
mirror.create_dataset( # source.create_dataset(name="description", data="Horizontal secondary source slit")
name="description", # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl0wh"))
data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).", # x_gap.attrs["units"] = "mm"
) # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl0ch"))
incident_angle = mirror.create_dataset(name="incident_angle", data=get_entry(data, "mith")) # x_translation.attrs["units"] = "mm"
incident_angle.attrs["units"] = "degrees" # distance = source.create_dataset(
substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2") # name="distance", data=-21700 - np.asarray(self.get_entry("samz", 0))
substrate_material.attrs["units"] = "NX_CHAR" # )
coating_material = mirror.create_dataset(name="coating_material", data="SiO2") # distance.attrs["units"] = "mm"
coating_material.attrs["units"] = "NX_CHAR"
bend_y = mirror.create_dataset(name="bend_y", data="mibd")
bend_y.attrs["units"] = "NX_DIMENSIONLESS"
distance = mirror.create_dataset(
name="distance", data=-4370 - np.asarray(get_entry(data, "samz", 0))
)
distance.attrs["units"] = "mm"
xbpm5 = instrument.create_group("XBPM5") # slit_1 = instrument.create_group("slit_1")
xbpm5.attrs["NX_class"] = "NXdetector" # slit_1.attrs["NX_class"] = "NXslit"
xbpm5_sum = xbpm5.create_group("XBPM5_sum") # source.create_dataset(name="material", data="OFHC Cu")
xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=get_entry(data, "bpm5s")) # source.create_dataset(name="description", data="Horizontal secondary source slit")
xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS" # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl1wh"))
xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.") # x_gap.attrs["units"] = "mm"
xbpm5_x = xbpm5.create_group("XBPM5_x") # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl1wv"))
xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=get_entry(data, "bpm5x")) # y_gap.attrs["units"] = "mm"
xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS" # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch"))
xbpm5_x.create_dataset( # x_translation.attrs["units"] = "mm"
name="description", data="Normalized difference of counts between left and right quadrants." # height = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch"))
) # height.attrs["units"] = "mm"
xbpm5_y = xbpm5.create_group("XBPM5_y") # distance = source.create_dataset(
xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=get_entry(data, "bpm5y")) # name="distance", data=-7800 - np.asarray(self.get_entry("samz", 0))
xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS" # )
xbpm5_y.create_dataset( # distance.attrs["units"] = "mm"
name="description", data="Normalized difference of counts between high and low quadrants."
)
xbpm5_skew = xbpm5.create_group("XBPM5_skew")
xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=get_entry(data, "bpm5z"))
xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
xbpm5_skew.create_dataset(
name="description", data="Normalized difference of counts between diagonal quadrants."
)
slit_2 = instrument.create_group("slit_2") # mono = instrument.create_group("monochromator")
slit_2.attrs["NX_class"] = "NXslit" # mono.attrs["NX_class"] = "NXmonochromator"
source.create_dataset(name="material", data="Ag") # mokev = self.data.get("mokev", {})
source.create_dataset(name="description", data="Slit 2, optics hutch") # if mokev:
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl2wh")) # if isinstance(mokev, list):
x_gap.attrs["units"] = "mm" # mokev = mokev[0]
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl2wv")) # wavelength = mono.create_dataset(
y_gap.attrs["units"] = "mm" # name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9)
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl2ch")) # )
x_translation.attrs["units"] = "mm" # wavelength.attrs["units"] = "Angstrom"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl2cv")) # energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value"))
height.attrs["units"] = "mm" # energy.attrs["units"] = "keV"
distance = source.create_dataset( # mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.")
name="distance", data=-3140 - np.asarray(get_entry(data, "samz", 0)) # distance = mono.create_dataset(
) # name="distance", data=-5220 - np.asarray(self.get_entry("samz", 0))
distance.attrs["units"] = "mm" # )
# distance.attrs["units"] = "mm"
slit_3 = instrument.create_group("slit_3") # crystal_1 = mono.create_group("crystal_1")
slit_3.attrs["NX_class"] = "NXslit" # crystal_1.attrs["NX_class"] = "NXcrystal"
source.create_dataset(name="material", data="Si") # crystal_1.create_dataset(name="usage", data="Bragg")
source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box") # crystal_1.create_dataset(name="order_no", data="1")
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl3wh")) # crystal_1.create_dataset(name="reflection", data="[1 1 1]")
x_gap.attrs["units"] = "mm" # bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=self.get_entry("moth1"))
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl3wv")) # bragg_angle.attrs["units"] = "degrees"
y_gap.attrs["units"] = "mm"
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl3ch"))
x_translation.attrs["units"] = "mm"
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl3cv"))
height.attrs["units"] = "mm"
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0))
# distance.attrs["units"] = "mm"
filter_set = instrument.create_group("filter_set") # crystal_2 = mono.create_group("crystal_2")
filter_set.attrs["NX_class"] = "NXattenuator" # crystal_2.attrs["NX_class"] = "NXcrystal"
filter_set.create_dataset(name="material", data="Si") # crystal_2.create_dataset(name="usage", data="Bragg")
filter_set.create_dataset( # crystal_2.create_dataset(name="order_no", data="2")
name="description", # crystal_2.create_dataset(name="reflection", data="[1 1 1]")
data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.", # bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=self.get_entry("moth1"))
) # bragg_angle.attrs["units"] = "degrees"
attenuator_transmission = filter_set.create_dataset( # bend_x = crystal_2.create_dataset(name="bend_x", data=self.get_entry("mobd"))
name="attenuator_transmission", data=10 ** get_entry(data, "ftrans", 0) # bend_x.attrs["units"] = "degrees"
)
attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS"
slit_4 = instrument.create_group("slit_4") # xbpm4 = instrument.create_group("XBPM4")
slit_4.attrs["NX_class"] = "NXslit" # xbpm4.attrs["NX_class"] = "NXdetector"
source.create_dataset(name="material", data="Si") # xbpm4_sum = xbpm4.create_group("XBPM4_sum")
source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box") # xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=self.get_entry("bpm4s"))
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl4wh")) # xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
x_gap.attrs["units"] = "mm" # xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl4wv")) # xbpm4_x = xbpm4.create_group("XBPM4_x")
y_gap.attrs["units"] = "mm" # xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=self.get_entry("bpm4x"))
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl4ch")) # xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS"
x_translation.attrs["units"] = "mm" # xbpm4_x.create_dataset(
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl4cv")) # name="description",
height.attrs["units"] = "mm" # data="Normalized difference of counts between left and right quadrants.",
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0)) # )
# distance.attrs["units"] = "mm" # xbpm4_y = xbpm4.create_group("XBPM4_y")
# xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=self.get_entry("bpm4y"))
# xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm4_y.create_dataset(
# name="description",
# data="Normalized difference of counts between high and low quadrants.",
# )
# xbpm4_skew = xbpm4.create_group("XBPM4_skew")
# xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=self.get_entry("bpm4z"))
# xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm4_skew.create_dataset(
# name="description", data="Normalized difference of counts between diagonal quadrants."
# )
slit_5 = instrument.create_group("slit_5") # mirror = instrument.create_group("mirror")
slit_5.attrs["NX_class"] = "NXslit" # mirror.attrs["NX_class"] = "NXmirror"
source.create_dataset(name="material", data="Si") # mirror.create_dataset(name="type", data="single")
source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box") # mirror.create_dataset(
x_gap = source.create_dataset(name="x_gap", data=get_entry(data, "sl5wh")) # name="description",
x_gap.attrs["units"] = "mm" # data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).",
y_gap = source.create_dataset(name="y_gap", data=get_entry(data, "sl5wv")) # )
y_gap.attrs["units"] = "mm" # incident_angle = mirror.create_dataset(name="incident_angle", data=self.get_entry("mith"))
x_translation = source.create_dataset(name="x_translation", data=get_entry(data, "sl5ch")) # incident_angle.attrs["units"] = "degrees"
x_translation.attrs["units"] = "mm" # substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2")
height = source.create_dataset(name="x_translation", data=get_entry(data, "sl5cv")) # substrate_material.attrs["units"] = "NX_CHAR"
height.attrs["units"] = "mm" # coating_material = mirror.create_dataset(name="coating_material", data="SiO2")
# distance = source.create_dataset(name="distance", data=-3140 - get_entry(data, "samz", 0)) # coating_material.attrs["units"] = "NX_CHAR"
# distance.attrs["units"] = "mm" # bend_y = mirror.create_dataset(name="bend_y", data="mibd")
# bend_y.attrs["units"] = "NX_DIMENSIONLESS"
# distance = mirror.create_dataset(
# name="distance", data=-4370 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
beam_stop_1 = instrument.create_group("beam_stop_1") # xbpm5 = instrument.create_group("XBPM5")
beam_stop_1.attrs["NX_class"] = "NX_beamstop" # xbpm5.attrs["NX_class"] = "NXdetector"
beam_stop_1.create_dataset(name="description", data="circular") # xbpm5_sum = xbpm5.create_group("XBPM5_sum")
bms1_size = beam_stop_1.create_dataset(name="size", data=3) # xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=self.get_entry("bpm5s"))
bms1_size.attrs["units"] = "mm" # xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
bms1_x = beam_stop_1.create_dataset(name="size", data=get_entry(data, "bs1x")) # xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
bms1_x.attrs["units"] = "mm" # xbpm5_x = xbpm5.create_group("XBPM5_x")
bms1_y = beam_stop_1.create_dataset(name="size", data=get_entry(data, "bs1y")) # xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=self.get_entry("bpm5x"))
bms1_y.attrs["units"] = "mm" # xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm5_x.create_dataset(
# name="description",
# data="Normalized difference of counts between left and right quadrants.",
# )
# xbpm5_y = xbpm5.create_group("XBPM5_y")
# xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=self.get_entry("bpm5y"))
# xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm5_y.create_dataset(
# name="description",
# data="Normalized difference of counts between high and low quadrants.",
# )
# xbpm5_skew = xbpm5.create_group("XBPM5_skew")
# xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=self.get_entry("bpm5z"))
# xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
# xbpm5_skew.create_dataset(
# name="description", data="Normalized difference of counts between diagonal quadrants."
# )
beam_stop_2 = instrument.create_group("beam_stop_2") # slit_2 = instrument.create_group("slit_2")
beam_stop_2.attrs["NX_class"] = "NX_beamstop" # slit_2.attrs["NX_class"] = "NXslit"
beam_stop_2.create_dataset(name="description", data="rectangular") # source.create_dataset(name="material", data="Ag")
bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5) # source.create_dataset(name="description", data="Slit 2, optics hutch")
bms2_size_x.attrs["units"] = "mm" # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl2wh"))
bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25) # x_gap.attrs["units"] = "mm"
bms2_size_y.attrs["units"] = "mm" # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl2wv"))
bms2_x = beam_stop_2.create_dataset(name="size", data=get_entry(data, "bs2x")) # y_gap.attrs["units"] = "mm"
bms2_x.attrs["units"] = "mm" # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl2ch"))
bms2_y = beam_stop_2.create_dataset(name="size", data=get_entry(data, "bs2y")) # x_translation.attrs["units"] = "mm"
bms2_y.attrs["units"] = "mm" # height = source.create_dataset(name="x_translation", data=self.get_entry("sl2cv"))
bms2_data = beam_stop_2.create_dataset(name="data", data=get_entry(data, "diode")) # height.attrs["units"] = "mm"
bms2_data.attrs["units"] = "NX_DIMENSIONLESS" # distance = source.create_dataset(
# name="distance", data=-3140 - np.asarray(self.get_entry("samz", 0))
# )
# distance.attrs["units"] = "mm"
if "eiger1p5m" in device_manager.devices and device_manager.devices.eiger1p5m.enabled: # slit_3 = instrument.create_group("slit_3")
eiger_4 = instrument.create_group("eiger_4") # slit_3.attrs["NX_class"] = "NXslit"
eiger_4.attrs["NX_class"] = "NXdetector" # source.create_dataset(name="material", data="Si")
x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75) # source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box")
x_pixel_size.attrs["units"] = "um" # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl3wh"))
y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75) # x_gap.attrs["units"] = "mm"
y_pixel_size.attrs["units"] = "um" # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl3wv"))
polar_angle = eiger_4.create_dataset(name="polar_angle", data=0) # y_gap.attrs["units"] = "mm"
polar_angle.attrs["units"] = "degrees" # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl3ch"))
azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0) # x_translation.attrs["units"] = "mm"
azimuthal_angle.attrs["units"] = "degrees" # height = source.create_dataset(name="x_translation", data=self.get_entry("sl3cv"))
rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0) # height.attrs["units"] = "mm"
rotation_angle.attrs["units"] = "degrees" # # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
description = eiger_4.create_dataset( # # distance.attrs["units"] = "mm"
name="description", data="Single-photon counting detector, 320 micron-thick Si chip"
)
orientation = eiger_4.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
)
orientation.create_dataset(name="transpose", data=1)
orientation.create_dataset(name="rot90", data=3)
if ( # filter_set = instrument.create_group("filter_set")
"eiger9m" in device_manager.devices # filter_set.attrs["NX_class"] = "NXattenuator"
and device_manager.devices.eiger9m.enabled # filter_set.create_dataset(name="material", data="Si")
and "eiger9m" in file_references # filter_set.create_dataset(
): # name="description",
eiger9m = instrument.create_group("eiger9m") # data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.",
eiger9m.attrs["NX_class"] = "NXdetector" # )
x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75) # attenuator_transmission = filter_set.create_dataset(
x_pixel_size.attrs["units"] = "um" # name="attenuator_transmission", data=10 ** self.get_entry("ftrans", 0)
y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75) # )
y_pixel_size.attrs["units"] = "um" # attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS"
polar_angle = eiger9m.create_dataset(name="polar_angle", data=0)
polar_angle.attrs["units"] = "degrees"
azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0)
azimuthal_angle.attrs["units"] = "degrees"
rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0)
rotation_angle.attrs["units"] = "degrees"
description = eiger9m.create_dataset(
name="description", data="Eiger9M detector, in-house developed, Paul Scherrer Institute"
)
orientation = eiger9m.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
)
orientation.create_dataset(name="transpose", data=1)
orientation.create_dataset(name="rot90", data=3)
data = eiger9m.create_ext_link("data", file_references["eiger9m"]["path"], "EG9M/data")
status = eiger9m.create_ext_link(
"status", file_references["eiger9m"]["path"], "EG9M/status"
)
if ( # slit_4 = instrument.create_group("slit_4")
"pilatus_2" in device_manager.devices # slit_4.attrs["NX_class"] = "NXslit"
and device_manager.devices.pilatus_2.enabled # source.create_dataset(name="material", data="Si")
and "pilatus_2" in file_references # source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box")
): # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl4wh"))
pilatus_2 = instrument.create_group("pilatus_2") # x_gap.attrs["units"] = "mm"
pilatus_2.attrs["NX_class"] = "NXdetector" # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl4wv"))
x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172) # y_gap.attrs["units"] = "mm"
x_pixel_size.attrs["units"] = "um" # x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl4ch"))
y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172) # x_translation.attrs["units"] = "mm"
y_pixel_size.attrs["units"] = "um" # height = source.create_dataset(name="x_translation", data=self.get_entry("sl4cv"))
polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0) # height.attrs["units"] = "mm"
polar_angle.attrs["units"] = "degrees" # # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0) # # distance.attrs["units"] = "mm"
azimuthal_angle.attrs["units"] = "degrees"
rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0)
rotation_angle.attrs["units"] = "degrees"
description = pilatus_2.create_dataset(
name="description", data="Pilatus 300K detector, Dectris, Switzerland"
)
orientation = pilatus_2.create_group("orientation")
orientation.attrs["description"] = (
"Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
)
orientation.create_dataset(name="transpose", data=1)
orientation.create_dataset(name="rot90", data=2)
data = pilatus_2.create_ext_link(
"data", file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data"
)
if ( # slit_5 = instrument.create_group("slit_5")
"falcon" in device_manager.devices # slit_5.attrs["NX_class"] = "NXslit"
and device_manager.devices.falcon.enabled # source.create_dataset(name="material", data="Si")
and "falcon" in file_references # source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box")
): # x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl5wh"))
falcon = instrument.create_ext_link( # x_gap.attrs["units"] = "mm"
"falcon", file_references["falcon"]["path"], "entry/instrument/FalconX1" # y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl5wv"))
) # y_gap.attrs["units"] = "mm"
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl5ch"))
# x_translation.attrs["units"] = "mm"
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl5cv"))
# height.attrs["units"] = "mm"
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
# # distance.attrs["units"] = "mm"
return storage # beam_stop_1 = instrument.create_group("beam_stop_1")
# beam_stop_1.attrs["NX_class"] = "NX_beamstop"
# beam_stop_1.create_dataset(name="description", data="circular")
# bms1_size = beam_stop_1.create_dataset(name="size", data=3)
# bms1_size.attrs["units"] = "mm"
# bms1_x = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1x"))
# bms1_x.attrs["units"] = "mm"
# bms1_y = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1y"))
# bms1_y.attrs["units"] = "mm"
# beam_stop_2 = instrument.create_group("beam_stop_2")
# beam_stop_2.attrs["NX_class"] = "NX_beamstop"
# beam_stop_2.create_dataset(name="description", data="rectangular")
# bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5)
# bms2_size_x.attrs["units"] = "mm"
# bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25)
# bms2_size_y.attrs["units"] = "mm"
# bms2_x = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2x"))
# bms2_x.attrs["units"] = "mm"
# bms2_y = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2y"))
# bms2_y.attrs["units"] = "mm"
# bms2_data = beam_stop_2.create_dataset(name="data", data=self.get_entry("diode"))
# bms2_data.attrs["units"] = "NX_DIMENSIONLESS"
# if (
# "eiger1p5m" in self.device_manager.devices
# and self.device_manager.devices.eiger1p5m.enabled
# ):
# eiger_4 = instrument.create_group("eiger_4")
# eiger_4.attrs["NX_class"] = "NXdetector"
# x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75)
# x_pixel_size.attrs["units"] = "um"
# y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75)
# y_pixel_size.attrs["units"] = "um"
# polar_angle = eiger_4.create_dataset(name="polar_angle", data=0)
# polar_angle.attrs["units"] = "degrees"
# azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0)
# azimuthal_angle.attrs["units"] = "degrees"
# rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0)
# rotation_angle.attrs["units"] = "degrees"
# description = eiger_4.create_dataset(
# name="description", data="Single-photon counting detector, 320 micron-thick Si chip"
# )
# orientation = eiger_4.create_group("orientation")
# orientation.attrs["description"] = (
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
# )
# orientation.create_dataset(name="transpose", data=1)
# orientation.create_dataset(name="rot90", data=3)
# if (
# "eiger9m" in self.device_manager.devices
# and self.device_manager.devices.eiger9m.enabled
# and "eiger9m" in self.file_references
# ):
# eiger9m = instrument.create_group("eiger9m")
# eiger9m.attrs["NX_class"] = "NXdetector"
# x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75)
# x_pixel_size.attrs["units"] = "um"
# y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75)
# y_pixel_size.attrs["units"] = "um"
# polar_angle = eiger9m.create_dataset(name="polar_angle", data=0)
# polar_angle.attrs["units"] = "degrees"
# azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0)
# azimuthal_angle.attrs["units"] = "degrees"
# rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0)
# rotation_angle.attrs["units"] = "degrees"
# description = eiger9m.create_dataset(
# name="description",
# data="Eiger9M detector, in-house developed, Paul Scherrer Institute",
# )
# orientation = eiger9m.create_group("orientation")
# orientation.attrs["description"] = (
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
# )
# orientation.create_dataset(name="transpose", data=1)
# orientation.create_dataset(name="rot90", data=3)
# data = eiger9m.create_ext_link(
# "data", self.file_references["eiger9m"]["path"], "EG9M/data"
# )
# status = eiger9m.create_ext_link(
# "status", self.file_references["eiger9m"]["path"], "EG9M/status"
# )
# if (
# "pilatus_2" in self.device_manager.devices
# and self.device_manager.devices.pilatus_2.enabled
# and "pilatus_2" in self.file_references
# ):
# pilatus_2 = instrument.create_group("pilatus_2")
# pilatus_2.attrs["NX_class"] = "NXdetector"
# x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172)
# x_pixel_size.attrs["units"] = "um"
# y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172)
# y_pixel_size.attrs["units"] = "um"
# polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0)
# polar_angle.attrs["units"] = "degrees"
# azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0)
# azimuthal_angle.attrs["units"] = "degrees"
# rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0)
# rotation_angle.attrs["units"] = "degrees"
# description = pilatus_2.create_dataset(
# name="description", data="Pilatus 300K detector, Dectris, Switzerland"
# )
# orientation = pilatus_2.create_group("orientation")
# orientation.attrs["description"] = (
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
# )
# orientation.create_dataset(name="transpose", data=1)
# orientation.create_dataset(name="rot90", data=2)
# data = pilatus_2.create_ext_link(
# "data", self.file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data"
# )
# if (
# "falcon" in self.device_manager.devices
# and self.device_manager.devices.falcon.enabled
# and "falcon" in self.file_references
# ):
# falcon = instrument.create_ext_link(
# "falcon", self.file_references["falcon"]["path"], "entry/instrument/FalconX1"
# )

View File

@@ -0,0 +1,69 @@
import pytest
from bec_server.device_server.tests.utils import DMMock
from csaxs_bec.devices.omny.shutter import MonitorSignal, OMNYFastShutter
@pytest.mark.parametrize("auto_monitor", [False, True])
def test_monitor_signal_stores_auto_monitor(auto_monitor):
signal = MonitorSignal(name="signal", auto_monitor=auto_monitor)
assert signal.auto_monitor is auto_monitor
def test_monitor_signal_put_propagates_value_to_readback_callback():
signal = MonitorSignal(name="signal", auto_monitor=True)
initial_value = signal.read()[signal.name]["value"]
callback_values = []
callback_reads = []
def _test_cb(value, old_value, **kwargs):
callback_values.append((value, old_value))
callback_reads.append(kwargs["obj"].read())
signal.subscribe(_test_cb, event_type=signal.SUB_VALUE, run=False)
signal.put(1)
assert callback_values == [(1, initial_value)]
assert len(callback_reads) == 1
assert callback_reads[0][signal.name]["value"] == 1
assert signal.read()[signal.name]["value"] == 1
signal.put(0)
assert callback_values == [(1, initial_value), (0, 1)]
assert len(callback_reads) == 2
assert callback_reads[1][signal.name]["value"] == 0
assert signal.read()[signal.name]["value"] == 0
@pytest.fixture
def omny_fast_shutter():
shutter = OMNYFastShutter(name="omny_fast_shutter", device_manager=DMMock())
try:
yield shutter
finally:
shutter.destroy()
def test_omny_fast_shutter_uses_monitor_signal_with_auto_monitor(omny_fast_shutter):
assert isinstance(omny_fast_shutter.shutter, MonitorSignal)
assert omny_fast_shutter.shutter.auto_monitor is True
def test_omny_fast_shutter_propagates_signal_changes_to_device_readback(omny_fast_shutter):
signal_name = omny_fast_shutter.shutter.name
callback_reads = []
def _test_cb(**kwargs):
callback_reads.append(omny_fast_shutter.read())
omny_fast_shutter.shutter.subscribe(_test_cb, event_type=omny_fast_shutter.shutter.SUB_VALUE, run=False)
omny_fast_shutter.shutter.put(1)
assert len(callback_reads) == 1
assert callback_reads[0][signal_name]["value"] == 1
assert omny_fast_shutter.read()[signal_name]["value"] == 1
assert omny_fast_shutter.fshstatus() == 1

View File

@@ -1,241 +0,0 @@
"""Module to test the pseudo_device module."""
import pytest
from bec_lib.atlas_models import Device
from ophyd_devices.sim.sim_signals import SetableSignal
from csaxs_bec.devices.pseudo_devices.bpm import BPM
from csaxs_bec.devices.pseudo_devices.bpm_control import _GAIN_TO_BITS, BPMControl
@pytest.fixture
def patched_dm(dm_with_devices):
# Patch missing current_session attribute in the device manager
dm = dm_with_devices
setattr(dm, "current_session", dm._session)
#
signal_lsb = SetableSignal(name="gain_lsb", value=0, kind="config")
signal_mid = SetableSignal(name="gain_mid", value=0, kind="config")
signal_msb = SetableSignal(name="gain_msb", value=0, kind="config")
signal_coupling = SetableSignal(name="coupling", value=0, kind="config")
signal_speed = SetableSignal(name="speed_mode", value=0, kind="config")
for signal in [signal_lsb, signal_mid, signal_msb, signal_coupling, signal_speed]:
dev_cfg = Device(
name=signal.name,
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
enabled=True,
readoutPriority="baseline",
)
dm._session["devices"].append(dev_cfg.model_dump())
dm.devices._add_device(signal.name, signal)
return dm
@pytest.fixture
def bpm_control(patched_dm):
name = "bpm_control"
control_config = Device(
name=name,
deviceClass="csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl",
enabled=True,
readoutPriority="baseline",
deviceConfig={
"gain_lsb": "gain_lsb",
"gain_mid": "gain_mid",
"gain_msb": "gain_msb",
"coupling": "coupling",
"speed_mode": "speed_mode",
},
needs=["gain_lsb", "gain_mid", "gain_msb", "coupling", "speed_mode"],
)
patched_dm._session["devices"].append(control_config.model_dump())
try:
control = BPMControl(
name=name,
gain_lsb="gain_lsb",
gain_mid="gain_mid",
gain_msb="gain_msb",
coupling="coupling",
speed_mode="speed_mode",
device_manager=patched_dm,
)
patched_dm.devices._add_device(control.name, control)
control.wait_for_connection()
yield control
finally:
control.destroy()
def test_bpm_control_set_gain(bpm_control):
gain_lsb = bpm_control.device_manager.devices["gain_lsb"]
gain_mid = bpm_control.device_manager.devices["gain_mid"]
gain_msb = bpm_control.device_manager.devices["gain_msb"]
coupling = bpm_control.device_manager.devices["coupling"]
speed_mode = bpm_control.device_manager.devices["speed_mode"]
gain_lsb.put(0)
gain_mid.put(0)
gain_msb.put(0)
coupling.put(0)
speed_mode.put(1)
gain = bpm_control.gain.get()
assert _GAIN_TO_BITS.get(gain) == (0, 0, 0, speed_mode.get() == 1)
gain_val = 10000000
bpm_control.set_gain(gain_val)
assert _GAIN_TO_BITS.get(gain_val, ()) == (
gain_msb.get(),
gain_mid.get(),
gain_lsb.get(),
speed_mode.get(),
)
gain_val = 100000000000
bpm_control.set_gain(gain_val)
assert _GAIN_TO_BITS.get(gain_val, ()) == (
gain_msb.get(),
gain_mid.get(),
gain_lsb.get(),
speed_mode.get(),
)
with pytest.raises(ValueError):
bpm_control.set_gain(1005.0)
def test_bpm_control_set_coupling(bpm_control):
coupling = bpm_control.device_manager.devices["coupling"]
coupling.put(0)
bpm_control.coupling.get() == "AC"
coupling.put(1)
bpm_control.coupling.get() == "DC"
bpm_control.set_coupling("AC")
assert coupling.get() == 0
with pytest.raises(ValueError):
bpm_control.set_coupling("wrong")
@pytest.fixture
def patched_dm_bpm(dm_with_devices):
# Patch missing current_session attribute in the device manager
dm = dm_with_devices
setattr(dm, "current_session", dm._session)
#
left_top = SetableSignal(name="left_top", value=0, kind="config")
right_top = SetableSignal(name="right_top", value=0, kind="config")
right_bot = SetableSignal(name="right_bot", value=0, kind="config")
left_bot = SetableSignal(name="left_bot", value=0, kind="config")
for signal in [left_top, right_top, right_bot, left_bot]:
dev_cfg = Device(
name=signal.name,
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
enabled=True,
readoutPriority="baseline",
)
dm._session["devices"].append(dev_cfg.model_dump())
dm.devices._add_device(signal.name, signal)
return dm
@pytest.fixture
def bpm(patched_dm_bpm):
name = "bpm"
bpm_config = Device(
name=name,
deviceClass="csaxs_bec.devices.pseudo_devices.bpm.BPM",
enabled=True,
readoutPriority="baseline",
deviceConfig={
"left_top": "left_top",
"right_top": "right_top",
"right_bot": "right_bot",
"left_bot": "left_bot",
},
needs=["left_top", "right_top", "right_bot", "left_bot"],
)
patched_dm_bpm._session["devices"].append(bpm_config.model_dump())
try:
bpm = BPM(
name=name,
left_top="left_top",
right_top="right_top",
right_bot="right_bot",
left_bot="left_bot",
device_manager=patched_dm_bpm,
)
patched_dm_bpm.devices._add_device(bpm.name, bpm)
bpm.wait_for_connection()
yield bpm
finally:
bpm.destroy()
def test_bpm_positions(bpm):
left_top = bpm.device_manager.devices["left_top"]
right_top = bpm.device_manager.devices["right_top"]
right_bot = bpm.device_manager.devices["right_bot"]
left_bot = bpm.device_manager.devices["left_bot"]
# Test center position
for signal in [left_top, right_top, right_bot, left_bot]:
signal.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
# Test fully left
left_top.put(1)
right_top.put(0)
right_bot.put(0)
left_bot.put(1)
assert bpm.pos_x.get() == -1
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 0
assert bpm.intensity.get() == 2
# Test fully right
left_top.put(0)
right_top.put(1)
right_bot.put(1)
left_bot.put(0)
assert bpm.pos_x.get() == 1
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 0
# Test fully top
left_top.put(1)
right_top.put(1)
right_bot.put(0)
left_bot.put(0)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 1
assert bpm.diagonal.get() == 0
# Test fully bottom
left_top.put(0)
right_top.put(0)
right_bot.put(1)
left_bot.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == -1
assert bpm.diagonal.get() == 0
# Diagonal beam
left_top.put(1)
right_top.put(0)
right_bot.put(1)
left_bot.put(0)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == -1
left_top.put(0)
right_top.put(1)
right_bot.put(0)
left_bot.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 1