Files
debye_bec/bec_plugins/services/NIDAQ_writer/scan_status.py
2023-12-07 16:17:59 +01:00

43 lines
1.3 KiB
Python

from bec_lib import MessageEndpoints, RedisConnector, messages
def send_scan_status(scan_number, status):
if status == "start":
scan_status_msg = messages.ScanStatusMessage(
scanID="test", status="open", info={"scan_number": scan_number}
)
elif status == "stop":
scan_status_msg = messages.ScanStatusMessage(
scanID="test", status="closed", info={"scan_number": scan_number}
)
else:
raise ValueError("Unknown status")
producer = RedisConnector(["localhost:6379"]).producer()
producer.send(MessageEndpoints.scan_status(), scan_status_msg.dumps())
print(f"Sent scan status message {scan_status_msg}")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Scan status helper")
command = parser.add_subparsers(dest="command")
start = command.add_parser("start", help="Start a new scan")
start.add_argument(
"--scan_number",
type=int,
required=True,
help="Scan number",
)
stop = command.add_parser("stop", help="Stop the scan")
stop.add_argument(
"--scan_number",
type=int,
required=True,
help="Scan number",
)
args = parser.parse_args()
send_scan_status(args.scan_number, args.command)