100 lines
3.3 KiB
Python
100 lines
3.3 KiB
Python
# Uses DECTRIS Stream2 example
|
|
|
|
import zmq
|
|
import numpy as np
|
|
import sys
|
|
|
|
import signal
|
|
import cbor2
|
|
import time
|
|
from dectris.compression import decompress
|
|
from tifffile import imwrite
|
|
|
|
# Shutdown flag: set to True by signal_handler() when SIGINT arrives and
# polled by the receive loop's `while not TERMINATE` condition.
TERMINATE = False
|
|
|
|
|
|
def decode_multi_dim_array(tag, column_major):
    """Decode a multi-dimensional array tag into a reshaped NumPy array.

    Args:
        tag: Tag object whose ``value`` is a ``(dimensions, contents)`` pair.
            ``contents`` is either a plain list or an already decoded NumPy
            array/scalar (for example the result of a typed-array tag).
        column_major: If true, ``contents`` is interpreted in column-major
            (Fortran) order; otherwise in row-major (C) order.

    Returns:
        ``contents`` reshaped to ``dimensions``. List contents are returned
        as an array of ``dtype=object``.

    Raises:
        cbor2.CBORDecodeValueError: If ``tag.value`` is not a two-item
            sequence, or ``contents`` is neither a list nor a NumPy value.
    """
    try:
        dimensions, contents = tag.value
    except (TypeError, ValueError) as exc:
        raise cbor2.CBORDecodeValueError(
            "expected [dimensions, contents] in multi-dimensional array"
        ) from exc

    if isinstance(contents, list):
        array = np.empty((len(contents),), dtype=object)
        # Fill item by item: a bulk ``array[:] = contents`` lets NumPy
        # broadcast nested, equally sized sequences and fails with a shape
        # mismatch instead of storing each item as one object.
        for index, item in enumerate(contents):
            array[index] = item
    elif isinstance(contents, (np.ndarray, np.generic)):
        array = contents
    else:
        raise cbor2.CBORDecodeValueError("expected array or typed array")

    return array.reshape(dimensions, order="F" if column_major else "C")
|
|
|
|
|
|
def decode_typed_array(tag, dtype):
    """Decode a typed-array tag into a one-dimensional NumPy array.

    Args:
        tag: Tag object whose ``value`` must be a ``bytes`` payload.
        dtype: NumPy dtype, or dtype string such as ``"<u2"``, used to
            interpret the payload.

    Returns:
        The array produced by ``np.frombuffer`` over the payload.

    Raises:
        cbor2.CBORDecodeValueError: If the payload is not ``bytes``, or
            ``np.frombuffer`` rejects it for the requested dtype (for
            example a length that is not a multiple of the item size).
    """
    payload = tag.value
    if not isinstance(payload, bytes):
        raise cbor2.CBORDecodeValueError("expected byte string in typed array")

    try:
        return np.frombuffer(payload, dtype=dtype)
    except ValueError as exc:
        # Report a malformed payload the same way as the type check above
        # rather than leaking a bare NumPy error.
        raise cbor2.CBORDecodeValueError(
            f"invalid typed array payload ({len(payload)} bytes) for dtype {dtype!r}"
        ) from exc
|
|
|
|
|
|
def decode_dectris_compression(tag):
    """Decompress the payload of a DECTRIS compression tag.

    Args:
        tag: Tag object whose ``value`` is an
            ``(algorithm, elem_size, encoded)`` triple.

    Returns:
        Whatever ``dectris.compression.decompress`` returns for the encoded
        payload.

    Raises:
        cbor2.CBORDecodeValueError: If ``tag.value`` is not a three-item
            sequence.
    """
    try:
        algorithm, elem_size, encoded = tag.value
    except (TypeError, ValueError) as exc:
        raise cbor2.CBORDecodeValueError(
            "expected [algorithm, elem_size, encoded] in compression tag"
        ) from exc

    return decompress(encoded, algorithm, elem_size=elem_size)
|
|
|
|
|
|
def _typed_array_decoder(dtype):
    """Return a tag decoder that reads the payload with the given dtype."""
    return lambda tag: decode_typed_array(tag, dtype=dtype)


