mirror of
https://github.com/paulscherrerinstitute/sf_daq_broker.git
synced 2026-04-30 16:52:23 +02:00
73 lines
2.4 KiB
Python
73 lines
2.4 KiB
Python
import os
|
|
import struct
|
|
import unittest
|
|
|
|
import zmq
|
|
|
|
from sf_daq_broker.detector.buffer_reader import ModuleReader, DetectorReader
|
|
from sf_daq_broker.detector.image_assembler import get_pull_receiver
|
|
from sf_daq_broker.detector.ram_buffer import RamBuffer
|
|
from tests.utils import test_ram_buffer, fill_binary_file
|
|
|
|
|
|
class TestBufferReader(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
os.makedirs("./M00/200000/", exist_ok=True)
|
|
os.makedirs("./M01/200000/", exist_ok=True)
|
|
|
|
fill_binary_file("./M00/200000/202000.bin", pulse_id=202002, module_id=0)
|
|
fill_binary_file("./M01/200000/202000.bin", pulse_id=202002, module_id=1)
|
|
|
|
def tearDown(self):
|
|
os.remove("./M00/200000/202000.bin")
|
|
os.remove("./M01/200000/202000.bin")
|
|
|
|
os.removedirs("./M00/200000/")
|
|
os.removedirs("./M01/200000/")
|
|
|
|
def test_frame_loading(self):
|
|
n_modules = 2
|
|
n_slots = 5
|
|
detector_folder = "."
|
|
|
|
ram_buffer = RamBuffer(n_modules=n_modules, n_slots=n_slots)
|
|
module_0_reader = ModuleReader(ram_buffer=ram_buffer, detector_folder=detector_folder, module_id=0)
|
|
module_1_reader = ModuleReader(ram_buffer=ram_buffer, detector_folder=detector_folder, module_id=1)
|
|
|
|
module_0_reader.load_frame_to_ram_buffer(202002)
|
|
module_1_reader.load_frame_to_ram_buffer(202002)
|
|
|
|
test_ram_buffer(self, ram_buffer, 202002)
|
|
|
|
def test_detector_reading(self):
|
|
n_modules = 2
|
|
n_slots = 5
|
|
detector_folder = "."
|
|
pulse_id = 202002
|
|
|
|
context = zmq.Context()
|
|
receivers = []
|
|
|
|
ram_buffer = RamBuffer(n_modules=n_modules, n_slots=n_slots)
|
|
detector_reader = DetectorReader(ram_buffer=ram_buffer,
|
|
detector_folder=detector_folder,
|
|
n_modules=n_modules,
|
|
zmq_context=context)
|
|
|
|
for i_module in range(n_modules):
|
|
receiver = get_pull_receiver(context, zmq_rcv_hwm=1)
|
|
receiver.connect("inproc://%d" % i_module)
|
|
|
|
receivers.append(receiver)
|
|
|
|
detector_reader.start_reading(start_pulse_id=pulse_id, stop_pulse_id=pulse_id, pulse_id_step=1)
|
|
|
|
for receiver in receivers:
|
|
received_pulse_id = struct.unpack("Q", receiver.recv())[0]
|
|
self.assertEqual(pulse_id, received_pulse_id)
|
|
|
|
detector_reader.close()
|
|
|
|
test_ram_buffer(self, ram_buffer=ram_buffer, pulse_id=pulse_id)
|