Compare commits
20 Commits
feat-csaxs
...
config_and
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ac23be094e | ||
|
|
10cc820b2c | ||
|
|
acc3fb0104 | ||
| 6a704c6dd0 | |||
| 2e014bd9ea | |||
|
|
006a451220 | ||
|
|
bdc996d3b2 | ||
|
|
2fac8bc1d7 | ||
|
|
bf045dadf1 | ||
|
|
be508cf300 | ||
|
|
f786e34a0e | ||
|
|
cceedc947a
|
||
|
|
80de9724d4
|
||
|
|
2ac02e0623
|
||
|
|
3c2a0aa484
|
||
|
|
27f4eca4ae
|
||
|
|
f2771bd4b6
|
||
|
|
546ebf8a58
|
||
|
d3f1d31bb8
|
|||
|
6d404cad12
|
@@ -70,7 +70,7 @@ DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
|
||||
"rio_device": "galilrioesxbox",
|
||||
"description": "Beam Position Monitor 4 current amplifier",
|
||||
"channels": {
|
||||
"gain_lsb": 0, # Pin 10 -> Galil ch0
|
||||
"gain_lsb": rio_optics.analog_in.ch0, # Pin 10 -> Galil ch0
|
||||
"gain_mid": 1, # Pin 11 -> Galil ch1
|
||||
"gain_msb": 2, # Pin 12 -> Galil ch2
|
||||
"coupling": 3, # Pin 13 -> Galil ch3
|
||||
|
||||
@@ -1317,7 +1317,9 @@ class Flomni(
|
||||
self._webpage_gen = FlomniWebpageGenerator(
|
||||
bec_client=client,
|
||||
output_dir="~/data/raw/webpage/",
|
||||
upload_url="http://s1090968537.online.de/upload.php", # optional
|
||||
#upload_url="http://s1090968537.online.de/upload.php", # optional
|
||||
upload_url="https://v1p0zyg2w9n2k9c1.myfritz.net/upload.php",
|
||||
local_port=8080
|
||||
)
|
||||
self._webpage_gen.start()
|
||||
|
||||
@@ -1827,20 +1829,20 @@ class Flomni(
|
||||
or (self.tomo_type == 3 and projection_number == None)
|
||||
):
|
||||
|
||||
# pylint: disable=undefined-variable
|
||||
# if bec.active_account != "":
|
||||
# self.tomo_id = self.add_sample_database(
|
||||
# self.sample_name,
|
||||
# str(datetime.date.today()),
|
||||
# bec.active_account,
|
||||
# bec.queue.next_scan_number,
|
||||
# "flomni",
|
||||
# "test additional info",
|
||||
# "BEC",
|
||||
# )
|
||||
# self.write_pdf_report()
|
||||
# else:
|
||||
self.tomo_id = 0
|
||||
#pylint: disable=undefined-variable
|
||||
if bec.active_account != "":
|
||||
self.tomo_id = self.add_sample_database(
|
||||
self.sample_name,
|
||||
str(datetime.date.today()),
|
||||
bec.active_account,
|
||||
bec.queue.next_scan_number,
|
||||
"flomni",
|
||||
"test additional info",
|
||||
"BEC",
|
||||
)
|
||||
self.write_pdf_report()
|
||||
else:
|
||||
self.tomo_id = 0
|
||||
self.write_pdf_report()
|
||||
self.progress["tomo_start_time"] = datetime.datetime.now().isoformat()
|
||||
|
||||
|
||||
@@ -5,10 +5,14 @@ Background thread that reads tomo progress from the BEC global variable store
|
||||
and writes status.json (every cycle) + status.html (once at startup) to a
|
||||
staging directory. An optional HttpUploader sends those files to a web host
|
||||
after every cycle, running in a separate daemon thread so uploads never block
|
||||
the generator cycle.
|
||||
the generator cycle. A built-in LocalHttpServer always serves the output
|
||||
directory locally (default port 8080) so the page can be accessed on the
|
||||
lab network without any extra setup.
|
||||
|
||||
Architecture
|
||||
------------
|
||||
LocalHttpServer -- built-in HTTP server; serves output_dir on port 8080.
|
||||
Always started at _launch(); URL printed to console.
|
||||
HttpUploader -- non-blocking HTTP uploader (fire-and-forget thread).
|
||||
Tracks file mtimes; only uploads changed files.
|
||||
Sends a cleanup request to the server when the
|
||||
@@ -31,19 +35,24 @@ Integration (inside Flomni.__init__, after self._progress_proxy.reset()):
|
||||
bec_client=client,
|
||||
output_dir="~/data/raw/webpage/",
|
||||
upload_url="http://omny.online/upload.php", # optional
|
||||
local_port=8080, # optional, default 8080
|
||||
)
|
||||
self._webpage_gen.start()
|
||||
# On start(), the console prints:
|
||||
# ➜ Status page: http://hostname:8080/status.html
|
||||
|
||||
Interactive helpers (optional, in the iPython session):
|
||||
-------------------------------------------------------
|
||||
flomni._webpage_gen.status() # print current status
|
||||
flomni._webpage_gen.status() # print current status + local URL
|
||||
flomni._webpage_gen.verbosity = 2 # VERBOSE: one-line summary per cycle
|
||||
flomni._webpage_gen.verbosity = 3 # DEBUG: full JSON per cycle
|
||||
flomni._webpage_gen.stop() # release lock
|
||||
flomni._webpage_gen.stop() # release lock, stop local server
|
||||
flomni._webpage_gen.start() # restart after stop()
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import functools
|
||||
import http.server
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
@@ -235,6 +244,16 @@ class HttpUploader:
|
||||
self._uploaded: dict[str, float] = {} # abs path -> mtime at last upload
|
||||
self._lock = threading.Lock()
|
||||
self._busy = False # True while an upload thread is running
|
||||
self._warn_at: dict[str, float] = {} # key -> epoch of last warning
|
||||
|
||||
_WARN_COOLDOWN_S = 600 # only repeat the same warning once per minute
|
||||
|
||||
def _warn(self, key: str, msg: str) -> None:
|
||||
"""Log a warning at most once per _WARN_COOLDOWN_S for a given key."""
|
||||
now = _epoch()
|
||||
if now - self._warn_at.get(key, 0) >= self._WARN_COOLDOWN_S:
|
||||
self._warn_at[key] = now
|
||||
logger.warning(msg)
|
||||
|
||||
# ── Public API ──────────────────────────────────────────────────────────
|
||||
|
||||
@@ -294,8 +313,10 @@ class HttpUploader:
|
||||
def _upload_files(self, files: list, force: bool = False) -> None:
|
||||
try:
|
||||
import requests as _requests
|
||||
import urllib3
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
except ImportError:
|
||||
logger.warning("HttpUploader: 'requests' library not installed")
|
||||
self._warn("no_requests", "HttpUploader: 'requests' library not installed")
|
||||
return
|
||||
|
||||
for path in files:
|
||||
@@ -312,21 +333,26 @@ class HttpUploader:
|
||||
self._url,
|
||||
files={"file": (path.name, f)},
|
||||
timeout=self._timeout,
|
||||
verify=False, # accept self-signed / untrusted certs
|
||||
)
|
||||
if r.status_code == 200:
|
||||
self._uploaded[str(path)] = mtime
|
||||
self._warn_at.pop(f"upload_{path.name}", None) # clear on success
|
||||
logger.debug(f"HttpUploader: OK {path.name}")
|
||||
else:
|
||||
logger.warning(
|
||||
self._warn(
|
||||
f"upload_{path.name}",
|
||||
f"HttpUploader: {path.name} -> HTTP {r.status_code}: "
|
||||
f"{r.text[:120]}"
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(f"HttpUploader: {path.name} failed: {exc}")
|
||||
self._warn(f"upload_{path.name}", f"HttpUploader: {path.name} failed: {exc}")
|
||||
|
||||
def _do_cleanup(self) -> None:
|
||||
try:
|
||||
import requests as _requests
|
||||
import urllib3
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
except ImportError:
|
||||
return
|
||||
try:
|
||||
@@ -334,15 +360,89 @@ class HttpUploader:
|
||||
self._url,
|
||||
data={"action": "cleanup"},
|
||||
timeout=self._timeout,
|
||||
verify=False, # accept self-signed / untrusted certs
|
||||
)
|
||||
logger.info(f"HttpUploader cleanup: {r.text[:120]}")
|
||||
self._warn_at.pop("cleanup", None) # clear on success
|
||||
# Forget mtime records for ptycho files so they get re-uploaded
|
||||
with self._lock:
|
||||
to_remove = [k for k in self._uploaded if "/S" in k or "\\S" in k]
|
||||
for k in to_remove:
|
||||
self._uploaded.pop(k, None)
|
||||
except Exception as exc:
|
||||
logger.warning(f"HttpUploader cleanup failed: {exc}")
|
||||
self._warn("cleanup", f"HttpUploader cleanup failed: {exc}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Local HTTP server (serves output_dir over http://hostname:port/)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class LocalHttpServer:
|
||||
"""
|
||||
Serves the generator's output directory over plain HTTP in a daemon thread.
|
||||
|
||||
Uses Python's built-in http.server — no extra dependencies.
|
||||
Request logging is suppressed so the BEC console stays clean.
|
||||
The server survives stop()/start() cycles: _launch() creates a fresh
|
||||
instance each time start() is called.
|
||||
|
||||
Usage:
|
||||
srv = LocalHttpServer(output_dir, port=8080)
|
||||
srv.start()
|
||||
print(srv.url) # http://hostname:8080/status.html
|
||||
srv.stop()
|
||||
"""
|
||||
|
||||
def __init__(self, directory: Path, port: int = 8080):
|
||||
self._directory = Path(directory)
|
||||
self._port = port
|
||||
self._server = None
|
||||
self._thread = None
|
||||
|
||||
# ── silence the per-request log lines in the iPython console ──────────
|
||||
class _QuietHandler(http.server.SimpleHTTPRequestHandler):
|
||||
def log_message(self, *args):
|
||||
pass
|
||||
# handle_error here does nothing — wrong class
|
||||
|
||||
class _QuietHTTPServer(http.server.HTTPServer):
|
||||
def handle_error(self, request, client_address):
|
||||
pass # suppress BrokenPipeError and all other per-connection noise
|
||||
|
||||
def start(self) -> None:
|
||||
Handler = functools.partial(
|
||||
self._QuietHandler,
|
||||
directory=str(self._directory),
|
||||
)
|
||||
try:
|
||||
self._server = self._QuietHTTPServer(("", self._port), Handler)
|
||||
except OSError as exc:
|
||||
raise RuntimeError(
|
||||
f"LocalHttpServer: cannot bind port {self._port}: {exc}"
|
||||
) from excs
|
||||
self._thread = threading.Thread(
|
||||
target=self._server.serve_forever,
|
||||
name="LocalHttpServer",
|
||||
daemon=True,
|
||||
)
|
||||
self._thread.start()
|
||||
|
||||
def stop(self) -> None:
|
||||
if self._server is not None:
|
||||
self._server.shutdown() # blocks until serve_forever() returns
|
||||
self._server = None
|
||||
|
||||
def is_alive(self) -> bool:
|
||||
return self._thread is not None and self._thread.is_alive()
|
||||
|
||||
@property
|
||||
def port(self) -> int:
|
||||
return self._port
|
||||
|
||||
@property
|
||||
def url(self) -> str:
|
||||
"""Best-guess URL for printing. Uses the machine's hostname."""
|
||||
return f"http://{socket.gethostname()}:{self._port}/status.html"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -363,12 +463,15 @@ class WebpageGeneratorBase:
|
||||
cycle_interval: float = _CYCLE_INTERVAL_S,
|
||||
verbosity: int = VERBOSITY_NORMAL,
|
||||
upload_url: str = None,
|
||||
local_port: int = 8080,
|
||||
):
|
||||
self._bec = bec_client
|
||||
self._output_dir = Path(output_dir).expanduser().resolve()
|
||||
self._cycle_interval = cycle_interval
|
||||
self._verbosity = verbosity
|
||||
self._uploader = HttpUploader(upload_url) if upload_url else None
|
||||
self._local_port = local_port
|
||||
self._local_server = None # created fresh each _launch()
|
||||
|
||||
self._thread = None
|
||||
self._stop_event = threading.Event()
|
||||
@@ -418,6 +521,17 @@ class WebpageGeneratorBase:
|
||||
self._copy_logo()
|
||||
(self._output_dir / "status.html").write_text(_render_html(_PHONE_NUMBERS))
|
||||
|
||||
# Start local HTTP server (always on; a fresh instance per _launch).
|
||||
if self._local_server is not None and self._local_server.is_alive():
|
||||
self._local_server.stop()
|
||||
self._local_server = LocalHttpServer(self._output_dir, self._local_port)
|
||||
try:
|
||||
self._local_server.start()
|
||||
local_url_msg = f" local={self._local_server.url}"
|
||||
except RuntimeError as exc:
|
||||
local_url_msg = f" local=ERROR({exc})"
|
||||
self._log(VERBOSITY_NORMAL, str(exc), level="warning")
|
||||
|
||||
# Upload static files (html + logo) once at startup
|
||||
if self._uploader is not None:
|
||||
self._uploader.upload_dir_async(self._output_dir)
|
||||
@@ -430,13 +544,20 @@ class WebpageGeneratorBase:
|
||||
self._log(VERBOSITY_NORMAL,
|
||||
f"WebpageGenerator started owner={self._owner_id} "
|
||||
f"output={self._output_dir} interval={self._cycle_interval}s"
|
||||
+ (f" upload={self._uploader._url}" if self._uploader else " upload=disabled"))
|
||||
+ (f" upload={self._uploader._url}" if self._uploader else " upload=disabled")
|
||||
+ f"\n ➜ Status page:{local_url_msg}")
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Stop the generator thread and release the singleton lock."""
|
||||
"""Stop the generator thread, local HTTP server, and release the singleton lock."""
|
||||
self._stop_event.set()
|
||||
if self._thread is not None:
|
||||
self._thread.join(timeout=self._cycle_interval + 5)
|
||||
# Always clear the reference so start() gets a clean slate,
|
||||
# even if the thread did not exit within the join timeout.
|
||||
self._thread = None
|
||||
if self._local_server is not None:
|
||||
self._local_server.stop()
|
||||
self._local_server = None
|
||||
self._release_lock()
|
||||
self._log(VERBOSITY_NORMAL, "WebpageGenerator stopped.")
|
||||
|
||||
@@ -453,12 +574,14 @@ class WebpageGeneratorBase:
|
||||
"""Print a human-readable status summary to the console."""
|
||||
lock = self._read_lock()
|
||||
running = self._thread is not None and self._thread.is_alive()
|
||||
local = self._local_server.url if (self._local_server and self._local_server.is_alive()) else "stopped"
|
||||
print(
|
||||
f"WebpageGenerator\n"
|
||||
f" This session running : {running}\n"
|
||||
f" Lock owner : {lock.get('owner_id', 'none')}\n"
|
||||
f" Lock heartbeat : {lock.get('heartbeat', 'never')}\n"
|
||||
f" Output dir : {self._output_dir}\n"
|
||||
f" Local URL : {local}\n"
|
||||
f" Cycle interval : {self._cycle_interval}s\n"
|
||||
f" Upload URL : {self._uploader._url if self._uploader else 'disabled'}\n"
|
||||
f" Verbosity : {self._verbosity}\n"
|
||||
@@ -1245,21 +1368,6 @@ def _render_html(phone_numbers: list) -> str:
|
||||
}}
|
||||
.info-item .value {{ font-size: 0.9rem; font-weight: 600; color: var(--text); }}
|
||||
|
||||
.bar-wrap {{ grid-column: 1 / -1; display: flex; flex-direction: column; gap: 0.35rem; }}
|
||||
.bar-label {{
|
||||
display: flex; justify-content: space-between;
|
||||
font-family: var(--mono); font-size: 0.6rem; color: var(--text-dim);
|
||||
letter-spacing: 0.06em; text-transform: uppercase;
|
||||
}}
|
||||
.bar-track {{ height: 5px; background: var(--surface2); border-radius: 99px; overflow: hidden; }}
|
||||
.bar-fill {{
|
||||
height: 100%;
|
||||
background: var(--ring-blend);
|
||||
background: color-mix(in srgb, var(--status-color) 65%, var(--surface2));
|
||||
border-radius: 99px;
|
||||
transition: width 0.8s cubic-bezier(.4,0,.2,1), background 0.6s;
|
||||
}}
|
||||
|
||||
/* ── Recon card ── */
|
||||
.recon-stats {{ display: flex; gap: 2rem; flex-wrap: wrap; }}
|
||||
.recon-stat {{ display: flex; flex-direction: column; gap: 0.15rem; }}
|
||||
@@ -1502,10 +1610,7 @@ def _render_html(phone_numbers: list) -> str:
|
||||
<div class="info-item"><span class="label">ETA</span><span class="value" id="pi-eta">-</span></div>
|
||||
<div class="info-item"><span class="label">Started</span><span class="value" id="pi-start">-</span></div>
|
||||
</div>
|
||||
<div class="bar-wrap">
|
||||
<div class="bar-label"><span>Sub-tomo progress</span><span id="bar-sub-label">-</span></div>
|
||||
<div class="bar-track"><div class="bar-fill" id="bar-sub-fill" style="width:0%"></div></div>
|
||||
</div>
|
||||
|
||||
</div>
|
||||
|
||||
|
||||
@@ -1667,90 +1772,80 @@ function initDrag() {{
|
||||
}}
|
||||
initDrag();
|
||||
|
||||
|
||||
// ── Audio ─────────────────────────────────────────────────────────────────
|
||||
// iOS (all browsers on iPhone use WebKit) requires:
|
||||
// 1. AudioContext created inside a user gesture.
|
||||
// 2. A real (even silent) BufferSource started synchronously in the gesture.
|
||||
// 3. ctx.resume() awaited before scheduling audible nodes.
|
||||
// We combine all three: silent unlock buffer + resume promise + .then(beeps).
|
||||
// iOS (all browsers on iPhone use WebKit) strict rules:
|
||||
// 1. AudioContext must be created inside a user gesture handler.
|
||||
// 2. A real BufferSource must be started SYNCHRONOUSLY in the gesture —
|
||||
// .then() / microtasks run outside the gesture and are rejected.
|
||||
// 3. ctx.resume() is called fire-and-forget; beeps are delayed 80ms by
|
||||
// setTimeout so the engine has time to start before nodes are scheduled.
|
||||
//
|
||||
// unlockAudio() handles all of this and must be called at the TOP of any
|
||||
// onclick handler that wants audio — before any other logic.
|
||||
|
||||
let audioCtx = null, audioEnabled = false;
|
||||
let audioArmed = false, warningActive = false, warningTimer = null, lastStatus = null;
|
||||
let staleActive = false, staleTimer = null, staleConfirmed = false;
|
||||
let audioCtx=null, audioEnabled=false;
|
||||
let audioArmed=false, warningActive=false, warningTimer=null, lastStatus=null;
|
||||
let staleActive=false, staleTimer=null, staleConfirmed=false;
|
||||
|
||||
function getCtx() {{
|
||||
if (!audioCtx) audioCtx = new (window.AudioContext || window.webkitAudioContext)();
|
||||
function getCtx(){{
|
||||
if(!audioCtx) audioCtx=new(window.AudioContext||window.webkitAudioContext)();
|
||||
return audioCtx;
|
||||
}}
|
||||
|
||||
function unlockAudio() {{
|
||||
// Must be called synchronously inside a user gesture handler.
|
||||
// Plays a 1-sample silent buffer — the most reliable iOS unlock method.
|
||||
const ctx = getCtx();
|
||||
const buf = ctx.createBuffer(1, 1, ctx.sampleRate);
|
||||
const src = ctx.createBufferSource();
|
||||
src.buffer = buf;
|
||||
src.connect(ctx.destination);
|
||||
src.start(0);
|
||||
// resume() is async; return the promise so callers can chain.
|
||||
return ctx.state === 'suspended' ? ctx.resume() : Promise.resolve();
|
||||
function unlockAudio(){{
|
||||
// Synchronous silent 1-sample buffer — the only reliable iOS unlock.
|
||||
// Must be called synchronously at the start of a user gesture handler.
|
||||
const ctx=getCtx();
|
||||
const buf=ctx.createBuffer(1,1,ctx.sampleRate);
|
||||
const src=ctx.createBufferSource();
|
||||
src.buffer=buf; src.connect(ctx.destination); src.start(0);
|
||||
if(ctx.state==='suspended') ctx.resume(); // fire-and-forget
|
||||
}}
|
||||
|
||||
function beep(freq, dur, vol) {{
|
||||
try {{
|
||||
const ctx = getCtx();
|
||||
const o = ctx.createOscillator();
|
||||
const g = ctx.createGain();
|
||||
function beep(freq,dur,vol){{
|
||||
try{{
|
||||
const ctx=getCtx(),o=ctx.createOscillator(),g=ctx.createGain();
|
||||
o.connect(g); g.connect(ctx.destination);
|
||||
o.type = 'sine';
|
||||
o.frequency.setValueAtTime(freq, ctx.currentTime);
|
||||
g.gain.setValueAtTime(vol, ctx.currentTime);
|
||||
g.gain.exponentialRampToValueAtTime(0.001, ctx.currentTime + dur);
|
||||
o.start(); o.stop(ctx.currentTime + dur);
|
||||
}} catch(e) {{ console.warn('Audio beep:', e); }}
|
||||
o.type='sine'; o.frequency.setValueAtTime(freq,ctx.currentTime);
|
||||
g.gain.setValueAtTime(vol,ctx.currentTime);
|
||||
g.gain.exponentialRampToValueAtTime(0.001,ctx.currentTime+dur);
|
||||
o.start(); o.stop(ctx.currentTime+dur);
|
||||
}}catch(e){{console.warn('Audio:',e);}}
|
||||
}}
|
||||
|
||||
function warningChime() {{
|
||||
beep(660, 0.3, 0.4);
|
||||
setTimeout(() => beep(440, 0.5, 0.4), 350);
|
||||
function warningChime(){{
|
||||
beep(660,0.3,0.4); setTimeout(()=>beep(440,0.5,0.4),350);
|
||||
}}
|
||||
function staleChime(){{
|
||||
beep(1200,0.12,0.35);
|
||||
setTimeout(()=>beep(1200,0.12,0.35),180);
|
||||
setTimeout(()=>beep(1200,0.25,0.35),360);
|
||||
}}
|
||||
|
||||
function staleChime() {{
|
||||
beep(1200, 0.12, 0.35);
|
||||
setTimeout(() => beep(1200, 0.12, 0.35), 180);
|
||||
setTimeout(() => beep(1200, 0.25, 0.35), 360);
|
||||
function testSound(){{
|
||||
// Gesture handler — unlock first, then delay beeps 80ms for resume().
|
||||
unlockAudio();
|
||||
setTimeout(()=>beep(880, 0.15,0.4), 80);
|
||||
setTimeout(()=>beep(1100,0.15,0.4),260);
|
||||
setTimeout(()=>beep(880, 0.3, 0.4),440);
|
||||
}}
|
||||
|
||||
function testSound() {{
|
||||
// Called directly from onclick — gesture is active here.
|
||||
unlockAudio().then(() => {{
|
||||
beep(880, 0.15, 0.4);
|
||||
setTimeout(() => beep(1100, 0.15, 0.4), 180);
|
||||
setTimeout(() => beep(880, 0.3, 0.4), 360);
|
||||
}});
|
||||
function toggleAudio(){{
|
||||
// Gesture handler — unlock first (synchronous), then do logic.
|
||||
unlockAudio();
|
||||
audioEnabled=!audioEnabled;
|
||||
localStorage.setItem('audioEnabled',audioEnabled);
|
||||
if(!audioEnabled){{
|
||||
stopWarning(); audioArmed=false; warningActive=false;
|
||||
document.getElementById('btn-confirm').style.display='none';
|
||||
stopStaleWarning(); staleActive=false; staleConfirmed=false;
|
||||
document.getElementById('btn-confirm-stale').style.display='none';
|
||||
}} else {{
|
||||
if(lastStatus==='scanning' && !audioArmed) audioArmed=true;
|
||||
}}
|
||||
updateAudioUI();
|
||||
}}
|
||||
|
||||
function toggleAudio() {{
|
||||
// Called directly from onclick — gesture is active here.
|
||||
unlockAudio().then(() => {{
|
||||
audioEnabled = !audioEnabled;
|
||||
localStorage.setItem('audioEnabled', audioEnabled);
|
||||
if (!audioEnabled) {{
|
||||
stopWarning();
|
||||
audioArmed = false; warningActive = false;
|
||||
document.getElementById('btn-confirm').style.display = 'none';
|
||||
stopStaleWarning();
|
||||
staleActive = false; staleConfirmed = false;
|
||||
document.getElementById('btn-confirm-stale').style.display = 'none';
|
||||
}} else {{
|
||||
if (lastStatus === 'scanning' && !audioArmed) audioArmed = true;
|
||||
}}
|
||||
updateAudioUI();
|
||||
}});
|
||||
}}
|
||||
|
||||
|
||||
function confirmWarning(){{
|
||||
stopWarning();
|
||||
warningActive=false;
|
||||
@@ -1759,9 +1854,10 @@ function confirmWarning(){{
|
||||
}}
|
||||
|
||||
function startWarning(){{
|
||||
// Not a gesture handler — context already unlocked by Enable button click.
|
||||
if(warningActive) return;
|
||||
warningActive=true;
|
||||
if(audioEnabled) warningChime(); // returns promise; chime plays after unlock
|
||||
if(audioEnabled) warningChime();
|
||||
warningTimer=setInterval(()=>{{ if(audioEnabled) warningChime(); }},30000);
|
||||
document.getElementById('btn-confirm').style.display='inline-block';
|
||||
updateAudioUI();
|
||||
@@ -1780,9 +1876,10 @@ function confirmStale(){{
|
||||
}}
|
||||
|
||||
function startStaleWarning(){{
|
||||
// Not a gesture handler — context already unlocked by Enable button click.
|
||||
if(staleActive || staleConfirmed) return;
|
||||
staleActive=true;
|
||||
if(audioEnabled) staleChime(); // returns promise; chime plays after unlock
|
||||
if(audioEnabled) staleChime();
|
||||
staleTimer=setInterval(()=>{{ if(audioEnabled) staleChime(); }},30000);
|
||||
document.getElementById('btn-confirm-stale').style.display='inline-block';
|
||||
updateAudioUI();
|
||||
@@ -1964,8 +2061,7 @@ function render(d){{
|
||||
document.getElementById('pi-type').textContent=p.tomo_type||'-';
|
||||
document.getElementById('pi-eta').textContent=p.estimated_remaining_human||'-';
|
||||
document.getElementById('pi-start').textContent=fmtTime(p.tomo_start_time);
|
||||
document.getElementById('bar-sub-label').textContent=(p.subtomo_projection||0)+' / '+(p.subtomo_total_projections||0);
|
||||
document.getElementById('bar-sub-fill').style.width=(sPct*100).toFixed(1)+'%';
|
||||
|
||||
if(d.recon){{
|
||||
document.getElementById('recon-waiting').textContent=d.recon.waiting;
|
||||
const fv=document.getElementById('recon-failed');
|
||||
@@ -1990,7 +2086,7 @@ function render(d){{
|
||||
|
||||
async function poll(){{
|
||||
try{{
|
||||
const r=await fetch(STATUS_JSON+'?t='+Date.now());
|
||||
const r=await fetch(STATUS_JSON, {{cache:'no-store'}});
|
||||
if(!r.ok) throw new Error('HTTP '+r.status);
|
||||
render(await r.json());
|
||||
}}catch(e){{
|
||||
@@ -2011,4 +2107,4 @@ poll();
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
"""
|
||||
@@ -234,9 +234,10 @@ class TomoIDManager:
|
||||
)
|
||||
"""
|
||||
|
||||
OMNY_URL = "https://omny.web.psi.ch/samples/newmeasurement.php"
|
||||
OMNY_USER = "omny"
|
||||
OMNY_PASSWORD = "samples"
|
||||
#OMNY_URL = "https://omny.web.psi.ch/samples/newmeasurement.php"
|
||||
OMNY_URL = "https://v1p0zyg2w9n2k9c1.myfritz.net/samples/newmeasurement.php"
|
||||
OMNY_USER = ""
|
||||
OMNY_PASSWORD = ""
|
||||
TMP_FILE = "/tmp/currsamplesnr.txt"
|
||||
|
||||
def register(
|
||||
@@ -273,9 +274,14 @@ class TomoIDManager:
|
||||
f"&additional={additional_info}"
|
||||
f"&user={user}"
|
||||
)
|
||||
# subprocess.run(
|
||||
# f"wget --user={self.OMNY_USER} --password={self.OMNY_PASSWORD}"
|
||||
# f" -q -O {self.TMP_FILE} '{url}'",
|
||||
# shell=True,
|
||||
# )
|
||||
#print(url)
|
||||
subprocess.run(
|
||||
f"wget --user={self.OMNY_USER} --password={self.OMNY_PASSWORD}"
|
||||
f" -q -O {self.TMP_FILE} '{url}'",
|
||||
f"wget -q -O {self.TMP_FILE} '{url}'",
|
||||
shell=True,
|
||||
)
|
||||
with open(self.TMP_FILE) as f:
|
||||
|
||||
@@ -544,6 +544,66 @@ sl5trxt:
|
||||
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
|
||||
bl_smar_stage: 5
|
||||
|
||||
sl5ch:
|
||||
description: ESbox1 slit 5 center horizontal
|
||||
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
|
||||
deviceConfig:
|
||||
left_slit: sl5trxi
|
||||
right_slit: sl5trxo
|
||||
offset: 0
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
needs:
|
||||
- sl5trxi
|
||||
- sl5trxo
|
||||
|
||||
sl5wh:
|
||||
description: ESbox1 slit 5 width horizontal
|
||||
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
|
||||
deviceConfig:
|
||||
left_slit: sl5trxi
|
||||
right_slit: sl5trxo
|
||||
offset: 0
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
needs:
|
||||
- sl5trxi
|
||||
- sl5trxo
|
||||
|
||||
sl5cv:
|
||||
description: ESbox1 slit 5 center vertical
|
||||
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
|
||||
deviceConfig:
|
||||
left_slit: sl5trxb
|
||||
right_slit: sl5trxt
|
||||
offset: 0
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
needs:
|
||||
- sl5trxb
|
||||
- sl5trxt
|
||||
|
||||
sl5wv:
|
||||
description: ESbox1 slit 5 width vertical
|
||||
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
|
||||
deviceConfig:
|
||||
left_slit: sl5trxb
|
||||
right_slit: sl5trxt
|
||||
offset: 0
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
needs:
|
||||
- sl5trxb
|
||||
- sl5trxt
|
||||
|
||||
xbimtrx:
|
||||
description: ESbox2 beam intensity monitor x movement
|
||||
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
|
||||
@@ -822,4 +882,62 @@ dettrx:
|
||||
onFailure: retry
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
softwareTrigger: false
|
||||
softwareTrigger: false
|
||||
|
||||
|
||||
####################
|
||||
### Beamstop diode control for flight tube
|
||||
####################
|
||||
|
||||
beamstop_gain_control:
|
||||
description: Gain control for beamstop flightube
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl
|
||||
deviceConfig:
|
||||
gain_lsb: galilrioesft.digital_out.ch0 # Pin 10 -> Galil ch0
|
||||
gain_mid: galilrioesft.digital_out.ch1 # Pin 11 -> Galil ch1
|
||||
gain_msb: galilrioesft.digital_out.ch2 # Pin 12 -> Galil ch2
|
||||
coupling: galilrioesft.digital_out.ch3 # Pin 13 -> Galil ch3
|
||||
speed_mode: galilrioesft.digital_out.ch4 # Pin 14 -> Galil ch4
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
onFailure: retry
|
||||
needs:
|
||||
- galilrioesft
|
||||
|
||||
galilrioesft:
|
||||
description: Galil RIO for remote gain switching and slow reading FlightTube
|
||||
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
|
||||
deviceConfig:
|
||||
host: galilrioesft.psi.ch
|
||||
enabled: true
|
||||
onFailure: retry
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
|
||||
beamstop_dummy_bpm:
|
||||
description: BPM Xbox 2 (First Xbox in ES hutch)
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
|
||||
deviceConfig:
|
||||
left_top: galilrioesft.analog_in.ch0
|
||||
right_top: galilrioesft.analog_in.ch1
|
||||
right_bot: galilrioesft.analog_in.ch2
|
||||
left_bot: galilrioesft.analog_in.ch3
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
onFailure: retry
|
||||
needs:
|
||||
- galilrioesft
|
||||
|
||||
beamstop_intensity:
|
||||
description: Beamstop intensity from Galil analog input ch6
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.signal_forwarder.SignalForwarder
|
||||
deviceConfig:
|
||||
signal: galilrioesft.analog_in.ch6
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
onFailure: retry
|
||||
needs:
|
||||
- galilrioesft
|
||||
|
||||
|
||||
|
||||
@@ -199,6 +199,25 @@ xbpm1c4:
|
||||
readOnly: true
|
||||
softwareTrigger: false
|
||||
|
||||
bpm1:
|
||||
description: 'XBPM1 (frontend)'
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
|
||||
deviceConfig:
|
||||
left_top: xbpm1c1
|
||||
right_top: xbpm1c2
|
||||
right_bot: xbpm1c3
|
||||
left_bot: xbpm1c4
|
||||
onFailure: raise
|
||||
enabled: true
|
||||
readoutPriority: monitored
|
||||
readOnly: true
|
||||
softwareTrigger: false
|
||||
needs:
|
||||
- xbpm1c1
|
||||
- xbpm1c2
|
||||
- xbpm1c3
|
||||
- xbpm1c4
|
||||
|
||||
############################################
|
||||
######### End of xbpm sub devices ##########
|
||||
############################################
|
||||
|
||||
@@ -68,18 +68,22 @@ ccmx:
|
||||
- cSAXS
|
||||
- optics
|
||||
|
||||
# ccm_energy:
|
||||
# readoutPriority: baseline
|
||||
# deviceClass: ophyd_devices.devices.simple_positioner.PSIPositionerBase
|
||||
# prefix: "X12SA-OP-CCM1:"
|
||||
# override_suffixes:
|
||||
# user_readback: "ENERGY-GET"
|
||||
# user_setpoint: "ENERGY-SET"
|
||||
# velocity: "ROTY:VELO"
|
||||
# deviceTags:
|
||||
# - user motors
|
||||
# enabled: true
|
||||
# readOnly: false
|
||||
# TO BE REVIEWED, REMOVE VELOCITY WITH NEW CLASS!
|
||||
ccm_energy:
|
||||
description: 'test'
|
||||
deviceClass: ophyd_devices.devices.simple_positioner.PSISimplePositioner
|
||||
deviceConfig:
|
||||
prefix: 'X12SA-OP-CCM1:'
|
||||
override_suffixes:
|
||||
user_readback: "ENERGY-GET"
|
||||
user_setpoint: "ENERGY-SET"
|
||||
velocity: "ROTY.VELO"
|
||||
motor_done_move: "ROTY.DMOV"
|
||||
onFailure: buffer
|
||||
enabled: true
|
||||
readoutPriority: baseline
|
||||
readOnly: false
|
||||
softwareTrigger: false
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
# This is the main configuration file that is
|
||||
# commented or uncommented according to the type of experiment
|
||||
|
||||
# optics:
|
||||
# - !include ./bl_optics_hutch.yaml
|
||||
optics:
|
||||
- !include ./bl_optics_hutch.yaml
|
||||
|
||||
# frontend:
|
||||
# - !include ./bl_frontend.yaml
|
||||
@@ -10,8 +10,8 @@
|
||||
endstation:
|
||||
- !include ./bl_endstation.yaml
|
||||
|
||||
# detectors:
|
||||
# - !include ./bl_detectors.yaml
|
||||
detectors:
|
||||
- !include ./bl_detectors.yaml
|
||||
|
||||
#sastt:
|
||||
# - !include ./sastt.yaml
|
||||
|
||||
@@ -534,6 +534,7 @@ omny_panda:
|
||||
INENC4.VAL.Max: interf_st_rotx_max
|
||||
INENC4.VAL.Mean: interf_st_rotx_mean
|
||||
INENC4.VAL.Min: interf_st_rotx_min
|
||||
PCAP.GATE_DURATION.Value: pcap_gate_duration_value
|
||||
deviceTags:
|
||||
- detector
|
||||
enabled: true
|
||||
|
||||
24
csaxs_bec/device_configs/test_config.yaml
Normal file
24
csaxs_bec/device_configs/test_config.yaml
Normal file
@@ -0,0 +1,24 @@
|
||||
galilrioesxbox:
|
||||
description: Galil RIO for remote gain switching and slow reading ES XBox
|
||||
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
|
||||
deviceConfig:
|
||||
host: galilrioesft.psi.ch
|
||||
enabled: true
|
||||
onFailure: raise
|
||||
readOnly: false
|
||||
readoutPriority: baseline
|
||||
connectionTimeout: 20
|
||||
bpm1:
|
||||
readoutPriority: baseline
|
||||
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
|
||||
deviceConfig:
|
||||
blade_t: galilrioesxbox.analog_in.ch0
|
||||
blade_r: galilrioesxbox.analog_in.ch1
|
||||
blade_b: galilrioesxbox.analog_in.ch2
|
||||
blade_l: galilrioesxbox.analog_in.ch3
|
||||
enabled: true
|
||||
readOnly: false
|
||||
softwareTrigger: true
|
||||
needs:
|
||||
- galilrioesxbox
|
||||
|
||||
@@ -13,6 +13,14 @@ from ophyd_devices import PSIDeviceBase
|
||||
logger = bec_logger.logger
|
||||
|
||||
|
||||
class MonitorSignal(Signal):
|
||||
"""A simple wrapper around ophyd Signal that automatically monitors the signal for changes."""
|
||||
|
||||
def __init__(self, *, name, auto_monitor=False, **kwargs):
|
||||
super().__init__(name=name, **kwargs)
|
||||
self.auto_monitor = auto_monitor
|
||||
|
||||
|
||||
class OMNYFastShutter(PSIDeviceBase, Device):
|
||||
"""
|
||||
Fast Shutter control for OMNY setup. If started with at the beamline, it will expose
|
||||
@@ -26,7 +34,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
|
||||
SUB_VALUE = "value"
|
||||
_default_sub = SUB_VALUE
|
||||
|
||||
shutter = Cpt(Signal, name="shutter")
|
||||
shutter = Cpt(MonitorSignal, name="shutter", auto_monitor=True)
|
||||
|
||||
# -----------------------------------------------------
|
||||
# User-facing shutter control functions
|
||||
@@ -48,7 +56,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
|
||||
def fshopen(self):
|
||||
"""Open the fast shutter."""
|
||||
if self._check_if_cSAXS_shutter_exists_in_config():
|
||||
self.shutter.put(1)
|
||||
return self.device_manager.devices["fsh"].fshopen()
|
||||
else:
|
||||
self.shutter.put(1)
|
||||
@@ -56,7 +63,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
|
||||
def fshclose(self):
|
||||
"""Close the fast shutter."""
|
||||
if self._check_if_cSAXS_shutter_exists_in_config():
|
||||
self.shutter.put(0)
|
||||
return self.device_manager.devices["fsh"].fshclose()
|
||||
else:
|
||||
self.shutter.put(0)
|
||||
|
||||
0
csaxs_bec/devices/pseudo_devices/__init__.py
Normal file
0
csaxs_bec/devices/pseudo_devices/__init__.py
Normal file
172
csaxs_bec/devices/pseudo_devices/bpm.py
Normal file
172
csaxs_bec/devices/pseudo_devices/bpm.py
Normal file
@@ -0,0 +1,172 @@
|
||||
"""Module for a BPM pseudo device that computes the position and intensity from the blade signals."""
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Kind, Signal
|
||||
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
|
||||
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
|
||||
|
||||
|
||||
class BPM(PSIPseudoDeviceBase):
|
||||
"""BPM positioner pseudo device."""
|
||||
|
||||
# Blade signals, a,b,c,d
|
||||
left_top = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="left_top",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM left_top blade",
|
||||
)
|
||||
right_top = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="right_top",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM right_top blade",
|
||||
)
|
||||
right_bot = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="right_bot",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM right_bottom blade",
|
||||
)
|
||||
left_bot = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="left_bot",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM left_bot blade",
|
||||
)
|
||||
|
||||
# Virtual signals
|
||||
pos_x = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="pos_x",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM X position, -1 fully left, 1 fully right",
|
||||
)
|
||||
pos_y = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="pos_y",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM Y position, -1 fully bottom, 1 fully top",
|
||||
)
|
||||
diagonal = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="diagonal",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM diagonal, -1 fully diagonal left_top-right_bot, 1 fully diagonal right_top-left_bot",
|
||||
)
|
||||
intensity = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="intensity",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="BPM intensity",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
left_top: str,
|
||||
right_top: str,
|
||||
right_bot: str,
|
||||
left_bot: str,
|
||||
device_manager=None,
|
||||
scan_info=None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
|
||||
# Get all blade signal objects from utility method
|
||||
signal_t = self.left_top.get_device_object_from_bec(
|
||||
object_name=left_top, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
signal_r = self.right_top.get_device_object_from_bec(
|
||||
object_name=right_top, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
signal_b = self.right_bot.get_device_object_from_bec(
|
||||
object_name=right_bot, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
signal_l = self.left_bot.get_device_object_from_bec(
|
||||
object_name=left_bot, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
|
||||
# Set compute methods for blade signals and virtual signals
|
||||
self.left_top.set_compute_method(self._compute_blade_signal, signal=signal_t)
|
||||
self.right_top.set_compute_method(self._compute_blade_signal, signal=signal_r)
|
||||
self.right_bot.set_compute_method(self._compute_blade_signal, signal=signal_b)
|
||||
self.left_bot.set_compute_method(self._compute_blade_signal, signal=signal_l)
|
||||
|
||||
self.intensity.set_compute_method(
|
||||
self._compute_intensity,
|
||||
left_top=self.left_top,
|
||||
right_top=self.right_top,
|
||||
right_bot=self.right_bot,
|
||||
left_bot=self.left_bot,
|
||||
)
|
||||
self.pos_x.set_compute_method(
|
||||
self._compute_pos_x,
|
||||
left_bot=self.left_bot,
|
||||
left_top=self.left_top,
|
||||
right_top=self.right_top,
|
||||
right_bot=self.right_bot,
|
||||
)
|
||||
self.pos_y.set_compute_method(
|
||||
self._compute_pos_y,
|
||||
left_bot=self.left_bot,
|
||||
left_top=self.left_top,
|
||||
right_top=self.right_top,
|
||||
right_bot=self.right_bot,
|
||||
)
|
||||
self.diagonal.set_compute_method(
|
||||
self._compute_diagonal,
|
||||
left_bot=self.left_bot,
|
||||
left_top=self.left_top,
|
||||
right_top=self.right_top,
|
||||
right_bot=self.right_bot,
|
||||
)
|
||||
|
||||
def _compute_blade_signal(self, signal: Signal) -> float:
|
||||
return signal.get()
|
||||
|
||||
def _compute_intensity(
|
||||
self, left_top: Signal, right_top: Signal, right_bot: Signal, left_bot: Signal
|
||||
) -> float:
|
||||
intensity = left_top.get() + right_top.get() + right_bot.get() + left_bot.get()
|
||||
return intensity
|
||||
|
||||
def _compute_pos_x(
|
||||
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
|
||||
) -> float:
|
||||
"""X position from -1 to 1, where -1 means beam fully on the left side, 1 means beam fully on the right side."""
|
||||
sum_left = left_bot.get() + left_top.get()
|
||||
sum_right = right_top.get() + right_bot.get()
|
||||
sum_total = sum_left + sum_right
|
||||
if sum_total == 0:
|
||||
return 0.0
|
||||
return (sum_right - sum_left) / sum_total
|
||||
|
||||
def _compute_pos_y(
|
||||
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
|
||||
) -> float:
|
||||
"""Y position from -1 to 1, where -1 means beam fully on the bottom side, 1 means beam fully on the top side."""
|
||||
sum_top = left_top.get() + right_top.get()
|
||||
sum_bot = right_bot.get() + left_bot.get()
|
||||
sum_total = sum_top + sum_bot
|
||||
if sum_total == 0:
|
||||
return 0.0
|
||||
return (sum_top - sum_bot) / sum_total
|
||||
|
||||
def _compute_diagonal(
|
||||
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
|
||||
) -> float:
|
||||
sum_diag1 = left_bot.get() + right_top.get()
|
||||
sum_diag2 = left_top.get() + right_bot.get()
|
||||
sum_total = sum_diag1 + sum_diag2
|
||||
if sum_total == 0:
|
||||
return 0.0
|
||||
return (sum_diag1 - sum_diag2) / sum_total
|
||||
189
csaxs_bec/devices/pseudo_devices/bpm_control.py
Normal file
189
csaxs_bec/devices/pseudo_devices/bpm_control.py
Normal file
@@ -0,0 +1,189 @@
|
||||
"""
|
||||
Module for controlling the BPM amplifier settings, such as gain and coupling.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Literal
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Kind
|
||||
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
|
||||
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
|
||||
|
||||
if TYPE_CHECKING: # pragma: no cover
|
||||
from bec_lib.devicemanager import ScanInfo
|
||||
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
|
||||
from ophyd import Signal
|
||||
|
||||
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
|
||||
(0, 0, 0): int(1e3),
|
||||
(0, 0, 1): int(1e4),
|
||||
(0, 1, 0): int(1e5),
|
||||
(0, 1, 1): int(1e6),
|
||||
(1, 0, 0): int(1e7),
|
||||
(1, 0, 1): int(1e8),
|
||||
(1, 1, 0): int(1e9),
|
||||
}
|
||||
|
||||
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
|
||||
(0, 0, 0): int(1e5),
|
||||
(0, 0, 1): int(1e6),
|
||||
(0, 1, 0): int(1e7),
|
||||
(0, 1, 1): int(1e8),
|
||||
(1, 0, 0): int(1e9),
|
||||
(1, 0, 1): int(1e10),
|
||||
(1, 1, 0): int(1e11),
|
||||
}
|
||||
|
||||
_GAIN_TO_BITS: dict[int, tuple] = {}
|
||||
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
|
||||
_GAIN_TO_BITS[_gain] = (*_bits, True)
|
||||
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
|
||||
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
|
||||
_GAIN_TO_BITS[_gain] = (*_bits, False)
|
||||
|
||||
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
|
||||
|
||||
|
||||
class BPMControl(PSIPseudoDeviceBase):
|
||||
"""
|
||||
BPM amplifier control pseudo device. It is responsible for controlling the
|
||||
gain and coupling for the BPM amplifier. It relies on signals from a device
|
||||
in BEC to be available. For cSAXS, these are most liikely to be from the
|
||||
GalilRIO device that controls the BPM amplifier.
|
||||
|
||||
Args:
|
||||
name (str): Name of the pseudo device.
|
||||
gain_lsb (str): Name of the signal in BEC that controls the LSB
|
||||
of the gain setting.
|
||||
gain_mid (str): Name of the signal in BEC that controls the MID
|
||||
bit of the gain setting.
|
||||
gain_msb (str): Name of the signal in BEC that controls the MSB
|
||||
of the gain setting.
|
||||
coupling (str): Name of the signal in BEC that controls the coupling
|
||||
setting.
|
||||
speed_mode (str): Name of the signal in BEC that controls the speed mode
|
||||
(low-noise vs high-speed) of the amplifier.
|
||||
"""
|
||||
|
||||
USER_ACCESS = ["set_gain", "set_coupling"]
|
||||
|
||||
gain = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="gain",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="Gain of the amplifier",
|
||||
)
|
||||
coupling = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="coupling",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="Coupling of the amplifier",
|
||||
)
|
||||
speed = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="speed",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="Speed of the amplifier",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
gain_lsb: str,
|
||||
gain_mid: str,
|
||||
gain_msb: str,
|
||||
coupling: str,
|
||||
speed_mode: str,
|
||||
device_manager: DeviceManagerDS | None = None,
|
||||
scan_info: ScanInfo | None = None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
|
||||
|
||||
# First we get all signal objects from BEC using the utility method provided by the BECProcessedSignal class.
|
||||
self._gain_lsb = self.gain.get_device_object_from_bec(
|
||||
object_name=gain_lsb, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self._gain_mid = self.gain.get_device_object_from_bec(
|
||||
object_name=gain_mid, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self._gain_msb = self.gain.get_device_object_from_bec(
|
||||
object_name=gain_msb, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self._coupling = self.gain.get_device_object_from_bec(
|
||||
object_name=coupling, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
self._speed_mode = self.gain.get_device_object_from_bec(
|
||||
object_name=speed_mode, signal_name=self.name, device_manager=device_manager
|
||||
)
|
||||
|
||||
# Set the compute methods for the virtual signals.
|
||||
self.gain.set_compute_method(
|
||||
self._compute_gain,
|
||||
msb=self._gain_msb,
|
||||
mid=self._gain_mid,
|
||||
lsb=self._gain_lsb,
|
||||
speed_mode=self._speed_mode,
|
||||
)
|
||||
self.coupling.set_compute_method(self._compute_coupling, coupling=self._coupling)
|
||||
self.speed.set_compute_method(self._compute_speed, speed=self._speed_mode)
|
||||
|
||||
def set_gain(
|
||||
self,
|
||||
gain: Literal[
|
||||
1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000, 100000000000
|
||||
],
|
||||
) -> None:
|
||||
"""
|
||||
Set the gain of the amplifier.
|
||||
|
||||
Args:
|
||||
gain (Literal): Must be one of 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000.
|
||||
"""
|
||||
gain_int = int(gain)
|
||||
if gain_int not in VALID_GAINS:
|
||||
raise ValueError(
|
||||
f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}"
|
||||
)
|
||||
|
||||
msb, mid, lsb, use_low_noise = _GAIN_TO_BITS[gain_int]
|
||||
|
||||
self._gain_msb.set(bool(msb)).wait(timeout=2)
|
||||
self._gain_lsb.set(bool(lsb)).wait(timeout=2)
|
||||
self._gain_mid.set(bool(mid)).wait(timeout=2)
|
||||
self._speed_mode.set(bool(use_low_noise))
|
||||
|
||||
def set_coupling(self, coupling: Literal["AC", "DC"]) -> None:
|
||||
"""
|
||||
Set the coupling of the amplifier.
|
||||
|
||||
Args:
|
||||
coupling (Literal): Must be either "AC" or "DC".
|
||||
"""
|
||||
if coupling not in ["AC", "DC"]:
|
||||
raise ValueError(
|
||||
f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'"
|
||||
)
|
||||
self._coupling.set(coupling == "DC").wait(timeout=2)
|
||||
|
||||
def _compute_gain(self, msb: Signal, mid: Signal, lsb: Signal, speed_mode: Signal) -> int:
|
||||
"""Compute the gain based on the bits and speed mode."""
|
||||
bits = (msb.get(), mid.get(), lsb.get())
|
||||
speed_mode = speed_mode.get()
|
||||
if speed_mode:
|
||||
return _GAIN_BITS_LOW_NOISE.get(bits)
|
||||
else:
|
||||
return _GAIN_BITS_HIGH_SPEED.get(bits)
|
||||
|
||||
def _compute_coupling(self, coupling: Signal) -> str:
|
||||
"""Compute the coupling based on the signal."""
|
||||
return "DC" if coupling.get() else "AC"
|
||||
|
||||
def _compute_speed(self, speed: Signal) -> str:
|
||||
"""Compute the speed based on the signal."""
|
||||
return "low_speed" if speed.get() else "high_speed"
|
||||
1
csaxs_bec/devices/pseudo_devices/dlpca200_settings.py
Normal file
1
csaxs_bec/devices/pseudo_devices/dlpca200_settings.py
Normal file
@@ -0,0 +1 @@
|
||||
# from ophyd
|
||||
41
csaxs_bec/devices/pseudo_devices/signal_forwarder.py
Normal file
41
csaxs_bec/devices/pseudo_devices/signal_forwarder.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""
|
||||
Pseudo device that forwards a single BEC signal 1:1.
|
||||
"""
|
||||
|
||||
from ophyd import Component as Cpt
|
||||
from ophyd import Kind, Signal
|
||||
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
|
||||
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
|
||||
|
||||
|
||||
class SignalForwarder(PSIPseudoDeviceBase):
|
||||
"""Forward one signal unchanged."""
|
||||
|
||||
signal = Cpt(
|
||||
BECProcessedSignal,
|
||||
name="signal",
|
||||
model_config=None,
|
||||
kind=Kind.config,
|
||||
doc="Forwarded signal",
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
signal: str,
|
||||
device_manager=None,
|
||||
scan_info=None,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
|
||||
|
||||
src = self.signal.get_device_object_from_bec(
|
||||
object_name=signal,
|
||||
signal_name=self.name,
|
||||
device_manager=device_manager,
|
||||
)
|
||||
|
||||
self.signal.set_compute_method(self._compute_signal, signal=src)
|
||||
|
||||
def _compute_signal(self, signal: Signal) -> float:
|
||||
return signal.get()
|
||||
@@ -6,467 +6,465 @@ from bec_server.file_writer.default_writer import DefaultFormat
|
||||
|
||||
class cSAXSNeXusFormat(DefaultFormat):
|
||||
"""
|
||||
NeXus file format for cSAXS beamline. This format is based on the default NeXus format, but with some additional entries specific to the cSAXS beamline. The structure of the file is based on the NeXus standard, but with some additional groups and datasets specific to the cSAXS beamline.
|
||||
NeXus file format for the cSAXS beamline (BEC era).
|
||||
|
||||
Mirrors the old SPEC layout.xml hierarchy and adds the flOMNI instrument
|
||||
group for the nano-positioning stage used in ptychography.
|
||||
|
||||
Device resilience
|
||||
-----------------
|
||||
Every device read (self.get_entry / device call) is wrapped in try/except.
|
||||
If a device is removed from the BEC config file between sessions it simply
|
||||
disappears from the device_manager — the corresponding dataset or link is
|
||||
silently omitted from the HDF5 file without raising an error. This means
|
||||
the file structure is additive: re-add the device to the config and the
|
||||
field reappears automatically on the next scan.
|
||||
|
||||
Top-level HDF5 structure
|
||||
────────────────────────
|
||||
/entry NXentry (definition = NXptycho)
|
||||
/sample NXsample ← primary sample group
|
||||
/entry_ptycho NXentry ← generic ptycho entry
|
||||
/data_soft NXentry ← convenience Eiger frame links
|
||||
/control NXmonitor
|
||||
/instrument NXinstrument
|
||||
/source
|
||||
/insertion_device
|
||||
/monochromator
|
||||
/XBPM3
|
||||
/slit_3 … slit_5
|
||||
/filter_set
|
||||
/beam_stop_1 … beam_stop_2
|
||||
/eiger_1_5 NXdetector
|
||||
/mcs NXdetector
|
||||
/flOMNI NXpositioner
|
||||
|
||||
Device name mapping (old SPEC → current BEC)
|
||||
────────────────────────────────────────────
|
||||
samx / samy → samx / samy (generic; kept for non-flOMNI configs)
|
||||
sl3wh/wv/ch/cv → sl3trxi/o/b/t (individual blade motors; gap/centre TODO)
|
||||
sl4wh/wv/ch/cv → sl4trxi/o/b/t
|
||||
sl5wh/wv/ch/cv → sl5trxi/o/b/t
|
||||
bs1x / bs1y → bs1x / bs1y
|
||||
bs2x / bs2y → bs2x / bs2y
|
||||
dettrx → dettrx
|
||||
eiger_4 → eiger_1_5
|
||||
mcs → mcs
|
||||
filter_array → filter_array_1_x … filter_array_4_x
|
||||
xbpm3 → xbpm3x / xbpm3y (stage positions; signal readouts TODO)
|
||||
energy → ccm_energy
|
||||
|
||||
TODO (devices not yet in BEC list)
|
||||
───────────────────────────────────
|
||||
curr, idgap ring current, undulator gap
|
||||
moth1, mobd monochromator crystal angles
|
||||
mith, mibd, mirror_coating mirror
|
||||
bpm3s/x/y/z XBPM3 signal readouts
|
||||
sl0 / sl1 / sl2 upstream optics-hutch slits
|
||||
slit gap / centre derived from blade pairs + calibration offset
|
||||
"""
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
def _safe_dataset(self, group, name: str, device: str,
|
||||
units: str | None = None,
|
||||
description: str | None = None) -> None:
|
||||
"""
|
||||
Write a dataset from the BEC scan data dictionary.
|
||||
Silently skips if the device was not recorded in this scan
|
||||
(e.g. removed from config, readoutPriority=on_request and not triggered,
|
||||
or the scan finished before the device responded).
|
||||
"""
|
||||
try:
|
||||
value = self.get_entry(device)
|
||||
ds = group.create_dataset(name, data=value)
|
||||
if units:
|
||||
ds.attrs["units"] = units
|
||||
if description:
|
||||
ds.attrs["description"] = description
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _safe_soft_link(self, group, name: str, target: str) -> None:
|
||||
"""Create a soft link; silently skip on any error."""
|
||||
try:
|
||||
group.create_soft_link(name, target)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _slit_blades(self, group, prefix: str) -> None:
|
||||
"""
|
||||
Store individual blade motor positions for a 4-blade slit set.
|
||||
Derived quantities (gap, centre) require a per-slit calibration offset
|
||||
and will be added in a later update.
|
||||
"""
|
||||
for blade, motor in [
|
||||
("inner_x", f"{prefix}trxi"),
|
||||
("outer_x", f"{prefix}trxo"),
|
||||
("bottom_y", f"{prefix}trxb"),
|
||||
("top_y", f"{prefix}trxt"),
|
||||
]:
|
||||
self._safe_dataset(group, blade, motor, units="mm")
|
||||
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Main format method
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
def format(self) -> None:
|
||||
"""
|
||||
Prepare the NeXus file format.
|
||||
Override this method in file writer plugins to customize the HDF5 file format.
|
||||
"""Build the NeXus/HDF5 layout for a cSAXS scan."""
|
||||
|
||||
The class provides access to the following attributes:
|
||||
- self.storage: The HDF5Storage object.
|
||||
- self.data: The data dictionary.
|
||||
- self.file_references: The file references dictionary, which has the link to external data.
|
||||
- self.device_manager: The DeviceManagerBase object.
|
||||
- self.get_entry(name, default=None): Helper method to get an entry from the data dictionary.
|
||||
# Canonical paths referenced by multiple groups
|
||||
RT_POS_PATH = "/entry/instrument/flOMNI/rt_positions"
|
||||
EIGER_COLL = "/entry/collection/file_references/eiger_1_5"
|
||||
|
||||
See also: :class:`bec_server.file_writer.file_writer.HDF5Storage`.
|
||||
# ── Root entry ────────────────────────────────────────────────────────
|
||||
entry = self.storage.create_group("entry")
|
||||
entry.attrs["NX_class"] = "NXentry"
|
||||
entry.attrs["definition"] = "NXptycho"
|
||||
|
||||
"""
|
||||
# ── /entry/sample ─────────────────────────────────────────────────────
|
||||
# Primary sample group. Contains the name of the mounted sample and a
|
||||
# link to the real-time scan positions. Generic samx/samy are recorded
|
||||
# here so the group is meaningful for non-flOMNI configurations too.
|
||||
sample = entry.create_group("sample")
|
||||
sample.attrs["NX_class"] = "NXsample"
|
||||
# Soft-link name directly to the value BEC recorded in the collection.
|
||||
# Only written when flomni_samples is present; other configs leave name absent.
|
||||
if "flomni_samples" in self.device_manager.devices:
|
||||
self._safe_soft_link(
|
||||
sample, "name",
|
||||
"/entry/collection/devices/flomni_samples"
|
||||
"/flomni_samples_sample_names_sample0/value",
|
||||
)
|
||||
# Generic coarse stage positions (meaningful in non-flOMNI setups)
|
||||
self._safe_dataset(sample, "x_translation", "samx", units="mm")
|
||||
self._safe_dataset(sample, "y_translation", "samy", units="mm")
|
||||
# Real-time encoder positions — the primary scan coordinate
|
||||
self._safe_soft_link(sample, "positions", RT_POS_PATH)
|
||||
|
||||
# entry = self.storage.create_group("entry")
|
||||
# ── /entry/entry_ptycho ───────────────────────────────────────────────
|
||||
# Generic ptychography entry. Detector data and scan positions are
|
||||
# linked in from the instrument groups so this entry is self-contained
|
||||
# for downstream reconstruction codes.
|
||||
entry_ptycho = entry.create_group("entry_ptycho")
|
||||
entry_ptycho.attrs["NX_class"] = "NXentry"
|
||||
entry_ptycho.attrs["definition"] = "NXptycho"
|
||||
|
||||
# # /entry/control
|
||||
# control = entry.create_group("control")
|
||||
# control.attrs["NX_class"] = "NXmonitor"
|
||||
# control.create_dataset(name="mode", data="monitor")
|
||||
nxdata = entry_ptycho.create_group("data")
|
||||
nxdata.attrs["NX_class"] = "NXdata"
|
||||
nxdata.attrs["signal"] = "data"
|
||||
# Detector frames
|
||||
try:
|
||||
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
|
||||
self._safe_soft_link(nxdata, k, f"{EIGER_COLL}/{k}")
|
||||
except Exception:
|
||||
pass
|
||||
# Scan positions
|
||||
self._safe_soft_link(nxdata, "positions", RT_POS_PATH)
|
||||
|
||||
# #########
|
||||
# # EXAMPLE for soft link
|
||||
# #########
|
||||
# # /entry/data
|
||||
# if "eiger_4" in self.device_manager.devices:
|
||||
# entry.create_soft_link(name="data", target="/entry/instrument/eiger_4")
|
||||
# Link to the primary sample group
|
||||
self._safe_soft_link(entry_ptycho, "sample", "/entry/sample")
|
||||
|
||||
# ########
|
||||
# # EXAMPLE for external link
|
||||
# ########
|
||||
# # control = entry.create_group("sample")
|
||||
# # control.create_ext_link("data", self.file_references["eiger9m"]["path"], "EG9M/data")
|
||||
# ── /entry/data_soft ──────────────────────────────────────────────────
|
||||
# Convenience group mirroring the old /entry/data hardlink from layout.xml.
|
||||
data_soft = entry.create_group("data_soft")
|
||||
data_soft.attrs["NX_class"] = "NXentry"
|
||||
try:
|
||||
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
|
||||
self._safe_soft_link(data_soft, k, f"{EIGER_COLL}/{k}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# # /entry/sample
|
||||
# control = entry.create_group("sample")
|
||||
# control.attrs["NX_class"] = "NXsample"
|
||||
# control.create_dataset(name="name", data=self.data.get("samplename"))
|
||||
# control.create_dataset(name="description", data=self.data.get("sample_description"))
|
||||
# ── /entry/control ────────────────────────────────────────────────────
|
||||
control = entry.create_group("control")
|
||||
control.attrs["NX_class"] = "NXmonitor"
|
||||
control.create_dataset("mode", data="monitor")
|
||||
# TODO: beam intensity integral — add device when available
|
||||
# self._safe_dataset(control, "integral", "bpm_sum", units="NX_DIMENSIONLESS")
|
||||
|
||||
# # /entry/instrument
|
||||
# instrument = entry.create_group("instrument")
|
||||
# instrument.attrs["NX_class"] = "NXinstrument"
|
||||
# ── /entry/instrument ─────────────────────────────────────────────────
|
||||
instrument = entry.create_group("instrument")
|
||||
instrument.attrs["NX_class"] = "NXinstrument"
|
||||
instrument.create_dataset("name", data="cSAXS beamline")
|
||||
|
||||
# source = instrument.create_group("source")
|
||||
# source.attrs["NX_class"] = "NXsource"
|
||||
# source.create_dataset(name="type", data="Synchrotron X-ray Source")
|
||||
# source.create_dataset(name="name", data="Swiss Light Source")
|
||||
# source.create_dataset(name="probe", data="x-ray")
|
||||
# ── Source ────────────────────────────────────────────────────────────
|
||||
# Numerical values are currently unknown and stored as 0.
|
||||
# Will be updated once the corresponding devices are in BEC.
|
||||
source = instrument.create_group("source")
|
||||
source.attrs["NX_class"] = "NXsource"
|
||||
source.create_dataset("type", data="Synchrotron X-ray Source")
|
||||
source.create_dataset("name", data="Swiss Light Source")
|
||||
source.create_dataset("probe", data="x-ray")
|
||||
source.create_dataset("sigma_x", data=0.0).attrs["units"] = "mm"
|
||||
source.create_dataset("sigma_y", data=0.0).attrs["units"] = "mm"
|
||||
source.create_dataset("divergence_x", data=0.0).attrs["units"] = "radians"
|
||||
source.create_dataset("divergence_y", data=0.0).attrs["units"] = "radians"
|
||||
# TODO: current — add device when available
|
||||
# self._safe_dataset(source, "current", "curr", units="mA")
|
||||
|
||||
# # /entry
|
||||
# entry = self.storage.create_group("entry")
|
||||
# entry.attrs["NX_class"] = "NXentry"
|
||||
# entry.attrs["definition"] = "NXsas"
|
||||
# entry.attrs["start_time"] = self.data.get("start_time")
|
||||
# entry.attrs["end_time"] = self.data.get("end_time")
|
||||
# entry.attrs["version"] = 1.0
|
||||
|
||||
# # /entry/control
|
||||
# control = entry.create_group("control")
|
||||
# control.attrs["NX_class"] = "NXmonitor"
|
||||
# control.create_dataset(name="mode", data="monitor")
|
||||
# control.create_dataset(name="integral", data=self.get_entry("bpm4i"))
|
||||
|
||||
# # /entry/data
|
||||
# main_data = entry.create_group("data")
|
||||
# main_data.attrs["NX_class"] = "NXdata"
|
||||
# if "eiger_4" in self.device_manager.devices:
|
||||
# main_data.create_soft_link(name="data", target="/entry/instrument/eiger_4/data")
|
||||
# elif "eiger9m" in self.device_manager.devices:
|
||||
# main_data.create_soft_link(name="data", target="/entry/instrument/eiger9m/data")
|
||||
# elif "pilatus_2" in self.device_manager.devices:
|
||||
# main_data.create_soft_link(name="data", target="/entry/instrument/pilatus_2/data")
|
||||
|
||||
# # /entry/sample
|
||||
# control = entry.create_group("sample")
|
||||
# control.attrs["NX_class"] = "NXsample"
|
||||
# control.create_dataset(name="name", data=self.get_entry("samplename"))
|
||||
# control.create_dataset(name="description", data=self.data.get("sample_description"))
|
||||
# x_translation = control.create_dataset(name="x_translation", data=self.get_entry("samx"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# y_translation = control.create_dataset(name="y_translation", data=self.get_entry("samy"))
|
||||
# y_translation.attrs["units"] = "mm"
|
||||
# temperature_log = control.create_dataset(
|
||||
# name="temperature_log", data=self.get_entry("temp")
|
||||
# )
|
||||
# temperature_log.attrs["units"] = "K"
|
||||
|
||||
# # /entry/instrument
|
||||
# instrument = entry.create_group("instrument")
|
||||
# instrument.attrs["NX_class"] = "NXinstrument"
|
||||
# instrument.create_dataset(name="name", data="cSAXS beamline")
|
||||
|
||||
# source = instrument.create_group("source")
|
||||
# source.attrs["NX_class"] = "NXsource"
|
||||
# source.create_dataset(name="type", data="Synchrotron X-ray Source")
|
||||
# source.create_dataset(name="name", data="Swiss Light Source")
|
||||
# source.create_dataset(name="probe", data="x-ray")
|
||||
# distance = source.create_dataset(
|
||||
# name="distance", data=-33800 - np.asarray(self.get_entry("samz", 0))
|
||||
# )
|
||||
# distance.attrs["units"] = "mm"
|
||||
# sigma_x = source.create_dataset(name="sigma_x", data=0.202)
|
||||
# sigma_x.attrs["units"] = "mm"
|
||||
# sigma_y = source.create_dataset(name="sigma_y", data=0.018)
|
||||
# sigma_y.attrs["units"] = "mm"
|
||||
# divergence_x = source.create_dataset(name="divergence_x", data=0.000135)
|
||||
# divergence_x.attrs["units"] = "radians"
|
||||
# divergence_y = source.create_dataset(name="divergence_y", data=0.000025)
|
||||
# divergence_y.attrs["units"] = "radians"
|
||||
# current = source.create_dataset(name="current", data=self.get_entry("curr"))
|
||||
# current.attrs["units"] = "mA"
|
||||
|
||||
# insertion_device = instrument.create_group("insertion_device")
|
||||
# insertion_device.attrs["NX_class"] = "NXinsertion_device"
|
||||
# source.create_dataset(name="type", data="undulator")
|
||||
# gap = source.create_dataset(name="gap", data=self.get_entry("idgap"))
|
||||
# gap.attrs["units"] = "mm"
|
||||
# k = source.create_dataset(name="k", data=2.46)
|
||||
# k.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# length = source.create_dataset(name="length", data=1820)
|
||||
# length.attrs["units"] = "mm"
|
||||
|
||||
# slit_0 = instrument.create_group("slit_0")
|
||||
# slit_0.attrs["NX_class"] = "NXslit"
|
||||
# source.create_dataset(name="material", data="OFHC Cu")
|
||||
# source.create_dataset(name="description", data="Horizontal secondary source slit")
|
||||
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl0wh"))
|
||||
# x_gap.attrs["units"] = "mm"
|
||||
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl0ch"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# distance = source.create_dataset(
|
||||
# name="distance", data=-21700 - np.asarray(self.get_entry("samz", 0))
|
||||
# )
|
||||
# distance.attrs["units"] = "mm"
|
||||
|
||||
# slit_1 = instrument.create_group("slit_1")
|
||||
# slit_1.attrs["NX_class"] = "NXslit"
|
||||
# source.create_dataset(name="material", data="OFHC Cu")
|
||||
# source.create_dataset(name="description", data="Horizontal secondary source slit")
|
||||
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl1wh"))
|
||||
# x_gap.attrs["units"] = "mm"
|
||||
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl1wv"))
|
||||
# y_gap.attrs["units"] = "mm"
|
||||
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl1ch"))
|
||||
# height.attrs["units"] = "mm"
|
||||
# distance = source.create_dataset(
|
||||
# name="distance", data=-7800 - np.asarray(self.get_entry("samz", 0))
|
||||
# )
|
||||
# distance.attrs["units"] = "mm"
|
||||
|
||||
# mono = instrument.create_group("monochromator")
|
||||
# mono.attrs["NX_class"] = "NXmonochromator"
|
||||
# mokev = self.data.get("mokev", {})
|
||||
# if mokev:
|
||||
# if isinstance(mokev, list):
|
||||
# mokev = mokev[0]
|
||||
# wavelength = mono.create_dataset(
|
||||
# name="wavelength", data=12.3984193 / (mokev.get("mokev").get("value") + 1e-9)
|
||||
# )
|
||||
# wavelength.attrs["units"] = "Angstrom"
|
||||
# energy = mono.create_dataset(name="energy", data=mokev.get("mokev").get("value"))
|
||||
# energy.attrs["units"] = "keV"
|
||||
# mono.create_dataset(name="type", data="Double crystal fixed exit monochromator.")
|
||||
# distance = mono.create_dataset(
|
||||
# name="distance", data=-5220 - np.asarray(self.get_entry("samz", 0))
|
||||
# )
|
||||
# distance.attrs["units"] = "mm"
|
||||
# ── Insertion device ──────────────────────────────────────────────────
|
||||
insertion_device = instrument.create_group("insertion_device")
|
||||
insertion_device.attrs["NX_class"] = "NXinsertion_device"
|
||||
insertion_device.create_dataset("type", data="undulator")
|
||||
insertion_device.create_dataset("k", data=0.0)
|
||||
insertion_device.create_dataset("length", data=0.0).attrs["units"] = "mm"
|
||||
# TODO: gap — add device when available
|
||||
# self._safe_dataset(insertion_device, "gap", "idgap", units="mm")
|
||||
|
||||
# ── Monochromator ─────────────────────────────────────────────────────
|
||||
# ccm_energy is a baseline device and is recorded in the scan data.
|
||||
mono = instrument.create_group("monochromator")
|
||||
mono.attrs["NX_class"] = "NXmonochromator"
|
||||
mono.create_dataset("type", data="Double crystal fixed exit monochromator.")
|
||||
try:
|
||||
energy_kev = self.get_entry("ccm_energy")
|
||||
energy_arr = np.asarray(energy_kev, dtype=float)
|
||||
en_ds = mono.create_dataset("energy", data=energy_arr)
|
||||
en_ds.attrs["units"] = "keV"
|
||||
with np.errstate(divide="ignore", invalid="ignore"):
|
||||
wavelength = np.where(energy_arr != 0, 12.3984193 / energy_arr, 0.0)
|
||||
wl_ds = mono.create_dataset("wavelength", data=wavelength)
|
||||
wl_ds.attrs["units"] = "Angstrom"
|
||||
except Exception:
|
||||
pass
|
||||
# TODO: crystal angles — add moth1 / mobd when available
|
||||
# crystal_1 = mono.create_group("crystal_1")
|
||||
# crystal_1.attrs["NX_class"] = "NXcrystal"
|
||||
# crystal_1.create_dataset(name="usage", data="Bragg")
|
||||
# crystal_1.create_dataset(name="order_no", data="1")
|
||||
# crystal_1.create_dataset(name="reflection", data="[1 1 1]")
|
||||
# bragg_angle = crystal_1.create_dataset(name="bragg_angle", data=self.get_entry("moth1"))
|
||||
# bragg_angle.attrs["units"] = "degrees"
|
||||
|
||||
# crystal_1.create_dataset("usage", data="Bragg")
|
||||
# crystal_1.create_dataset("type", data="Si")
|
||||
# crystal_1.create_dataset("order_no", data=1.0)
|
||||
# crystal_1.create_dataset("reflection", data="[1 1 1]")
|
||||
# self._safe_dataset(crystal_1, "bragg_angle", "moth1", units="degrees")
|
||||
# crystal_2 = mono.create_group("crystal_2")
|
||||
# crystal_2.attrs["NX_class"] = "NXcrystal"
|
||||
# crystal_2.create_dataset(name="usage", data="Bragg")
|
||||
# crystal_2.create_dataset(name="order_no", data="2")
|
||||
# crystal_2.create_dataset(name="reflection", data="[1 1 1]")
|
||||
# bragg_angle = crystal_2.create_dataset(name="bragg_angle", data=self.get_entry("moth1"))
|
||||
# bragg_angle.attrs["units"] = "degrees"
|
||||
# bend_x = crystal_2.create_dataset(name="bend_x", data=self.get_entry("mobd"))
|
||||
# bend_x.attrs["units"] = "degrees"
|
||||
|
||||
# xbpm4 = instrument.create_group("XBPM4")
|
||||
# xbpm4.attrs["NX_class"] = "NXdetector"
|
||||
# xbpm4_sum = xbpm4.create_group("XBPM4_sum")
|
||||
# xbpm4_sum_data = xbpm4_sum.create_dataset(name="data", data=self.get_entry("bpm4s"))
|
||||
# xbpm4_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm4_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
|
||||
# xbpm4_x = xbpm4.create_group("XBPM4_x")
|
||||
# xbpm4_x_data = xbpm4_x.create_dataset(name="data", data=self.get_entry("bpm4x"))
|
||||
# xbpm4_x_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm4_x.create_dataset(
|
||||
# name="description",
|
||||
# data="Normalized difference of counts between left and right quadrants.",
|
||||
# )
|
||||
# xbpm4_y = xbpm4.create_group("XBPM4_y")
|
||||
# xbpm4_y_data = xbpm4_y.create_dataset(name="data", data=self.get_entry("bpm4y"))
|
||||
# xbpm4_y_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm4_y.create_dataset(
|
||||
# name="description",
|
||||
# data="Normalized difference of counts between high and low quadrants.",
|
||||
# )
|
||||
# xbpm4_skew = xbpm4.create_group("XBPM4_skew")
|
||||
# xbpm4_skew_data = xbpm4_skew.create_dataset(name="data", data=self.get_entry("bpm4z"))
|
||||
# xbpm4_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm4_skew.create_dataset(
|
||||
# name="description", data="Normalized difference of counts between diagonal quadrants."
|
||||
# )
|
||||
# crystal_2.create_dataset("usage", data="Bragg")
|
||||
# crystal_2.create_dataset("type", data="Si")
|
||||
# crystal_2.create_dataset("order_no", data=2.0)
|
||||
# crystal_2.create_dataset("reflection", data="[1 1 1]")
|
||||
# self._safe_dataset(crystal_2, "bragg_angle", "moth1", units="degrees")
|
||||
# self._safe_dataset(crystal_2, "bend_x", "mobd", units="degrees")
|
||||
|
||||
# ── Mirror ────────────────────────────────────────────────────────────
|
||||
# TODO: mith, mibd, mirror_coating not yet in device list
|
||||
# mirror = instrument.create_group("mirror")
|
||||
# mirror.attrs["NX_class"] = "NXmirror"
|
||||
# mirror.create_dataset(name="type", data="single")
|
||||
# mirror.create_dataset("type", data="single")
|
||||
# mirror.create_dataset(
|
||||
# name="description",
|
||||
# data="Grazing incidence mirror to reject high-harmonic wavelengths from the monochromator. There are three coating options available that are used depending on the X-ray energy, no coating (SiO2), rhodium (Rh) or platinum (Pt).",
|
||||
# )
|
||||
# incident_angle = mirror.create_dataset(name="incident_angle", data=self.get_entry("mith"))
|
||||
# incident_angle.attrs["units"] = "degrees"
|
||||
# substrate_material = mirror.create_dataset(name="substrate_material", data="SiO2")
|
||||
# substrate_material.attrs["units"] = "NX_CHAR"
|
||||
# coating_material = mirror.create_dataset(name="coating_material", data="SiO2")
|
||||
# coating_material.attrs["units"] = "NX_CHAR"
|
||||
# bend_y = mirror.create_dataset(name="bend_y", data="mibd")
|
||||
# bend_y.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# distance = mirror.create_dataset(
|
||||
# name="distance", data=-4370 - np.asarray(self.get_entry("samz", 0))
|
||||
# )
|
||||
# distance.attrs["units"] = "mm"
|
||||
|
||||
# xbpm5 = instrument.create_group("XBPM5")
|
||||
# xbpm5.attrs["NX_class"] = "NXdetector"
|
||||
# xbpm5_sum = xbpm5.create_group("XBPM5_sum")
|
||||
# xbpm5_sum_data = xbpm5_sum.create_dataset(name="data", data=self.get_entry("bpm5s"))
|
||||
# xbpm5_sum_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm5_sum.create_dataset(name="description", data="Sum of counts for the four quadrants.")
|
||||
# xbpm5_x = xbpm5.create_group("XBPM5_x")
|
||||
# xbpm5_x_data = xbpm5_x.create_dataset(name="data", data=self.get_entry("bpm5x"))
|
||||
# xbpm5_x_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm5_x.create_dataset(
|
||||
# name="description",
|
||||
# data="Normalized difference of counts between left and right quadrants.",
|
||||
# )
|
||||
# xbpm5_y = xbpm5.create_group("XBPM5_y")
|
||||
# xbpm5_y_data = xbpm5_y.create_dataset(name="data", data=self.get_entry("bpm5y"))
|
||||
# xbpm5_y_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm5_y.create_dataset(
|
||||
# name="description",
|
||||
# data="Normalized difference of counts between high and low quadrants.",
|
||||
# )
|
||||
# xbpm5_skew = xbpm5.create_group("XBPM5_skew")
|
||||
# xbpm5_skew_data = xbpm5_skew.create_dataset(name="data", data=self.get_entry("bpm5z"))
|
||||
# xbpm5_skew_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# xbpm5_skew.create_dataset(
|
||||
# name="description", data="Normalized difference of counts between diagonal quadrants."
|
||||
# "description",
|
||||
# data=(
|
||||
# "Grazing incidence mirror to reject high-harmonic wavelengths. "
|
||||
# "Three coating options: no coating (SiO2), rhodium (Rh), platinum (Pt)."
|
||||
# ),
|
||||
# )
|
||||
# mirror.create_dataset("substrate_material", data="SiO2")
|
||||
# self._safe_dataset(mirror, "incident_angle", "mith", units="degrees")
|
||||
# self._safe_dataset(mirror, "coating_material", "mirror_coating", units="NX_CHAR")
|
||||
# self._safe_dataset(mirror, "bend_y", "mibd", units="NX_DIMENSIONLESS")
|
||||
|
||||
# ── Upstream slits (optics hutch) ─────────────────────────────────────
|
||||
# TODO: slit_0 / slit_1 / slit_2 motors not yet in BEC device list
|
||||
# slit_0 = instrument.create_group("slit_0")
|
||||
# ...
|
||||
# slit_1 = instrument.create_group("slit_1")
|
||||
# ...
|
||||
# slit_2 = instrument.create_group("slit_2")
|
||||
# slit_2.attrs["NX_class"] = "NXslit"
|
||||
# source.create_dataset(name="material", data="Ag")
|
||||
# source.create_dataset(name="description", data="Slit 2, optics hutch")
|
||||
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl2wh"))
|
||||
# x_gap.attrs["units"] = "mm"
|
||||
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl2wv"))
|
||||
# y_gap.attrs["units"] = "mm"
|
||||
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl2ch"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl2cv"))
|
||||
# height.attrs["units"] = "mm"
|
||||
# distance = source.create_dataset(
|
||||
# name="distance", data=-3140 - np.asarray(self.get_entry("samz", 0))
|
||||
# )
|
||||
# distance.attrs["units"] = "mm"
|
||||
# ...
|
||||
|
||||
# slit_3 = instrument.create_group("slit_3")
|
||||
# slit_3.attrs["NX_class"] = "NXslit"
|
||||
# source.create_dataset(name="material", data="Si")
|
||||
# source.create_dataset(name="description", data="Slit 3, experimental hutch, exposure box")
|
||||
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl3wh"))
|
||||
# x_gap.attrs["units"] = "mm"
|
||||
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl3wv"))
|
||||
# y_gap.attrs["units"] = "mm"
|
||||
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl3ch"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl3cv"))
|
||||
# height.attrs["units"] = "mm"
|
||||
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
|
||||
# # distance.attrs["units"] = "mm"
|
||||
# ── XBPM3 ─────────────────────────────────────────────────────────────
|
||||
# xbpm3x/xbpm3y are stage motor positions for aligning the monitor.
|
||||
# Signal readouts (sum/x/y/skew) are TODO once MCS channels are mapped.
|
||||
xbpm3 = instrument.create_group("XBPM3")
|
||||
xbpm3.attrs["NX_class"] = "NXdetector"
|
||||
xbpm3.attrs["description"] = "X-ray beam position monitor 3, experimental hutch"
|
||||
self._safe_dataset(xbpm3, "x_stage", "xbpm3x", units="mm",
|
||||
description="XBPM3 stage x-translation")
|
||||
self._safe_dataset(xbpm3, "y_stage", "xbpm3y", units="mm",
|
||||
description="XBPM3 stage y-translation")
|
||||
# TODO: signal readout sub-groups once MCS channels are configured
|
||||
# for suffix, entry_name, desc in [
|
||||
# ("sum", "bpm3s", "Sum of counts for the four quadrants."),
|
||||
# ("x", "bpm3x", "Normalized diff, left vs right quadrants."),
|
||||
# ("y", "bpm3y", "Normalized diff, high vs low quadrants."),
|
||||
# ("skew", "bpm3z", "Normalized diff, diagonal quadrants."),
|
||||
# ]:
|
||||
# g = xbpm3.create_group(f"XBPM3_{suffix}")
|
||||
# self._safe_dataset(g, "data", entry_name, units="NX_DIMENSIONLESS")
|
||||
# g.create_dataset("description", data=desc)
|
||||
|
||||
# filter_set = instrument.create_group("filter_set")
|
||||
# filter_set.attrs["NX_class"] = "NXattenuator"
|
||||
# filter_set.create_dataset(name="material", data="Si")
|
||||
# filter_set.create_dataset(
|
||||
# name="description",
|
||||
# data="The filter set consists of 4 linear stages, each with five filter positions. Additionally, each one allows for an out position to allow 'no filtering'.",
|
||||
# )
|
||||
# attenuator_transmission = filter_set.create_dataset(
|
||||
# name="attenuator_transmission", data=10 ** self.get_entry("ftrans", 0)
|
||||
# )
|
||||
# attenuator_transmission.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# ── Slit 3 (experimental hutch, exposure box) ─────────────────────────
|
||||
slit_3 = instrument.create_group("slit_3")
|
||||
slit_3.attrs["NX_class"] = "NXslit"
|
||||
slit_3.create_dataset("material", data="Si")
|
||||
slit_3.create_dataset("description", data="Slit 3, experimental hutch, exposure box")
|
||||
# TODO: gap / centre require per-slit calibration offset — add later
|
||||
self._slit_blades(slit_3, "sl3")
|
||||
|
||||
# slit_4 = instrument.create_group("slit_4")
|
||||
# slit_4.attrs["NX_class"] = "NXslit"
|
||||
# source.create_dataset(name="material", data="Si")
|
||||
# source.create_dataset(name="description", data="Slit 4, experimental hutch, exposure box")
|
||||
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl4wh"))
|
||||
# x_gap.attrs["units"] = "mm"
|
||||
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl4wv"))
|
||||
# y_gap.attrs["units"] = "mm"
|
||||
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl4ch"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl4cv"))
|
||||
# height.attrs["units"] = "mm"
|
||||
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
|
||||
# # distance.attrs["units"] = "mm"
|
||||
# ── Filter set ────────────────────────────────────────────────────────
|
||||
filter_set = instrument.create_group("filter_set")
|
||||
filter_set.attrs["NX_class"] = "NXattenuator"
|
||||
filter_set.create_dataset("material", data="Si")
|
||||
filter_set.create_dataset(
|
||||
"description",
|
||||
data=(
|
||||
"Four linear filter stages (filter_array_1_x … filter_array_4_x). "
|
||||
"Each stage has five filter positions plus an 'out' position."
|
||||
),
|
||||
)
|
||||
for i in range(1, 5):
|
||||
self._safe_dataset(filter_set, f"stage_{i}_x",
|
||||
f"filter_array_{i}_x", units="mm")
|
||||
# TODO: attenuator_transmission = 10^(ftrans) once device is available
|
||||
|
||||
# slit_5 = instrument.create_group("slit_5")
|
||||
# slit_5.attrs["NX_class"] = "NXslit"
|
||||
# source.create_dataset(name="material", data="Si")
|
||||
# source.create_dataset(name="description", data="Slit 5, experimental hutch, exposure box")
|
||||
# x_gap = source.create_dataset(name="x_gap", data=self.get_entry("sl5wh"))
|
||||
# x_gap.attrs["units"] = "mm"
|
||||
# y_gap = source.create_dataset(name="y_gap", data=self.get_entry("sl5wv"))
|
||||
# y_gap.attrs["units"] = "mm"
|
||||
# x_translation = source.create_dataset(name="x_translation", data=self.get_entry("sl5ch"))
|
||||
# x_translation.attrs["units"] = "mm"
|
||||
# height = source.create_dataset(name="x_translation", data=self.get_entry("sl5cv"))
|
||||
# height.attrs["units"] = "mm"
|
||||
# # distance = source.create_dataset(name="distance", data=-3140 - self.get_entry("samz", 0))
|
||||
# # distance.attrs["units"] = "mm"
|
||||
# ── Slit 4 (experimental hutch, exposure box) ─────────────────────────
|
||||
slit_4 = instrument.create_group("slit_4")
|
||||
slit_4.attrs["NX_class"] = "NXslit"
|
||||
slit_4.create_dataset("material", data="Ge")
|
||||
slit_4.create_dataset("description", data="Slit 4, experimental hutch, exposure box")
|
||||
self._slit_blades(slit_4, "sl4")
|
||||
|
||||
# beam_stop_1 = instrument.create_group("beam_stop_1")
|
||||
# beam_stop_1.attrs["NX_class"] = "NX_beamstop"
|
||||
# beam_stop_1.create_dataset(name="description", data="circular")
|
||||
# bms1_size = beam_stop_1.create_dataset(name="size", data=3)
|
||||
# bms1_size.attrs["units"] = "mm"
|
||||
# bms1_x = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1x"))
|
||||
# bms1_x.attrs["units"] = "mm"
|
||||
# bms1_y = beam_stop_1.create_dataset(name="size", data=self.get_entry("bs1y"))
|
||||
# bms1_y.attrs["units"] = "mm"
|
||||
# ── Slit 5 (experimental hutch, exposure box) ─────────────────────────
|
||||
slit_5 = instrument.create_group("slit_5")
|
||||
slit_5.attrs["NX_class"] = "NXslit"
|
||||
slit_5.create_dataset("material", data="Si")
|
||||
slit_5.create_dataset("description", data="Slit 5, experimental hutch, exposure box")
|
||||
self._slit_blades(slit_5, "sl5")
|
||||
|
||||
# beam_stop_2 = instrument.create_group("beam_stop_2")
|
||||
# beam_stop_2.attrs["NX_class"] = "NX_beamstop"
|
||||
# beam_stop_2.create_dataset(name="description", data="rectangular")
|
||||
# bms2_size_x = beam_stop_2.create_dataset(name="size_x", data=5)
|
||||
# bms2_size_x.attrs["units"] = "mm"
|
||||
# bms2_size_y = beam_stop_2.create_dataset(name="size_y", data=2.25)
|
||||
# bms2_size_y.attrs["units"] = "mm"
|
||||
# bms2_x = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2x"))
|
||||
# bms2_x.attrs["units"] = "mm"
|
||||
# bms2_y = beam_stop_2.create_dataset(name="size", data=self.get_entry("bs2y"))
|
||||
# bms2_y.attrs["units"] = "mm"
|
||||
# bms2_data = beam_stop_2.create_dataset(name="data", data=self.get_entry("diode"))
|
||||
# bms2_data.attrs["units"] = "NX_DIMENSIONLESS"
|
||||
# ── Beam stop 1 ────────────────────────────────────────────────────────
|
||||
beam_stop_1 = instrument.create_group("beam_stop_1")
|
||||
beam_stop_1.attrs["NX_class"] = "NXbeam_stop"
|
||||
beam_stop_1.create_dataset("description", data="circular")
|
||||
beam_stop_1.create_dataset("size", data=3.0).attrs["units"] = "mm"
|
||||
self._safe_dataset(beam_stop_1, "x", "bs1x", units="mm")
|
||||
self._safe_dataset(beam_stop_1, "y", "bs1y", units="mm")
|
||||
# TODO: diode signal behind beam stop 1 when device is available
|
||||
|
||||
# if (
|
||||
# "eiger1p5m" in self.device_manager.devices
|
||||
# and self.device_manager.devices.eiger1p5m.enabled
|
||||
# ):
|
||||
# eiger_4 = instrument.create_group("eiger_4")
|
||||
# eiger_4.attrs["NX_class"] = "NXdetector"
|
||||
# x_pixel_size = eiger_4.create_dataset(name="x_pixel_size", data=75)
|
||||
# x_pixel_size.attrs["units"] = "um"
|
||||
# y_pixel_size = eiger_4.create_dataset(name="y_pixel_size", data=75)
|
||||
# y_pixel_size.attrs["units"] = "um"
|
||||
# polar_angle = eiger_4.create_dataset(name="polar_angle", data=0)
|
||||
# polar_angle.attrs["units"] = "degrees"
|
||||
# azimuthal_angle = eiger_4.create_dataset(name="azimuthal_angle", data=0)
|
||||
# azimuthal_angle.attrs["units"] = "degrees"
|
||||
# rotation_angle = eiger_4.create_dataset(name="rotation_angle", data=0)
|
||||
# rotation_angle.attrs["units"] = "degrees"
|
||||
# description = eiger_4.create_dataset(
|
||||
# name="description", data="Single-photon counting detector, 320 micron-thick Si chip"
|
||||
# )
|
||||
# orientation = eiger_4.create_group("orientation")
|
||||
# orientation.attrs["description"] = (
|
||||
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
|
||||
# )
|
||||
# orientation.create_dataset(name="transpose", data=1)
|
||||
# orientation.create_dataset(name="rot90", data=3)
|
||||
# ── Beam stop 2 ────────────────────────────────────────────────────────
|
||||
beam_stop_2 = instrument.create_group("beam_stop_2")
|
||||
beam_stop_2.attrs["NX_class"] = "NXbeam_stop"
|
||||
beam_stop_2.create_dataset("description", data="rectangular")
|
||||
beam_stop_2.create_dataset("size_x", data=5.0).attrs["units"] = "mm"
|
||||
beam_stop_2.create_dataset("size_y", data=2.25).attrs["units"] = "mm"
|
||||
self._safe_dataset(beam_stop_2, "x", "bs2x", units="mm")
|
||||
self._safe_dataset(beam_stop_2, "y", "bs2y", units="mm")
|
||||
# TODO: diode (transmitted signal) when device is available
|
||||
|
||||
# if (
|
||||
# "eiger9m" in self.device_manager.devices
|
||||
# and self.device_manager.devices.eiger9m.enabled
|
||||
# and "eiger9m" in self.file_references
|
||||
# ):
|
||||
# eiger9m = instrument.create_group("eiger9m")
|
||||
# eiger9m.attrs["NX_class"] = "NXdetector"
|
||||
# x_pixel_size = eiger9m.create_dataset(name="x_pixel_size", data=75)
|
||||
# x_pixel_size.attrs["units"] = "um"
|
||||
# y_pixel_size = eiger9m.create_dataset(name="y_pixel_size", data=75)
|
||||
# y_pixel_size.attrs["units"] = "um"
|
||||
# polar_angle = eiger9m.create_dataset(name="polar_angle", data=0)
|
||||
# polar_angle.attrs["units"] = "degrees"
|
||||
# azimuthal_angle = eiger9m.create_dataset(name="azimuthal_angle", data=0)
|
||||
# azimuthal_angle.attrs["units"] = "degrees"
|
||||
# rotation_angle = eiger9m.create_dataset(name="rotation_angle", data=0)
|
||||
# rotation_angle.attrs["units"] = "degrees"
|
||||
# description = eiger9m.create_dataset(
|
||||
# name="description",
|
||||
# data="Eiger9M detector, in-house developed, Paul Scherrer Institute",
|
||||
# )
|
||||
# orientation = eiger9m.create_group("orientation")
|
||||
# orientation.attrs["description"] = (
|
||||
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
|
||||
# )
|
||||
# orientation.create_dataset(name="transpose", data=1)
|
||||
# orientation.create_dataset(name="rot90", data=3)
|
||||
# data = eiger9m.create_ext_link(
|
||||
# "data", self.file_references["eiger9m"]["path"], "EG9M/data"
|
||||
# )
|
||||
# status = eiger9m.create_ext_link(
|
||||
# "status", self.file_references["eiger9m"]["path"], "EG9M/status"
|
||||
# )
|
||||
# ── Detector translation ───────────────────────────────────────────────
|
||||
self._safe_dataset(
|
||||
instrument, "detector_translation_x", "dettrx",
|
||||
units="mm", description="Detector x-translation stage",
|
||||
)
|
||||
|
||||
# if (
|
||||
# "pilatus_2" in self.device_manager.devices
|
||||
# and self.device_manager.devices.pilatus_2.enabled
|
||||
# and "pilatus_2" in self.file_references
|
||||
# ):
|
||||
# pilatus_2 = instrument.create_group("pilatus_2")
|
||||
# pilatus_2.attrs["NX_class"] = "NXdetector"
|
||||
# x_pixel_size = pilatus_2.create_dataset(name="x_pixel_size", data=172)
|
||||
# x_pixel_size.attrs["units"] = "um"
|
||||
# y_pixel_size = pilatus_2.create_dataset(name="y_pixel_size", data=172)
|
||||
# y_pixel_size.attrs["units"] = "um"
|
||||
# polar_angle = pilatus_2.create_dataset(name="polar_angle", data=0)
|
||||
# polar_angle.attrs["units"] = "degrees"
|
||||
# azimuthal_angle = pilatus_2.create_dataset(name="azimuthal_angle", data=0)
|
||||
# azimuthal_angle.attrs["units"] = "degrees"
|
||||
# rotation_angle = pilatus_2.create_dataset(name="rotation_angle", data=0)
|
||||
# rotation_angle.attrs["units"] = "degrees"
|
||||
# description = pilatus_2.create_dataset(
|
||||
# name="description", data="Pilatus 300K detector, Dectris, Switzerland"
|
||||
# )
|
||||
# orientation = pilatus_2.create_group("orientation")
|
||||
# orientation.attrs["description"] = (
|
||||
# "Orientation defines the number of counterclockwise rotations by 90 deg followed by a transposition to reach the 'cameraman orientation', that is looking towards the beam."
|
||||
# )
|
||||
# orientation.create_dataset(name="transpose", data=1)
|
||||
# orientation.create_dataset(name="rot90", data=2)
|
||||
# data = pilatus_2.create_ext_link(
|
||||
# "data", self.file_references["pilatus_2"]["path"], "entry/instrument/pilatus_2/data"
|
||||
# )
|
||||
# ── Eiger 1.5M detector ───────────────────────────────────────────────
|
||||
if (
|
||||
"eiger_1_5" in self.device_manager.devices
|
||||
and self.device_manager.devices.eiger_1_5.enabled
|
||||
and "eiger_1_5" in self.file_references
|
||||
):
|
||||
eiger = instrument.create_group("eiger_1_5")
|
||||
eiger.attrs["NX_class"] = "NXdetector"
|
||||
eiger.create_dataset("x_pixel_size", data=75.0).attrs["units"] = "um"
|
||||
eiger.create_dataset("y_pixel_size", data=75.0).attrs["units"] = "um"
|
||||
eiger.create_dataset("polar_angle", data=0.0).attrs["units"] = "degrees"
|
||||
eiger.create_dataset("azimuthal_angle", data=0.0).attrs["units"] = "degrees"
|
||||
eiger.create_dataset("rotation_angle", data=0.0).attrs["units"] = "degrees"
|
||||
eiger.create_dataset(
|
||||
"description",
|
||||
data="Eiger 1.5M detector, in-house developed, Paul Scherrer Institute",
|
||||
)
|
||||
eiger.create_dataset(
|
||||
"type",
|
||||
data="Single-photon counting detector, 320 micron-thick Si chip",
|
||||
)
|
||||
orientation = eiger.create_group("orientation")
|
||||
orientation.attrs["description"] = (
|
||||
"Orientation defines the number of counterclockwise rotations by 90 deg "
|
||||
"followed by a transposition to reach the 'cameraman orientation', "
|
||||
"looking towards the beam."
|
||||
)
|
||||
orientation.create_dataset("transpose", data=1)
|
||||
orientation.create_dataset("rot90", data=3)
|
||||
# Soft-link recorded frame data from the BEC collection
|
||||
try:
|
||||
for k in self.file_references["eiger_1_5"].hinted_h5_entries.keys():
|
||||
self._safe_soft_link(eiger, k, f"{EIGER_COLL}/{k}")
|
||||
except Exception:
|
||||
pass
|
||||
# External link to pixel mask in the Eiger master file
|
||||
try:
|
||||
eiger.create_ext_link(
|
||||
"pixel_mask",
|
||||
self.file_references["eiger_1_5"].file_path,
|
||||
"/entry/instrument/detector/pixel_mask",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# if (
|
||||
# "falcon" in self.device_manager.devices
|
||||
# and self.device_manager.devices.falcon.enabled
|
||||
# and "falcon" in self.file_references
|
||||
# ):
|
||||
# falcon = instrument.create_ext_link(
|
||||
# "falcon", self.file_references["falcon"]["path"], "entry/instrument/FalconX1"
|
||||
# )
|
||||
# ── MCS (multi-channel scaler) ─────────────────────────────────────────
|
||||
if (
|
||||
"mcs" in self.device_manager.devices
|
||||
and self.device_manager.devices.mcs.enabled
|
||||
):
|
||||
mcs_group = instrument.create_group("mcs")
|
||||
mcs_group.attrs["NX_class"] = "NXdetector"
|
||||
mcs_group.attrs["description"] = "MCS card cSAXS — multi-channel scaler"
|
||||
self._safe_soft_link(mcs_group, "data", "/entry/collection/devices/mcs")
|
||||
|
||||
# ── flOMNI ────────────────────────────────────────────────────────────
|
||||
# flomni_samples is used as the sentinel for the entire flOMNI setup.
|
||||
# If it is absent from the device_manager (removed from config) the
|
||||
# whole group is omitted. Individual datasets inside are still each
|
||||
# guarded by _safe_dataset / _safe_soft_link in case a specific motor
|
||||
# is temporarily disabled without removing the full setup.
|
||||
if "flomni_samples" in self.device_manager.devices:
|
||||
flomni = instrument.create_group("flOMNI")
|
||||
flomni.attrs["NX_class"] = "NXpositioner"
|
||||
flomni.attrs["description"] = "flOMNI flexible tOMography Nano Imaging"
|
||||
|
||||
# Galil motors — coarse sample stage
|
||||
self._safe_dataset(flomni, "fsamx", "fsamx", units="mm", description="Sample coarse X")
|
||||
self._safe_dataset(flomni, "fsamy", "fsamy", units="mm", description="Sample coarse Y")
|
||||
self._safe_dataset(flomni, "fsamroy", "fsamroy", units="degrees", description="Sample rotation")
|
||||
|
||||
# Galil motors — sample transfer / tray
|
||||
self._safe_dataset(flomni, "ftransx", "ftransx", units="mm", description="Sample transfer X")
|
||||
self._safe_dataset(flomni, "ftransy", "ftransy", units="mm", description="Sample transfer Y")
|
||||
self._safe_dataset(flomni, "ftransz", "ftransz", units="mm", description="Sample transfer Z")
|
||||
self._safe_dataset(flomni, "ftray", "ftray", units="mm", description="Sample transfer tray")
|
||||
|
||||
# Galil motors — laser tracker
|
||||
self._safe_dataset(flomni, "ftracky", "ftracky", units="mm", description="Laser tracker coarse Y")
|
||||
self._safe_dataset(flomni, "ftrackz", "ftrackz", units="mm", description="Laser tracker coarse Z")
|
||||
|
||||
# Galil motors — X-ray eye
|
||||
self._safe_dataset(flomni, "feyex", "feyex", units="mm", description="X-ray eye X")
|
||||
self._safe_dataset(flomni, "feyey", "feyey", units="mm", description="X-ray eye Y")
|
||||
|
||||
# Galil motors — optics (zone plate)
|
||||
self._safe_dataset(flomni, "foptx", "foptx", units="mm", description="Optics X")
|
||||
self._safe_dataset(flomni, "fopty", "fopty", units="mm", description="Optics Y")
|
||||
self._safe_dataset(flomni, "foptz", "foptz", units="mm", description="Optics Z")
|
||||
|
||||
# Galil motor — heater
|
||||
self._safe_dataset(flomni, "fheater", "fheater", units="mm", description="Heater Y")
|
||||
|
||||
# Smaract motors — OSA (order-sorting aperture)
|
||||
self._safe_dataset(flomni, "fosax", "fosax", units="mm", description="OSA X")
|
||||
self._safe_dataset(flomni, "fosay", "fosay", units="mm", description="OSA Y")
|
||||
self._safe_dataset(flomni, "fosaz", "fosaz", units="mm", description="OSA Z")
|
||||
|
||||
# Temperature and humidity sensor (soft link to BEC collection entry)
|
||||
self._safe_soft_link(
|
||||
flomni, "flomni_temphum",
|
||||
"/entry/collection/devices/flomni_temphum",
|
||||
)
|
||||
|
||||
# Real-time encoder positions (RtFlomniFlyer)
|
||||
# Single soft link to the entire rt_positions folder in the BEC
|
||||
# collection. This is the primary scan coordinate for ptychography.
|
||||
self._safe_soft_link(
|
||||
flomni, "rt_positions",
|
||||
"/entry/collection/devices/rt_positions",
|
||||
)
|
||||
|
||||
69
tests/tests_devices/test_omny_shutter.py
Normal file
69
tests/tests_devices/test_omny_shutter.py
Normal file
@@ -0,0 +1,69 @@
|
||||
import pytest
|
||||
from bec_server.device_server.tests.utils import DMMock
|
||||
|
||||
from csaxs_bec.devices.omny.shutter import MonitorSignal, OMNYFastShutter
|
||||
|
||||
|
||||
@pytest.mark.parametrize("auto_monitor", [False, True])
|
||||
def test_monitor_signal_stores_auto_monitor(auto_monitor):
|
||||
signal = MonitorSignal(name="signal", auto_monitor=auto_monitor)
|
||||
|
||||
assert signal.auto_monitor is auto_monitor
|
||||
|
||||
|
||||
def test_monitor_signal_put_propagates_value_to_readback_callback():
|
||||
signal = MonitorSignal(name="signal", auto_monitor=True)
|
||||
initial_value = signal.read()[signal.name]["value"]
|
||||
callback_values = []
|
||||
callback_reads = []
|
||||
|
||||
def _test_cb(value, old_value, **kwargs):
|
||||
callback_values.append((value, old_value))
|
||||
callback_reads.append(kwargs["obj"].read())
|
||||
|
||||
signal.subscribe(_test_cb, event_type=signal.SUB_VALUE, run=False)
|
||||
|
||||
signal.put(1)
|
||||
|
||||
assert callback_values == [(1, initial_value)]
|
||||
assert len(callback_reads) == 1
|
||||
assert callback_reads[0][signal.name]["value"] == 1
|
||||
assert signal.read()[signal.name]["value"] == 1
|
||||
|
||||
signal.put(0)
|
||||
assert callback_values == [(1, initial_value), (0, 1)]
|
||||
assert len(callback_reads) == 2
|
||||
assert callback_reads[1][signal.name]["value"] == 0
|
||||
assert signal.read()[signal.name]["value"] == 0
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def omny_fast_shutter():
|
||||
shutter = OMNYFastShutter(name="omny_fast_shutter", device_manager=DMMock())
|
||||
|
||||
try:
|
||||
yield shutter
|
||||
finally:
|
||||
shutter.destroy()
|
||||
|
||||
|
||||
def test_omny_fast_shutter_uses_monitor_signal_with_auto_monitor(omny_fast_shutter):
|
||||
assert isinstance(omny_fast_shutter.shutter, MonitorSignal)
|
||||
assert omny_fast_shutter.shutter.auto_monitor is True
|
||||
|
||||
|
||||
def test_omny_fast_shutter_propagates_signal_changes_to_device_readback(omny_fast_shutter):
|
||||
signal_name = omny_fast_shutter.shutter.name
|
||||
callback_reads = []
|
||||
|
||||
def _test_cb(**kwargs):
|
||||
callback_reads.append(omny_fast_shutter.read())
|
||||
|
||||
omny_fast_shutter.shutter.subscribe(_test_cb, event_type=omny_fast_shutter.shutter.SUB_VALUE, run=False)
|
||||
|
||||
omny_fast_shutter.shutter.put(1)
|
||||
|
||||
assert len(callback_reads) == 1
|
||||
assert callback_reads[0][signal_name]["value"] == 1
|
||||
assert omny_fast_shutter.read()[signal_name]["value"] == 1
|
||||
assert omny_fast_shutter.fshstatus() == 1
|
||||
241
tests/tests_devices/test_pseudo_devices.py
Normal file
241
tests/tests_devices/test_pseudo_devices.py
Normal file
@@ -0,0 +1,241 @@
|
||||
"""Module to test the pseudo_device module."""
|
||||
|
||||
import pytest
|
||||
from bec_lib.atlas_models import Device
|
||||
from ophyd_devices.sim.sim_signals import SetableSignal
|
||||
|
||||
from csaxs_bec.devices.pseudo_devices.bpm import BPM
|
||||
from csaxs_bec.devices.pseudo_devices.bpm_control import _GAIN_TO_BITS, BPMControl
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_dm(dm_with_devices):
|
||||
# Patch missing current_session attribute in the device manager
|
||||
dm = dm_with_devices
|
||||
setattr(dm, "current_session", dm._session)
|
||||
#
|
||||
signal_lsb = SetableSignal(name="gain_lsb", value=0, kind="config")
|
||||
signal_mid = SetableSignal(name="gain_mid", value=0, kind="config")
|
||||
signal_msb = SetableSignal(name="gain_msb", value=0, kind="config")
|
||||
signal_coupling = SetableSignal(name="coupling", value=0, kind="config")
|
||||
signal_speed = SetableSignal(name="speed_mode", value=0, kind="config")
|
||||
for signal in [signal_lsb, signal_mid, signal_msb, signal_coupling, signal_speed]:
|
||||
dev_cfg = Device(
|
||||
name=signal.name,
|
||||
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
|
||||
enabled=True,
|
||||
readoutPriority="baseline",
|
||||
)
|
||||
dm._session["devices"].append(dev_cfg.model_dump())
|
||||
dm.devices._add_device(signal.name, signal)
|
||||
return dm
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bpm_control(patched_dm):
|
||||
name = "bpm_control"
|
||||
control_config = Device(
|
||||
name=name,
|
||||
deviceClass="csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl",
|
||||
enabled=True,
|
||||
readoutPriority="baseline",
|
||||
deviceConfig={
|
||||
"gain_lsb": "gain_lsb",
|
||||
"gain_mid": "gain_mid",
|
||||
"gain_msb": "gain_msb",
|
||||
"coupling": "coupling",
|
||||
"speed_mode": "speed_mode",
|
||||
},
|
||||
needs=["gain_lsb", "gain_mid", "gain_msb", "coupling", "speed_mode"],
|
||||
)
|
||||
patched_dm._session["devices"].append(control_config.model_dump())
|
||||
try:
|
||||
control = BPMControl(
|
||||
name=name,
|
||||
gain_lsb="gain_lsb",
|
||||
gain_mid="gain_mid",
|
||||
gain_msb="gain_msb",
|
||||
coupling="coupling",
|
||||
speed_mode="speed_mode",
|
||||
device_manager=patched_dm,
|
||||
)
|
||||
patched_dm.devices._add_device(control.name, control)
|
||||
control.wait_for_connection()
|
||||
yield control
|
||||
finally:
|
||||
control.destroy()
|
||||
|
||||
|
||||
def test_bpm_control_set_gain(bpm_control):
|
||||
gain_lsb = bpm_control.device_manager.devices["gain_lsb"]
|
||||
gain_mid = bpm_control.device_manager.devices["gain_mid"]
|
||||
gain_msb = bpm_control.device_manager.devices["gain_msb"]
|
||||
coupling = bpm_control.device_manager.devices["coupling"]
|
||||
speed_mode = bpm_control.device_manager.devices["speed_mode"]
|
||||
gain_lsb.put(0)
|
||||
gain_mid.put(0)
|
||||
gain_msb.put(0)
|
||||
coupling.put(0)
|
||||
speed_mode.put(1)
|
||||
|
||||
gain = bpm_control.gain.get()
|
||||
assert _GAIN_TO_BITS.get(gain) == (0, 0, 0, speed_mode.get() == 1)
|
||||
|
||||
gain_val = 10000000
|
||||
bpm_control.set_gain(gain_val)
|
||||
assert _GAIN_TO_BITS.get(gain_val, ()) == (
|
||||
gain_msb.get(),
|
||||
gain_mid.get(),
|
||||
gain_lsb.get(),
|
||||
speed_mode.get(),
|
||||
)
|
||||
|
||||
gain_val = 100000000000
|
||||
bpm_control.set_gain(gain_val)
|
||||
assert _GAIN_TO_BITS.get(gain_val, ()) == (
|
||||
gain_msb.get(),
|
||||
gain_mid.get(),
|
||||
gain_lsb.get(),
|
||||
speed_mode.get(),
|
||||
)
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
bpm_control.set_gain(1005.0)
|
||||
|
||||
|
||||
def test_bpm_control_set_coupling(bpm_control):
|
||||
coupling = bpm_control.device_manager.devices["coupling"]
|
||||
coupling.put(0)
|
||||
|
||||
bpm_control.coupling.get() == "AC"
|
||||
coupling.put(1)
|
||||
bpm_control.coupling.get() == "DC"
|
||||
|
||||
bpm_control.set_coupling("AC")
|
||||
assert coupling.get() == 0
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
bpm_control.set_coupling("wrong")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def patched_dm_bpm(dm_with_devices):
|
||||
# Patch missing current_session attribute in the device manager
|
||||
dm = dm_with_devices
|
||||
setattr(dm, "current_session", dm._session)
|
||||
#
|
||||
left_top = SetableSignal(name="left_top", value=0, kind="config")
|
||||
right_top = SetableSignal(name="right_top", value=0, kind="config")
|
||||
right_bot = SetableSignal(name="right_bot", value=0, kind="config")
|
||||
left_bot = SetableSignal(name="left_bot", value=0, kind="config")
|
||||
for signal in [left_top, right_top, right_bot, left_bot]:
|
||||
|
||||
dev_cfg = Device(
|
||||
name=signal.name,
|
||||
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
|
||||
enabled=True,
|
||||
readoutPriority="baseline",
|
||||
)
|
||||
dm._session["devices"].append(dev_cfg.model_dump())
|
||||
dm.devices._add_device(signal.name, signal)
|
||||
return dm
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bpm(patched_dm_bpm):
|
||||
name = "bpm"
|
||||
bpm_config = Device(
|
||||
name=name,
|
||||
deviceClass="csaxs_bec.devices.pseudo_devices.bpm.BPM",
|
||||
enabled=True,
|
||||
readoutPriority="baseline",
|
||||
deviceConfig={
|
||||
"left_top": "left_top",
|
||||
"right_top": "right_top",
|
||||
"right_bot": "right_bot",
|
||||
"left_bot": "left_bot",
|
||||
},
|
||||
needs=["left_top", "right_top", "right_bot", "left_bot"],
|
||||
)
|
||||
patched_dm_bpm._session["devices"].append(bpm_config.model_dump())
|
||||
try:
|
||||
bpm = BPM(
|
||||
name=name,
|
||||
left_top="left_top",
|
||||
right_top="right_top",
|
||||
right_bot="right_bot",
|
||||
left_bot="left_bot",
|
||||
device_manager=patched_dm_bpm,
|
||||
)
|
||||
patched_dm_bpm.devices._add_device(bpm.name, bpm)
|
||||
bpm.wait_for_connection()
|
||||
yield bpm
|
||||
finally:
|
||||
bpm.destroy()
|
||||
|
||||
|
||||
def test_bpm_positions(bpm):
|
||||
left_top = bpm.device_manager.devices["left_top"]
|
||||
right_top = bpm.device_manager.devices["right_top"]
|
||||
right_bot = bpm.device_manager.devices["right_bot"]
|
||||
left_bot = bpm.device_manager.devices["left_bot"]
|
||||
|
||||
# Test center position
|
||||
for signal in [left_top, right_top, right_bot, left_bot]:
|
||||
signal.put(1)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == 0
|
||||
|
||||
# Test fully left
|
||||
left_top.put(1)
|
||||
right_top.put(0)
|
||||
right_bot.put(0)
|
||||
left_bot.put(1)
|
||||
assert bpm.pos_x.get() == -1
|
||||
assert bpm.pos_y.get() == 0
|
||||
assert bpm.diagonal.get() == 0
|
||||
assert bpm.intensity.get() == 2
|
||||
|
||||
# Test fully right
|
||||
left_top.put(0)
|
||||
right_top.put(1)
|
||||
right_bot.put(1)
|
||||
left_bot.put(0)
|
||||
assert bpm.pos_x.get() == 1
|
||||
assert bpm.pos_y.get() == 0
|
||||
assert bpm.diagonal.get() == 0
|
||||
|
||||
# Test fully top
|
||||
left_top.put(1)
|
||||
right_top.put(1)
|
||||
right_bot.put(0)
|
||||
left_bot.put(0)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == 1
|
||||
assert bpm.diagonal.get() == 0
|
||||
|
||||
# Test fully bottom
|
||||
left_top.put(0)
|
||||
right_top.put(0)
|
||||
right_bot.put(1)
|
||||
left_bot.put(1)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == -1
|
||||
assert bpm.diagonal.get() == 0
|
||||
|
||||
# Diagonal beam
|
||||
left_top.put(1)
|
||||
right_top.put(0)
|
||||
right_bot.put(1)
|
||||
left_bot.put(0)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == 0
|
||||
assert bpm.diagonal.get() == -1
|
||||
|
||||
left_top.put(0)
|
||||
right_top.put(1)
|
||||
right_bot.put(0)
|
||||
left_bot.put(1)
|
||||
assert bpm.pos_x.get() == 0
|
||||
assert bpm.pos_y.get() == 0
|
||||
assert bpm.diagonal.get() == 1
|
||||
Reference in New Issue
Block a user