feat: added scan status helper function

This commit is contained in:
2023-07-20 15:31:48 +02:00
parent 8cee246dac
commit 8127ad2381

View File

@@ -0,0 +1,42 @@
from bec_lib.core import BECMessage, MessageEndpoints, RedisConnector
def send_scan_status(scan_number, status):
if status == "start":
scan_status_msg = BECMessage.ScanStatusMessage(
scanID="test", status="open", info={"scan_number": scan_number}
)
elif status == "stop":
scan_status_msg = BECMessage.ScanStatusMessage(
scanID="test", status="closed", info={"scan_number": scan_number}
)
else:
raise ValueError("Unknown status")
producer = RedisConnector(["localhost:6379"]).producer()
producer.send(MessageEndpoints.scan_status(), scan_status_msg.dumps())
print(f"Sent scan status message {scan_status_msg}")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Scan status helper")
command = parser.add_subparsers(dest="command")
start = command.add_parser("start", help="Start a new scan")
start.add_argument(
"--scan_number",
type=int,
required=True,
help="Scan number",
)
stop = command.add_parser("stop", help="Stop the scan")
stop.add_argument(
"--scan_number",
type=int,
required=True,
help="Scan number",
)
args = parser.parse_args()
send_scan_status(args.scan_number, args.command)