Files
Jungfraujoch/python/preview_w_adxv/preview.py

169 lines
6.3 KiB
Python

# Uses DECTRIS Stream2 example
import zmq
import numpy as np
import sys
import subprocess
import os
import signal
import cbor2
import time
from dectris.compression import decompress
from tifffile import imwrite
import argparse
import socket
sys.path.append('/data/visitors/micromax/20231830/tests/sw/adxv_live_view')
import adxvSocketh5 #as jna5
from adxvSocketh5 import adxvSocketh5
TERMINATE = False
def decode_multi_dim_array(tag, column_major):
dimensions, contents = tag.value
if isinstance(contents, list):
array = np.empty((len(contents),), dtype=object)
array[:] = contents
elif isinstance(contents, (np.ndarray, np.generic)):
array = contents
else:
raise cbor2.CBORDecodeValueError("expected array or typed array")
return array.reshape(dimensions, order="F" if column_major else "C")
def decode_typed_array(tag, dtype):
if not isinstance(tag.value, bytes):
raise cbor2.CBORDecodeValueError("expected byte string in typed array")
return np.frombuffer(tag.value, dtype=dtype)
def decode_dectris_compression(tag):
algorithm, elem_size, encoded = tag.value
return decompress(encoded, algorithm, elem_size=elem_size)
def tag_hook(decoder, tag):
tag_decoder = tag_decoders.get(tag.tag)
return tag_decoder(tag) if tag_decoder else tag
tag_decoders = {
40: lambda tag: decode_multi_dim_array(tag, column_major=False),
64: lambda tag: decode_typed_array(tag, dtype="u1"),
65: lambda tag: decode_typed_array(tag, dtype=">u2"),
66: lambda tag: decode_typed_array(tag, dtype=">u4"),
67: lambda tag: decode_typed_array(tag, dtype=">u8"),
68: lambda tag: decode_typed_array(tag, dtype="u1"),
69: lambda tag: decode_typed_array(tag, dtype="<u2"),
70: lambda tag: decode_typed_array(tag, dtype="<u4"),
71: lambda tag: decode_typed_array(tag, dtype="<u8"),
72: lambda tag: decode_typed_array(tag, dtype="i1"),
73: lambda tag: decode_typed_array(tag, dtype=">i2"),
74: lambda tag: decode_typed_array(tag, dtype=">i4"),
75: lambda tag: decode_typed_array(tag, dtype=">i8"),
77: lambda tag: decode_typed_array(tag, dtype="<i2"),
78: lambda tag: decode_typed_array(tag, dtype="<i4"),
79: lambda tag: decode_typed_array(tag, dtype="<i8"),
80: lambda tag: decode_typed_array(tag, dtype=">f2"),
81: lambda tag: decode_typed_array(tag, dtype=">f4"),
82: lambda tag: decode_typed_array(tag, dtype=">f8"),
83: lambda tag: decode_typed_array(tag, dtype=">f16"),
84: lambda tag: decode_typed_array(tag, dtype="<f2"),
85: lambda tag: decode_typed_array(tag, dtype="<f4"),
86: lambda tag: decode_typed_array(tag, dtype="<f8"),
87: lambda tag: decode_typed_array(tag, dtype="<f16"),
1040: lambda tag: decode_multi_dim_array(tag, column_major=True),
56500: lambda tag: decode_dectris_compression(tag),
}
if len(sys.argv) != 2:
print("Please provide address: python preview.py <address>")
sys.exit(1)
address = sys.argv[1]
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.RCVTIMEO = 20000 # in milliseconds
socket.connect(address) #plut in the address here, get address from sys.argv[1]
#socket.connect("tcp://127.0.0.1:5400")
socket.setsockopt(zmq.SUBSCRIBE, b"")
#===========trying to start adxv in the background listening to images=======
#command = "/data/visitors/micromax/20231830/tests/sw/adxv_live_view/start_adxv.sh -p 8100"
#background_process = subprocess.Popen(f"{command} &", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
#try:
# Simulate Ctrl + Z by sending SIGTSTP to the subprocess
# os.kill(background_process.pid, signal.SIGTSTP)
#except KeyboardInterrupt:
# Handle Ctrl + C if needed
# pass
# Optionally, capture the output and error (if needed)
#output, error = background_process.communicate()
# Check if there was an error
#if background_process.returncode != 0:
# print(f"Error occurred: {error.decode('utf-8')}")
#else:
# print("Command started in the background.")
#===========trying to start adxv in the background listening to images=======
adxv = adxvSocketh5(port = 8100, spot_type_file = "/data/visitors/micromax/20231830/tests/sw/adxv_live_view/spot_type.txt")
adxv.init()
print("adxv init...")
spots_file = "noindexed.adx"
spots_file2 = "indexed.adx"
tiff_path = "/data/visitors/micromax/20231830/tests/dawn/preview/test.tif"
while not TERMINATE:
try:
msg = socket.recv()
msg = cbor2.loads(msg, tag_hook=tag_hook)
imwrite('test.tif', msg['data']['default'])
title = msg['series_unique_id']
beam_center_x = msg['beam_center_x']
beam_center_y = msg['beam_center_y']
detector_distance = msg['detector_distance']
energy = msg['incident_energy']
with open('indexed.adx', 'w') as file0, open('noindexed.adx', 'w') as file1:
for i in range(len(msg["spots"])):
index_status=msg["spots"][i]["indexed"]
if index_status:
#print("indexed: ",msg["spots"][i])
# Write i string to the file
x0=int(msg["spots"][i]["x"])
y0=int(msg["spots"][i]["y"])
ind0=float(msg["spots"][i]["I"])
thing_to_write0 = "{} {} {} 1 1 \n".format(x0, y0, ind0)
file0.write(thing_to_write0)
else:
# Write a string to the file
x1=str(msg["spots"][i]["x"])
y1=str(msg["spots"][i]["y"])
ind1=str(msg["spots"][i]["I"])
thing_to_write1 = "{} {} {} 1 1 \n".format(x1, y1, ind1)
file1.write(thing_to_write1)
print('writes')
#print(msg["spots"][1])
#print(msg["spots"][3])
#print(len(msg["spots"]))
adxv.load_tiff(tiff_path)
spots_sets_list=[]
total_spots_num =0
spots = adxv.read_spots_from_file(spots_file)
spots_sets_list.append(spots)
total_spots_num += spots.shape[0]
spots = adxv.read_spots_from_file(spots_file2)
spots_sets_list.append(spots)
total_spots_num += spots.shape[0]
adxv.add_spots(spots_sets_list, total_spots_num)
#print(len(spots_sets_list), total_spots_num)
print("Running try")
except:
print("Running except")
pass