Compare commits

..

3 Commits

Author SHA1 Message Date
b07ac52371 test: add tests for bpm and bpm_control
All checks were successful
CI for csaxs_bec / test (push) Successful in 1m57s
CI for csaxs_bec / test (pull_request) Successful in 1m58s
2026-03-27 20:07:44 +01:00
9557d98f30 fix(pseudo_devices): fix pseudo devices, bpm and bpm_control 2026-03-27 20:05:43 +01:00
x12sa
fecd4b84a4 feat: Add BPM and BPMControl pseudo devices 2026-03-27 20:05:42 +01:00
16 changed files with 1273 additions and 1242 deletions

View File

@@ -70,7 +70,7 @@ DLPCA200_AMPLIFIER_CONFIG: dict[str, dict] = {
"rio_device": "galilrioesxbox",
"description": "Beam Position Monitor 4 current amplifier",
"channels": {
"gain_lsb": 0, # Pin 10 -> Galil ch0
"gain_lsb": rio_optics.analog_in.ch0, # Pin 10 -> Galil ch0
"gain_mid": 1, # Pin 11 -> Galil ch1
"gain_msb": 2, # Pin 12 -> Galil ch2
"coupling": 3, # Pin 13 -> Galil ch3

View File

@@ -1317,9 +1317,7 @@ class Flomni(
self._webpage_gen = FlomniWebpageGenerator(
bec_client=client,
output_dir="~/data/raw/webpage/",
#upload_url="http://s1090968537.online.de/upload.php", # optional
upload_url="https://v1p0zyg2w9n2k9c1.myfritz.net/upload.php",
local_port=8080
upload_url="http://s1090968537.online.de/upload.php", # optional
)
self._webpage_gen.start()

View File

@@ -5,14 +5,10 @@ Background thread that reads tomo progress from the BEC global variable store
and writes status.json (every cycle) + status.html (once at startup) to a
staging directory. An optional HttpUploader sends those files to a web host
after every cycle, running in a separate daemon thread so uploads never block
the generator cycle. A built-in LocalHttpServer always serves the output
directory locally (default port 8080) so the page can be accessed on the
lab network without any extra setup.
the generator cycle.
Architecture
------------
LocalHttpServer -- built-in HTTP server; serves output_dir on port 8080.
Always started at _launch(); URL printed to console.
HttpUploader -- non-blocking HTTP uploader (fire-and-forget thread).
Tracks file mtimes; only uploads changed files.
Sends a cleanup request to the server when the
@@ -35,24 +31,19 @@ Integration (inside Flomni.__init__, after self._progress_proxy.reset()):
bec_client=client,
output_dir="~/data/raw/webpage/",
upload_url="http://omny.online/upload.php", # optional
local_port=8080, # optional, default 8080
)
self._webpage_gen.start()
# On start(), the console prints:
# ➜ Status page: http://hostname:8080/status.html
Interactive helpers (optional, in the iPython session):
-------------------------------------------------------
flomni._webpage_gen.status() # print current status + local URL
flomni._webpage_gen.status() # print current status
flomni._webpage_gen.verbosity = 2 # VERBOSE: one-line summary per cycle
flomni._webpage_gen.verbosity = 3 # DEBUG: full JSON per cycle
flomni._webpage_gen.stop() # release lock, stop local server
flomni._webpage_gen.stop() # release lock
flomni._webpage_gen.start() # restart after stop()
"""
import datetime
import functools
import http.server
import json
import os
import shutil
@@ -244,16 +235,6 @@ class HttpUploader:
self._uploaded: dict[str, float] = {} # abs path -> mtime at last upload
self._lock = threading.Lock()
self._busy = False # True while an upload thread is running
self._warn_at: dict[str, float] = {} # key -> epoch of last warning
_WARN_COOLDOWN_S = 600 # only repeat the same warning once per minute
def _warn(self, key: str, msg: str) -> None:
"""Log a warning at most once per _WARN_COOLDOWN_S for a given key."""
now = _epoch()
if now - self._warn_at.get(key, 0) >= self._WARN_COOLDOWN_S:
self._warn_at[key] = now
logger.warning(msg)
# ── Public API ──────────────────────────────────────────────────────────
@@ -313,10 +294,8 @@ class HttpUploader:
def _upload_files(self, files: list, force: bool = False) -> None:
try:
import requests as _requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except ImportError:
self._warn("no_requests", "HttpUploader: 'requests' library not installed")
logger.warning("HttpUploader: 'requests' library not installed")
return
for path in files:
@@ -333,26 +312,21 @@ class HttpUploader:
self._url,
files={"file": (path.name, f)},
timeout=self._timeout,
verify=False, # accept self-signed / untrusted certs
)
if r.status_code == 200:
self._uploaded[str(path)] = mtime
self._warn_at.pop(f"upload_{path.name}", None) # clear on success
logger.debug(f"HttpUploader: OK {path.name}")
else:
self._warn(
f"upload_{path.name}",
logger.warning(
f"HttpUploader: {path.name} -> HTTP {r.status_code}: "
f"{r.text[:120]}"
)
except Exception as exc:
self._warn(f"upload_{path.name}", f"HttpUploader: {path.name} failed: {exc}")
logger.warning(f"HttpUploader: {path.name} failed: {exc}")
def _do_cleanup(self) -> None:
try:
import requests as _requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except ImportError:
return
try:
@@ -360,86 +334,15 @@ class HttpUploader:
self._url,
data={"action": "cleanup"},
timeout=self._timeout,
verify=False, # accept self-signed / untrusted certs
)
logger.info(f"HttpUploader cleanup: {r.text[:120]}")
self._warn_at.pop("cleanup", None) # clear on success
# Forget mtime records for ptycho files so they get re-uploaded
with self._lock:
to_remove = [k for k in self._uploaded if "/S" in k or "\\S" in k]
for k in to_remove:
self._uploaded.pop(k, None)
except Exception as exc:
self._warn("cleanup", f"HttpUploader cleanup failed: {exc}")
# ---------------------------------------------------------------------------
# Local HTTP server (serves output_dir over http://hostname:port/)
# ---------------------------------------------------------------------------
class LocalHttpServer:
"""
Serves the generator's output directory over plain HTTP in a daemon thread.
Uses Python's built-in http.server — no extra dependencies.
Request logging is suppressed so the BEC console stays clean.
The server survives stop()/start() cycles: _launch() creates a fresh
instance each time start() is called.
Usage:
srv = LocalHttpServer(output_dir, port=8080)
srv.start()
print(srv.url) # http://hostname:8080/status.html
srv.stop()
"""
def __init__(self, directory: Path, port: int = 8080):
self._directory = Path(directory)
self._port = port
self._server = None
self._thread = None
# ── silence the per-request log lines in the iPython console ──────────
class _QuietHandler(http.server.SimpleHTTPRequestHandler):
def log_message(self, *args):
pass
def handle_error(self, request, client_address):
pass # suppress BrokenPipeError and other per-connection noise
def start(self) -> None:
Handler = functools.partial(
self._QuietHandler,
directory=str(self._directory),
)
try:
self._server = http.server.HTTPServer(("", self._port), Handler)
except OSError as exc:
raise RuntimeError(
f"LocalHttpServer: cannot bind port {self._port}: {exc}"
) from exc
self._thread = threading.Thread(
target=self._server.serve_forever,
name="LocalHttpServer",
daemon=True,
)
self._thread.start()
def stop(self) -> None:
if self._server is not None:
self._server.shutdown() # blocks until serve_forever() returns
self._server = None
def is_alive(self) -> bool:
return self._thread is not None and self._thread.is_alive()
@property
def port(self) -> int:
return self._port
@property
def url(self) -> str:
"""Best-guess URL for printing. Uses the machine's hostname."""
return f"http://{socket.gethostname()}:{self._port}/status.html"
logger.warning(f"HttpUploader cleanup failed: {exc}")
# ---------------------------------------------------------------------------
@@ -460,15 +363,12 @@ class WebpageGeneratorBase:
cycle_interval: float = _CYCLE_INTERVAL_S,
verbosity: int = VERBOSITY_NORMAL,
upload_url: str = None,
local_port: int = 8080,
):
self._bec = bec_client
self._output_dir = Path(output_dir).expanduser().resolve()
self._cycle_interval = cycle_interval
self._verbosity = verbosity
self._uploader = HttpUploader(upload_url) if upload_url else None
self._local_port = local_port
self._local_server = None # created fresh each _launch()
self._thread = None
self._stop_event = threading.Event()
@@ -518,17 +418,6 @@ class WebpageGeneratorBase:
self._copy_logo()
(self._output_dir / "status.html").write_text(_render_html(_PHONE_NUMBERS))
# Start local HTTP server (always on; a fresh instance per _launch).
if self._local_server is not None and self._local_server.is_alive():
self._local_server.stop()
self._local_server = LocalHttpServer(self._output_dir, self._local_port)
try:
self._local_server.start()
local_url_msg = f" local={self._local_server.url}"
except RuntimeError as exc:
local_url_msg = f" local=ERROR({exc})"
self._log(VERBOSITY_NORMAL, str(exc), level="warning")
# Upload static files (html + logo) once at startup
if self._uploader is not None:
self._uploader.upload_dir_async(self._output_dir)
@@ -541,20 +430,13 @@ class WebpageGeneratorBase:
self._log(VERBOSITY_NORMAL,
f"WebpageGenerator started owner={self._owner_id} "
f"output={self._output_dir} interval={self._cycle_interval}s"
+ (f" upload={self._uploader._url}" if self._uploader else " upload=disabled")
+ f"\n ➜ Status page:{local_url_msg}")
+ (f" upload={self._uploader._url}" if self._uploader else " upload=disabled"))
def stop(self) -> None:
"""Stop the generator thread, local HTTP server, and release the singleton lock."""
"""Stop the generator thread and release the singleton lock."""
self._stop_event.set()
if self._thread is not None:
self._thread.join(timeout=self._cycle_interval + 5)
# Always clear the reference so start() gets a clean slate,
# even if the thread did not exit within the join timeout.
self._thread = None
if self._local_server is not None:
self._local_server.stop()
self._local_server = None
self._release_lock()
self._log(VERBOSITY_NORMAL, "WebpageGenerator stopped.")
@@ -571,14 +453,12 @@ class WebpageGeneratorBase:
"""Print a human-readable status summary to the console."""
lock = self._read_lock()
running = self._thread is not None and self._thread.is_alive()
local = self._local_server.url if (self._local_server and self._local_server.is_alive()) else "stopped"
print(
f"WebpageGenerator\n"
f" This session running : {running}\n"
f" Lock owner : {lock.get('owner_id', 'none')}\n"
f" Lock heartbeat : {lock.get('heartbeat', 'never')}\n"
f" Output dir : {self._output_dir}\n"
f" Local URL : {local}\n"
f" Cycle interval : {self._cycle_interval}s\n"
f" Upload URL : {self._uploader._url if self._uploader else 'disabled'}\n"
f" Verbosity : {self._verbosity}\n"
@@ -1365,6 +1245,21 @@ def _render_html(phone_numbers: list) -> str:
}}
.info-item .value {{ font-size: 0.9rem; font-weight: 600; color: var(--text); }}
.bar-wrap {{ grid-column: 1 / -1; display: flex; flex-direction: column; gap: 0.35rem; }}
.bar-label {{
display: flex; justify-content: space-between;
font-family: var(--mono); font-size: 0.6rem; color: var(--text-dim);
letter-spacing: 0.06em; text-transform: uppercase;
}}
.bar-track {{ height: 5px; background: var(--surface2); border-radius: 99px; overflow: hidden; }}
.bar-fill {{
height: 100%;
background: var(--ring-blend);
background: color-mix(in srgb, var(--status-color) 65%, var(--surface2));
border-radius: 99px;
transition: width 0.8s cubic-bezier(.4,0,.2,1), background 0.6s;
}}
/* ── Recon card ── */
.recon-stats {{ display: flex; gap: 2rem; flex-wrap: wrap; }}
.recon-stat {{ display: flex; flex-direction: column; gap: 0.15rem; }}
@@ -1607,7 +1502,10 @@ def _render_html(phone_numbers: list) -> str:
<div class="info-item"><span class="label">ETA</span><span class="value" id="pi-eta">-</span></div>
<div class="info-item"><span class="label">Started</span><span class="value" id="pi-start">-</span></div>
</div>
<div class="bar-wrap">
<div class="bar-label"><span>Sub-tomo progress</span><span id="bar-sub-label">-</span></div>
<div class="bar-track"><div class="bar-fill" id="bar-sub-fill" style="width:0%"></div></div>
</div>
</div>
@@ -1769,80 +1667,90 @@ function initDrag() {{
}}
initDrag();
// ── Audio ─────────────────────────────────────────────────────────────────
// iOS (all browsers on iPhone use WebKit) strict rules:
// 1. AudioContext must be created inside a user gesture handler.
// 2. A real BufferSource must be started SYNCHRONOUSLY in the gesture
// .then() / microtasks run outside the gesture and are rejected.
// 3. ctx.resume() is called fire-and-forget; beeps are delayed 80ms by
// setTimeout so the engine has time to start before nodes are scheduled.
//
// unlockAudio() handles all of this and must be called at the TOP of any
// onclick handler that wants audio — before any other logic.
// iOS (all browsers on iPhone use WebKit) requires:
// 1. AudioContext created inside a user gesture.
// 2. A real (even silent) BufferSource started synchronously in the gesture.
// 3. ctx.resume() awaited before scheduling audible nodes.
// We combine all three: silent unlock buffer + resume promise + .then(beeps).
let audioCtx=null, audioEnabled=false;
let audioArmed=false, warningActive=false, warningTimer=null, lastStatus=null;
let staleActive=false, staleTimer=null, staleConfirmed=false;
let audioCtx = null, audioEnabled = false;
let audioArmed = false, warningActive = false, warningTimer = null, lastStatus = null;
let staleActive = false, staleTimer = null, staleConfirmed = false;
function getCtx(){{
if(!audioCtx) audioCtx=new(window.AudioContext||window.webkitAudioContext)();
function getCtx() {{
if (!audioCtx) audioCtx = new (window.AudioContext || window.webkitAudioContext)();
return audioCtx;
}}
function unlockAudio(){{
// Synchronous silent 1-sample buffer — the only reliable iOS unlock.
// Must be called synchronously at the start of a user gesture handler.
const ctx=getCtx();
const buf=ctx.createBuffer(1,1,ctx.sampleRate);
const src=ctx.createBufferSource();
src.buffer=buf; src.connect(ctx.destination); src.start(0);
if(ctx.state==='suspended') ctx.resume(); // fire-and-forget
function unlockAudio() {{
// Must be called synchronously inside a user gesture handler.
// Plays a 1-sample silent buffer — the most reliable iOS unlock method.
const ctx = getCtx();
const buf = ctx.createBuffer(1, 1, ctx.sampleRate);
const src = ctx.createBufferSource();
src.buffer = buf;
src.connect(ctx.destination);
src.start(0);
// resume() is async; return the promise so callers can chain.
return ctx.state === 'suspended' ? ctx.resume() : Promise.resolve();
}}
function beep(freq,dur,vol){{
try{{
const ctx=getCtx(),o=ctx.createOscillator(),g=ctx.createGain();
function beep(freq, dur, vol) {{
try {{
const ctx = getCtx();
const o = ctx.createOscillator();
const g = ctx.createGain();
o.connect(g); g.connect(ctx.destination);
o.type='sine'; o.frequency.setValueAtTime(freq,ctx.currentTime);
g.gain.setValueAtTime(vol,ctx.currentTime);
g.gain.exponentialRampToValueAtTime(0.001,ctx.currentTime+dur);
o.start(); o.stop(ctx.currentTime+dur);
}}catch(e){{console.warn('Audio:',e);}}
o.type = 'sine';
o.frequency.setValueAtTime(freq, ctx.currentTime);
g.gain.setValueAtTime(vol, ctx.currentTime);
g.gain.exponentialRampToValueAtTime(0.001, ctx.currentTime + dur);
o.start(); o.stop(ctx.currentTime + dur);
}} catch(e) {{ console.warn('Audio beep:', e); }}
}}
function warningChime(){{
beep(660,0.3,0.4); setTimeout(()=>beep(440,0.5,0.4),350);
}}
function staleChime(){{
beep(1200,0.12,0.35);
setTimeout(()=>beep(1200,0.12,0.35),180);
setTimeout(()=>beep(1200,0.25,0.35),360);
function warningChime() {{
beep(660, 0.3, 0.4);
setTimeout(() => beep(440, 0.5, 0.4), 350);
}}
function testSound(){{
// Gesture handler — unlock first, then delay beeps 80ms for resume().
unlockAudio();
setTimeout(()=>beep(880, 0.15,0.4), 80);
setTimeout(()=>beep(1100,0.15,0.4),260);
setTimeout(()=>beep(880, 0.3, 0.4),440);
function staleChime() {{
beep(1200, 0.12, 0.35);
setTimeout(() => beep(1200, 0.12, 0.35), 180);
setTimeout(() => beep(1200, 0.25, 0.35), 360);
}}
function toggleAudio(){{
// Gesture handler — unlock first (synchronous), then do logic.
unlockAudio();
audioEnabled=!audioEnabled;
localStorage.setItem('audioEnabled',audioEnabled);
if(!audioEnabled){{
stopWarning(); audioArmed=false; warningActive=false;
document.getElementById('btn-confirm').style.display='none';
stopStaleWarning(); staleActive=false; staleConfirmed=false;
document.getElementById('btn-confirm-stale').style.display='none';
}} else {{
if(lastStatus==='scanning' && !audioArmed) audioArmed=true;
}}
updateAudioUI();
function testSound() {{
// Called directly from onclick — gesture is active here.
unlockAudio().then(() => {{
beep(880, 0.15, 0.4);
setTimeout(() => beep(1100, 0.15, 0.4), 180);
setTimeout(() => beep(880, 0.3, 0.4), 360);
}});
}}
function toggleAudio() {{
// Called directly from onclick — gesture is active here.
unlockAudio().then(() => {{
audioEnabled = !audioEnabled;
localStorage.setItem('audioEnabled', audioEnabled);
if (!audioEnabled) {{
stopWarning();
audioArmed = false; warningActive = false;
document.getElementById('btn-confirm').style.display = 'none';
stopStaleWarning();
staleActive = false; staleConfirmed = false;
document.getElementById('btn-confirm-stale').style.display = 'none';
}} else {{
if (lastStatus === 'scanning' && !audioArmed) audioArmed = true;
}}
updateAudioUI();
}});
}}
function confirmWarning(){{
stopWarning();
warningActive=false;
@@ -1851,10 +1759,9 @@ function confirmWarning(){{
}}
function startWarning(){{
// Not a gesture handler — context already unlocked by Enable button click.
if(warningActive) return;
warningActive=true;
if(audioEnabled) warningChime();
if(audioEnabled) warningChime(); // returns promise; chime plays after unlock
warningTimer=setInterval(()=>{{ if(audioEnabled) warningChime(); }},30000);
document.getElementById('btn-confirm').style.display='inline-block';
updateAudioUI();
@@ -1873,10 +1780,9 @@ function confirmStale(){{
}}
function startStaleWarning(){{
// Not a gesture handler — context already unlocked by Enable button click.
if(staleActive || staleConfirmed) return;
staleActive=true;
if(audioEnabled) staleChime();
if(audioEnabled) staleChime(); // returns promise; chime plays after unlock
staleTimer=setInterval(()=>{{ if(audioEnabled) staleChime(); }},30000);
document.getElementById('btn-confirm-stale').style.display='inline-block';
updateAudioUI();
@@ -2058,7 +1964,8 @@ function render(d){{
document.getElementById('pi-type').textContent=p.tomo_type||'-';
document.getElementById('pi-eta').textContent=p.estimated_remaining_human||'-';
document.getElementById('pi-start').textContent=fmtTime(p.tomo_start_time);
document.getElementById('bar-sub-label').textContent=(p.subtomo_projection||0)+' / '+(p.subtomo_total_projections||0);
document.getElementById('bar-sub-fill').style.width=(sPct*100).toFixed(1)+'%';
if(d.recon){{
document.getElementById('recon-waiting').textContent=d.recon.waiting;
const fv=document.getElementById('recon-failed');
@@ -2083,7 +1990,7 @@ function render(d){{
async function poll(){{
try{{
const r=await fetch(STATUS_JSON, {{cache:'no-store'}});
const r=await fetch(STATUS_JSON+'?t='+Date.now());
if(!r.ok) throw new Error('HTTP '+r.status);
render(await r.json());
}}catch(e){{
@@ -2104,4 +2011,4 @@ poll();
</script>
</body>
</html>
"""
"""

View File

@@ -544,6 +544,66 @@ sl5trxt:
# bl_smar_stage to use csaxs reference method. assign number according to axis channel
bl_smar_stage: 5
sl5ch:
description: ESbox1 slit 5 center horizontal
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
deviceConfig:
left_slit: sl5trxi
right_slit: sl5trxo
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxi
- sl5trxo
sl5wh:
description: ESbox1 slit 5 width horizontal
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
deviceConfig:
left_slit: sl5trxi
right_slit: sl5trxo
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxi
- sl5trxo
sl5cv:
description: ESbox1 slit 5 center vertical
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitCenter
deviceConfig:
left_slit: sl5trxb
right_slit: sl5trxt
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxb
- sl5trxt
sl5wv:
description: ESbox1 slit 5 width vertical
deviceClass: ophyd_devices.devices.virtual_slit.VirtualSlitWidth
deviceConfig:
left_slit: sl5trxb
right_slit: sl5trxt
offset: 0
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
needs:
- sl5trxb
- sl5trxt
xbimtrx:
description: ESbox2 beam intensity monitor x movement
deviceClass: csaxs_bec.devices.smaract.smaract_ophyd.SmaractMotor
@@ -822,4 +882,37 @@ dettrx:
onFailure: retry
enabled: true
readoutPriority: baseline
softwareTrigger: false
softwareTrigger: false
####################
### Beamstop control for flight tube
####################
beamstop_control:
description: Gain control for beamstop flightube
deviceClass: csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl
deviceConfig:
gain_lsb: galilrioesft.digital_out.ch0 # Pin 10 -> Galil ch0
gain_mid: galilrioesft.digital_out.ch1 # Pin 11 -> Galil ch1
gain_msb: galilrioesft.digital_out.ch2 # Pin 12 -> Galil ch2
coupling: galilrioesft.digital_out.ch3 # Pin 13 -> Galil ch3
speed_mode: galilrioesft.digital_out.ch4 # Pin 14 -> Galil ch4
enabled: true
readoutPriority: baseline
onFailure: retry
needs:
- galilrioesft
galilrioesft:
description: Galil RIO for remote gain switching and slow reading FlightTube
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: retry
readOnly: false
readoutPriority: baseline
connectionTimeout: 20

View File

@@ -199,6 +199,25 @@ xbpm1c4:
readOnly: true
softwareTrigger: false
bpm1:
description: 'XBPM1 (frontend)'
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
left_top: xbpm1c1
right_top: xbpm1c2
right_bot: xbpm1c3
left_bot: xbpm1c4
onFailure: raise
enabled: true
readoutPriority: monitored
readOnly: true
softwareTrigger: false
needs:
- xbpm1c1
- xbpm1c2
- xbpm1c3
- xbpm1c4
############################################
######### End of xbpm sub devices ##########
############################################

View File

@@ -68,18 +68,22 @@ ccmx:
- cSAXS
- optics
# ccm_energy:
# readoutPriority: baseline
# deviceClass: ophyd_devices.devices.simple_positioner.PSIPositionerBase
# prefix: "X12SA-OP-CCM1:"
# override_suffixes:
# user_readback: "ENERGY-GET"
# user_setpoint: "ENERGY-SET"
# velocity: "ROTY:VELO"
# deviceTags:
# - user motors
# enabled: true
# readOnly: false
# TO BE REVIEWED, REMOVE VELOCITY WITH NEW CLASS!
ccm_energy:
description: 'test'
deviceClass: ophyd_devices.devices.simple_positioner.PSISimplePositioner
deviceConfig:
prefix: 'X12SA-OP-CCM1:'
override_suffixes:
user_readback: "ENERGY-GET"
user_setpoint: "ENERGY-SET"
velocity: "ROTY.VELO"
motor_done_move: "ROTY.DMOV"
onFailure: buffer
enabled: true
readoutPriority: baseline
readOnly: false
softwareTrigger: false

View File

@@ -0,0 +1,24 @@
galilrioesxbox:
description: Galil RIO for remote gain switching and slow reading ES XBox
deviceClass: csaxs_bec.devices.omny.galil.galil_rio.GalilRIO
deviceConfig:
host: galilrioesft.psi.ch
enabled: true
onFailure: raise
readOnly: false
readoutPriority: baseline
connectionTimeout: 20
bpm1:
readoutPriority: baseline
deviceClass: csaxs_bec.devices.pseudo_devices.bpm.BPM
deviceConfig:
blade_t: galilrioesxbox.analog_in.ch0
blade_r: galilrioesxbox.analog_in.ch1
blade_b: galilrioesxbox.analog_in.ch2
blade_l: galilrioesxbox.analog_in.ch3
enabled: true
readOnly: false
softwareTrigger: true
needs:
- galilrioesxbox

View File

@@ -13,14 +13,6 @@ from ophyd_devices import PSIDeviceBase
logger = bec_logger.logger
class MonitorSignal(Signal):
"""A simple wrapper around ophyd Signal that automatically monitors the signal for changes."""
def __init__(self, *, name, auto_monitor=False, **kwargs):
super().__init__(name=name, **kwargs)
self.auto_monitor = auto_monitor
class OMNYFastShutter(PSIDeviceBase, Device):
"""
Fast Shutter control for OMNY setup. If started with at the beamline, it will expose
@@ -34,7 +26,7 @@ class OMNYFastShutter(PSIDeviceBase, Device):
SUB_VALUE = "value"
_default_sub = SUB_VALUE
shutter = Cpt(MonitorSignal, name="shutter", auto_monitor=True)
shutter = Cpt(Signal, name="shutter")
# -----------------------------------------------------
# User-facing shutter control functions
@@ -56,7 +48,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshopen(self):
"""Open the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(1)
return self.device_manager.devices["fsh"].fshopen()
else:
self.shutter.put(1)
@@ -64,7 +55,6 @@ class OMNYFastShutter(PSIDeviceBase, Device):
def fshclose(self):
"""Close the fast shutter."""
if self._check_if_cSAXS_shutter_exists_in_config():
self.shutter.put(0)
return self.device_manager.devices["fsh"].fshclose()
else:
self.shutter.put(0)

View File

@@ -0,0 +1,172 @@
"""Module for a BPM pseudo device that computes the position and intensity from the blade signals."""
from ophyd import Component as Cpt
from ophyd import Kind, Signal
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
class BPM(PSIPseudoDeviceBase):
"""BPM positioner pseudo device."""
# Blade signals, a,b,c,d
left_top = Cpt(
BECProcessedSignal,
name="left_top",
model_config=None,
kind=Kind.config,
doc="BPM left_top blade",
)
right_top = Cpt(
BECProcessedSignal,
name="right_top",
model_config=None,
kind=Kind.config,
doc="BPM right_top blade",
)
right_bot = Cpt(
BECProcessedSignal,
name="right_bot",
model_config=None,
kind=Kind.config,
doc="BPM right_bottom blade",
)
left_bot = Cpt(
BECProcessedSignal,
name="left_bot",
model_config=None,
kind=Kind.config,
doc="BPM left_bot blade",
)
# Virtual signals
pos_x = Cpt(
BECProcessedSignal,
name="pos_x",
model_config=None,
kind=Kind.config,
doc="BPM X position, -1 fully left, 1 fully right",
)
pos_y = Cpt(
BECProcessedSignal,
name="pos_y",
model_config=None,
kind=Kind.config,
doc="BPM Y position, -1 fully bottom, 1 fully top",
)
diagonal = Cpt(
BECProcessedSignal,
name="diagonal",
model_config=None,
kind=Kind.config,
doc="BPM diagonal, -1 fully diagonal left_top-right_bot, 1 fully diagonal right_top-left_bot",
)
intensity = Cpt(
BECProcessedSignal,
name="intensity",
model_config=None,
kind=Kind.config,
doc="BPM intensity",
)
def __init__(
self,
name,
left_top: str,
right_top: str,
right_bot: str,
left_bot: str,
device_manager=None,
scan_info=None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# Get all blade signal objects from utility method
signal_t = self.left_top.get_device_object_from_bec(
object_name=left_top, signal_name=self.name, device_manager=device_manager
)
signal_r = self.right_top.get_device_object_from_bec(
object_name=right_top, signal_name=self.name, device_manager=device_manager
)
signal_b = self.right_bot.get_device_object_from_bec(
object_name=right_bot, signal_name=self.name, device_manager=device_manager
)
signal_l = self.left_bot.get_device_object_from_bec(
object_name=left_bot, signal_name=self.name, device_manager=device_manager
)
# Set compute methods for blade signals and virtual signals
self.left_top.set_compute_method(self._compute_blade_signal, signal=signal_t)
self.right_top.set_compute_method(self._compute_blade_signal, signal=signal_r)
self.right_bot.set_compute_method(self._compute_blade_signal, signal=signal_b)
self.left_bot.set_compute_method(self._compute_blade_signal, signal=signal_l)
self.intensity.set_compute_method(
self._compute_intensity,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
left_bot=self.left_bot,
)
self.pos_x.set_compute_method(
self._compute_pos_x,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
self.pos_y.set_compute_method(
self._compute_pos_y,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
self.diagonal.set_compute_method(
self._compute_diagonal,
left_bot=self.left_bot,
left_top=self.left_top,
right_top=self.right_top,
right_bot=self.right_bot,
)
def _compute_blade_signal(self, signal: Signal) -> float:
return signal.get()
def _compute_intensity(
self, left_top: Signal, right_top: Signal, right_bot: Signal, left_bot: Signal
) -> float:
intensity = left_top.get() + right_top.get() + right_bot.get() + left_bot.get()
return intensity
def _compute_pos_x(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
"""X position from -1 to 1, where -1 means beam fully on the left side, 1 means beam fully on the right side."""
sum_left = left_bot.get() + left_top.get()
sum_right = right_top.get() + right_bot.get()
sum_total = sum_left + sum_right
if sum_total == 0:
return 0.0
return (sum_right - sum_left) / sum_total
def _compute_pos_y(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
"""Y position from -1 to 1, where -1 means beam fully on the bottom side, 1 means beam fully on the top side."""
sum_top = left_top.get() + right_top.get()
sum_bot = right_bot.get() + left_bot.get()
sum_total = sum_top + sum_bot
if sum_total == 0:
return 0.0
return (sum_top - sum_bot) / sum_total
def _compute_diagonal(
self, left_bot: Signal, left_top: Signal, right_top: Signal, right_bot: Signal
) -> float:
sum_diag1 = left_bot.get() + right_top.get()
sum_diag2 = left_top.get() + right_bot.get()
sum_total = sum_diag1 + sum_diag2
if sum_total == 0:
return 0.0
return (sum_diag1 - sum_diag2) / sum_total

View File

@@ -0,0 +1,189 @@
"""
Module for controlling the BPM amplifier settings, such as gain and coupling.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Literal
from ophyd import Component as Cpt
from ophyd import Kind
from ophyd_devices.interfaces.base_classes.psi_pseudo_device_base import PSIPseudoDeviceBase
from ophyd_devices.utils.bec_processed_signal import BECProcessedSignal
if TYPE_CHECKING: # pragma: no cover
from bec_lib.devicemanager import ScanInfo
from bec_server.device_server.devices.devicemanager import DeviceManagerDS
from ophyd import Signal
_GAIN_BITS_LOW_NOISE: dict[tuple, int] = {
(0, 0, 0): int(1e3),
(0, 0, 1): int(1e4),
(0, 1, 0): int(1e5),
(0, 1, 1): int(1e6),
(1, 0, 0): int(1e7),
(1, 0, 1): int(1e8),
(1, 1, 0): int(1e9),
}
_GAIN_BITS_HIGH_SPEED: dict[tuple, int] = {
(0, 0, 0): int(1e5),
(0, 0, 1): int(1e6),
(0, 1, 0): int(1e7),
(0, 1, 1): int(1e8),
(1, 0, 0): int(1e9),
(1, 0, 1): int(1e10),
(1, 1, 0): int(1e11),
}
_GAIN_TO_BITS: dict[int, tuple] = {}
for _bits, _gain in _GAIN_BITS_LOW_NOISE.items():
_GAIN_TO_BITS[_gain] = (*_bits, True)
for _bits, _gain in _GAIN_BITS_HIGH_SPEED.items():
if _gain not in _GAIN_TO_BITS: # low-noise takes priority
_GAIN_TO_BITS[_gain] = (*_bits, False)
VALID_GAINS = sorted(_GAIN_TO_BITS.keys())
class BPMControl(PSIPseudoDeviceBase):
"""
BPM amplifier control pseudo device. It is responsible for controlling the
gain and coupling for the BPM amplifier. It relies on signals from a device
in BEC to be available. For cSAXS, these are most liikely to be from the
GalilRIO device that controls the BPM amplifier.
Args:
name (str): Name of the pseudo device.
gain_lsb (str): Name of the signal in BEC that controls the LSB
of the gain setting.
gain_mid (str): Name of the signal in BEC that controls the MID
bit of the gain setting.
gain_msb (str): Name of the signal in BEC that controls the MSB
of the gain setting.
coupling (str): Name of the signal in BEC that controls the coupling
setting.
speed_mode (str): Name of the signal in BEC that controls the speed mode
(low-noise vs high-speed) of the amplifier.
"""
USER_ACCESS = ["set_gain", "set_coupling"]
gain = Cpt(
BECProcessedSignal,
name="gain",
model_config=None,
kind=Kind.config,
doc="Gain of the amplifier",
)
coupling = Cpt(
BECProcessedSignal,
name="coupling",
model_config=None,
kind=Kind.config,
doc="Coupling of the amplifier",
)
speed = Cpt(
BECProcessedSignal,
name="speed",
model_config=None,
kind=Kind.config,
doc="Speed of the amplifier",
)
def __init__(
self,
name: str,
gain_lsb: str,
gain_mid: str,
gain_msb: str,
coupling: str,
speed_mode: str,
device_manager: DeviceManagerDS | None = None,
scan_info: ScanInfo | None = None,
**kwargs,
):
super().__init__(name=name, device_manager=device_manager, scan_info=scan_info, **kwargs)
# First we get all signal objects from BEC using the utility method provided by the BECProcessedSignal class.
self._gain_lsb = self.gain.get_device_object_from_bec(
object_name=gain_lsb, signal_name=self.name, device_manager=device_manager
)
self._gain_mid = self.gain.get_device_object_from_bec(
object_name=gain_mid, signal_name=self.name, device_manager=device_manager
)
self._gain_msb = self.gain.get_device_object_from_bec(
object_name=gain_msb, signal_name=self.name, device_manager=device_manager
)
self._coupling = self.gain.get_device_object_from_bec(
object_name=coupling, signal_name=self.name, device_manager=device_manager
)
self._speed_mode = self.gain.get_device_object_from_bec(
object_name=speed_mode, signal_name=self.name, device_manager=device_manager
)
# Set the compute methods for the virtual signals.
self.gain.set_compute_method(
self._compute_gain,
msb=self._gain_msb,
mid=self._gain_mid,
lsb=self._gain_lsb,
speed_mode=self._speed_mode,
)
self.coupling.set_compute_method(self._compute_coupling, coupling=self._coupling)
self.speed.set_compute_method(self._compute_speed, speed=self._speed_mode)
def set_gain(
self,
gain: Literal[
1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000, 100000000000
],
) -> None:
"""
Set the gain of the amplifier.
Args:
gain (Literal): Must be one of 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000, 10000000000.
"""
gain_int = int(gain)
if gain_int not in VALID_GAINS:
raise ValueError(
f"{self.name} received invalid gain {gain_int}, must be in {VALID_GAINS}"
)
msb, mid, lsb, use_low_noise = _GAIN_TO_BITS[gain_int]
self._gain_msb.set(bool(msb)).wait(timeout=2)
self._gain_lsb.set(bool(lsb)).wait(timeout=2)
self._gain_mid.set(bool(mid)).wait(timeout=2)
self._speed_mode.set(bool(use_low_noise))
def set_coupling(self, coupling: Literal["AC", "DC"]) -> None:
"""
Set the coupling of the amplifier.
Args:
coupling (Literal): Must be either "AC" or "DC".
"""
if coupling not in ["AC", "DC"]:
raise ValueError(
f"{self.name} received invalid coupling value {coupling}, please use 'AC' or 'DC'"
)
self._coupling.set(coupling == "DC").wait(timeout=2)
def _compute_gain(self, msb: Signal, mid: Signal, lsb: Signal, speed_mode: Signal) -> int:
"""Compute the gain based on the bits and speed mode."""
bits = (msb.get(), mid.get(), lsb.get())
speed_mode = speed_mode.get()
if speed_mode:
return _GAIN_BITS_LOW_NOISE.get(bits)
else:
return _GAIN_BITS_HIGH_SPEED.get(bits)
def _compute_coupling(self, coupling: Signal) -> str:
"""Compute the coupling based on the signal."""
return "DC" if coupling.get() else "AC"
def _compute_speed(self, speed: Signal) -> str:
"""Compute the speed based on the signal."""
return "low_speed" if speed.get() else "high_speed"

View File

@@ -0,0 +1 @@
# from ophyd

View File

@@ -1 +1 @@
from .csaxs_nexus import cSAXSNeXusFormat
from .csaxs_nexus import NeXus_format as cSAXS_NeXus_format

File diff suppressed because it is too large Load Diff

View File

@@ -1,69 +0,0 @@
import pytest
from bec_server.device_server.tests.utils import DMMock
from csaxs_bec.devices.omny.shutter import MonitorSignal, OMNYFastShutter
@pytest.mark.parametrize("auto_monitor", [False, True])
def test_monitor_signal_stores_auto_monitor(auto_monitor):
signal = MonitorSignal(name="signal", auto_monitor=auto_monitor)
assert signal.auto_monitor is auto_monitor
def test_monitor_signal_put_propagates_value_to_readback_callback():
signal = MonitorSignal(name="signal", auto_monitor=True)
initial_value = signal.read()[signal.name]["value"]
callback_values = []
callback_reads = []
def _test_cb(value, old_value, **kwargs):
callback_values.append((value, old_value))
callback_reads.append(kwargs["obj"].read())
signal.subscribe(_test_cb, event_type=signal.SUB_VALUE, run=False)
signal.put(1)
assert callback_values == [(1, initial_value)]
assert len(callback_reads) == 1
assert callback_reads[0][signal.name]["value"] == 1
assert signal.read()[signal.name]["value"] == 1
signal.put(0)
assert callback_values == [(1, initial_value), (0, 1)]
assert len(callback_reads) == 2
assert callback_reads[1][signal.name]["value"] == 0
assert signal.read()[signal.name]["value"] == 0
@pytest.fixture
def omny_fast_shutter():
shutter = OMNYFastShutter(name="omny_fast_shutter", device_manager=DMMock())
try:
yield shutter
finally:
shutter.destroy()
def test_omny_fast_shutter_uses_monitor_signal_with_auto_monitor(omny_fast_shutter):
assert isinstance(omny_fast_shutter.shutter, MonitorSignal)
assert omny_fast_shutter.shutter.auto_monitor is True
def test_omny_fast_shutter_propagates_signal_changes_to_device_readback(omny_fast_shutter):
signal_name = omny_fast_shutter.shutter.name
callback_reads = []
def _test_cb(**kwargs):
callback_reads.append(omny_fast_shutter.read())
omny_fast_shutter.shutter.subscribe(_test_cb, event_type=omny_fast_shutter.shutter.SUB_VALUE, run=False)
omny_fast_shutter.shutter.put(1)
assert len(callback_reads) == 1
assert callback_reads[0][signal_name]["value"] == 1
assert omny_fast_shutter.read()[signal_name]["value"] == 1
assert omny_fast_shutter.fshstatus() == 1

View File

@@ -0,0 +1,241 @@
"""Module to test the pseudo_device module."""
import pytest
from bec_lib.atlas_models import Device
from ophyd_devices.sim.sim_signals import SetableSignal
from csaxs_bec.devices.pseudo_devices.bpm import BPM
from csaxs_bec.devices.pseudo_devices.bpm_control import _GAIN_TO_BITS, BPMControl
@pytest.fixture
def patched_dm(dm_with_devices):
# Patch missing current_session attribute in the device manager
dm = dm_with_devices
setattr(dm, "current_session", dm._session)
#
signal_lsb = SetableSignal(name="gain_lsb", value=0, kind="config")
signal_mid = SetableSignal(name="gain_mid", value=0, kind="config")
signal_msb = SetableSignal(name="gain_msb", value=0, kind="config")
signal_coupling = SetableSignal(name="coupling", value=0, kind="config")
signal_speed = SetableSignal(name="speed_mode", value=0, kind="config")
for signal in [signal_lsb, signal_mid, signal_msb, signal_coupling, signal_speed]:
dev_cfg = Device(
name=signal.name,
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
enabled=True,
readoutPriority="baseline",
)
dm._session["devices"].append(dev_cfg.model_dump())
dm.devices._add_device(signal.name, signal)
return dm
@pytest.fixture
def bpm_control(patched_dm):
name = "bpm_control"
control_config = Device(
name=name,
deviceClass="csaxs_bec.devices.pseudo_devices.bpm_control.BPMControl",
enabled=True,
readoutPriority="baseline",
deviceConfig={
"gain_lsb": "gain_lsb",
"gain_mid": "gain_mid",
"gain_msb": "gain_msb",
"coupling": "coupling",
"speed_mode": "speed_mode",
},
needs=["gain_lsb", "gain_mid", "gain_msb", "coupling", "speed_mode"],
)
patched_dm._session["devices"].append(control_config.model_dump())
try:
control = BPMControl(
name=name,
gain_lsb="gain_lsb",
gain_mid="gain_mid",
gain_msb="gain_msb",
coupling="coupling",
speed_mode="speed_mode",
device_manager=patched_dm,
)
patched_dm.devices._add_device(control.name, control)
control.wait_for_connection()
yield control
finally:
control.destroy()
def test_bpm_control_set_gain(bpm_control):
gain_lsb = bpm_control.device_manager.devices["gain_lsb"]
gain_mid = bpm_control.device_manager.devices["gain_mid"]
gain_msb = bpm_control.device_manager.devices["gain_msb"]
coupling = bpm_control.device_manager.devices["coupling"]
speed_mode = bpm_control.device_manager.devices["speed_mode"]
gain_lsb.put(0)
gain_mid.put(0)
gain_msb.put(0)
coupling.put(0)
speed_mode.put(1)
gain = bpm_control.gain.get()
assert _GAIN_TO_BITS.get(gain) == (0, 0, 0, speed_mode.get() == 1)
gain_val = 10000000
bpm_control.set_gain(gain_val)
assert _GAIN_TO_BITS.get(gain_val, ()) == (
gain_msb.get(),
gain_mid.get(),
gain_lsb.get(),
speed_mode.get(),
)
gain_val = 100000000000
bpm_control.set_gain(gain_val)
assert _GAIN_TO_BITS.get(gain_val, ()) == (
gain_msb.get(),
gain_mid.get(),
gain_lsb.get(),
speed_mode.get(),
)
with pytest.raises(ValueError):
bpm_control.set_gain(1005.0)
def test_bpm_control_set_coupling(bpm_control):
coupling = bpm_control.device_manager.devices["coupling"]
coupling.put(0)
bpm_control.coupling.get() == "AC"
coupling.put(1)
bpm_control.coupling.get() == "DC"
bpm_control.set_coupling("AC")
assert coupling.get() == 0
with pytest.raises(ValueError):
bpm_control.set_coupling("wrong")
@pytest.fixture
def patched_dm_bpm(dm_with_devices):
# Patch missing current_session attribute in the device manager
dm = dm_with_devices
setattr(dm, "current_session", dm._session)
#
left_top = SetableSignal(name="left_top", value=0, kind="config")
right_top = SetableSignal(name="right_top", value=0, kind="config")
right_bot = SetableSignal(name="right_bot", value=0, kind="config")
left_bot = SetableSignal(name="left_bot", value=0, kind="config")
for signal in [left_top, right_top, right_bot, left_bot]:
dev_cfg = Device(
name=signal.name,
deviceClass="ophyd_devices.sim.sim_signals.SetableSignal",
enabled=True,
readoutPriority="baseline",
)
dm._session["devices"].append(dev_cfg.model_dump())
dm.devices._add_device(signal.name, signal)
return dm
@pytest.fixture
def bpm(patched_dm_bpm):
name = "bpm"
bpm_config = Device(
name=name,
deviceClass="csaxs_bec.devices.pseudo_devices.bpm.BPM",
enabled=True,
readoutPriority="baseline",
deviceConfig={
"left_top": "left_top",
"right_top": "right_top",
"right_bot": "right_bot",
"left_bot": "left_bot",
},
needs=["left_top", "right_top", "right_bot", "left_bot"],
)
patched_dm_bpm._session["devices"].append(bpm_config.model_dump())
try:
bpm = BPM(
name=name,
left_top="left_top",
right_top="right_top",
right_bot="right_bot",
left_bot="left_bot",
device_manager=patched_dm_bpm,
)
patched_dm_bpm.devices._add_device(bpm.name, bpm)
bpm.wait_for_connection()
yield bpm
finally:
bpm.destroy()
def test_bpm_positions(bpm):
left_top = bpm.device_manager.devices["left_top"]
right_top = bpm.device_manager.devices["right_top"]
right_bot = bpm.device_manager.devices["right_bot"]
left_bot = bpm.device_manager.devices["left_bot"]
# Test center position
for signal in [left_top, right_top, right_bot, left_bot]:
signal.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
# Test fully left
left_top.put(1)
right_top.put(0)
right_bot.put(0)
left_bot.put(1)
assert bpm.pos_x.get() == -1
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 0
assert bpm.intensity.get() == 2
# Test fully right
left_top.put(0)
right_top.put(1)
right_bot.put(1)
left_bot.put(0)
assert bpm.pos_x.get() == 1
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 0
# Test fully top
left_top.put(1)
right_top.put(1)
right_bot.put(0)
left_bot.put(0)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 1
assert bpm.diagonal.get() == 0
# Test fully bottom
left_top.put(0)
right_top.put(0)
right_bot.put(1)
left_bot.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == -1
assert bpm.diagonal.get() == 0
# Diagonal beam
left_top.put(1)
right_top.put(0)
right_bot.put(1)
left_bot.put(0)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == -1
left_top.put(0)
right_top.put(1)
right_bot.put(0)
left_bot.put(1)
assert bpm.pos_x.get() == 0
assert bpm.pos_y.get() == 0
assert bpm.diagonal.get() == 1