# Tag number -> NumPy dtype string for the typed-array tags handled here.
# ">" marks big-endian and "<" little-endian element layouts; tag 76 has no
# entry.
_TYPED_ARRAY_DTYPES = {
    64: "u1",
    65: ">u2",
    66: ">u4",
    67: ">u8",
    68: "u1",
    69: "<u2",
    70: "<u4",
    71: "<u8",
    72: "i1",
    73: ">i2",
    74: ">i4",
    75: ">i8",
    77: "<i2",
    78: "<i4",
    79: "<i8",
    80: ">f2",
    81: ">f4",
    82: ">f8",
    83: ">f16",
    84: "<f2",
    85: "<f4",
    86: "<f8",
    87: "<f16",
}

# Tag number -> callable that turns the tag object into its decoded value.
# Consulted by tag_hook(); tags without an entry are passed through as-is.
tag_decoders = {
    # Multi-dimensional array, row-major.
    40: lambda tag: decode_multi_dim_array(tag, column_major=False),
    **{
        number: _typed_array_decoder(dtype)
        for number, dtype in _TYPED_ARRAY_DTYPES.items()
    },
    # Multi-dimensional array, column-major.
    1040: lambda tag: decode_multi_dim_array(tag, column_major=True),
    # DECTRIS compressed payload.
    56500: lambda tag: decode_dectris_compression(tag),
}
|
|
|
|
|
|
def tag_hook(decoder, tag):
    """Decode a tagged value using the ``tag_decoders`` table.

    Intended to be passed as ``tag_hook`` to ``cbor2.loads``.

    Args:
        decoder: The decoder instance supplied by the caller; unused here.
        tag: Tag object; its ``tag`` attribute is looked up in
            ``tag_decoders``.

    Returns:
        The decoded value when a decoder is registered for ``tag.tag``,
        otherwise ``tag`` itself, unchanged.
    """
    tag_decoder = tag_decoders.get(tag.tag)
    if not tag_decoder:
        # Unknown tag: hand it back untouched so the caller still sees it.
        return tag
    return tag_decoder(tag)
|
|
|
|
|
|
def signal_handler(signal, frame):
    """Request a graceful shutdown by setting the module-level TERMINATE flag.

    Args:
        signal: Signal number passed by the signal machinery; unused. The
            name shadows the ``signal`` module inside this function only.
        frame: Current stack frame passed by the signal machinery; unused.
    """
    global TERMINATE
    TERMINATE = True
|
|
|
|
|
|
# Route Ctrl+C to signal_handler so the loop below can stop cleanly instead
# of being interrupted by KeyboardInterrupt.
signal.signal(signal.SIGINT, signal_handler)

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.RCVTIMEO = 20000  # in milliseconds
socket.connect("tcp://127.0.0.1:5400")
socket.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: no topic filtering

try:
    # TERMINATE is only checked between receive attempts, so shutdown may
    # take up to the receive timeout.
    while not TERMINATE:
        try:
            raw = socket.recv()
        except zmq.Again:
            # Nothing arrived within RCVTIMEO; go back and re-check TERMINATE.
            continue
        except zmq.ZMQError as exc:
            # Any other socket error is not expected to clear on retry;
            # stop instead of spinning on it.
            print(f"receive failed: {exc!r}", file=sys.stderr)
            break

        try:
            msg = cbor2.loads(raw, tag_hook=tag_hook)
            # NOTE: the same file name is used for every message, so each
            # image overwrites the previous one.
            imwrite('test.tif', msg['data']['default'])
            # These values are extracted but not used yet. The lookups also
            # mean the spots below are only printed when every field is
            # present.
            title = msg['series_unique_id']
            beam_center_x = msg['beam_center_x']
            beam_center_y = msg['beam_center_y']
            detector_distance = msg['detector_distance']
            energy = msg['incident_energy']
            print(msg["spots"])
        except KeyError as exc:
            print(f"message is missing expected field {exc}", file=sys.stderr)
        except Exception as exc:
            # Best effort: report the failure and keep receiving.
            print(f"failed to process message: {exc!r}", file=sys.stderr)
finally:
    # Release the socket and context however the loop ended.
    socket.close(linger=0)
    context.term()
|