Files
sf_daq_broker/tests/test_buffer_reader.py

73 lines
2.4 KiB
Python

import os
import struct
import unittest
import zmq
from sf_daq_broker.detector.buffer_reader import ModuleReader, DetectorReader
from sf_daq_broker.detector.image_assembler import get_pull_receiver
from sf_daq_broker.detector.ram_buffer import RamBuffer
from tests.utils import test_ram_buffer, fill_binary_file
class TestBufferReader(unittest.TestCase):
def setUp(self):
os.makedirs("./M00/200000/", exist_ok=True)
os.makedirs("./M01/200000/", exist_ok=True)
fill_binary_file("./M00/200000/202000.bin", pulse_id=202002, module_id=0)
fill_binary_file("./M01/200000/202000.bin", pulse_id=202002, module_id=1)
def tearDown(self):
os.remove("./M00/200000/202000.bin")
os.remove("./M01/200000/202000.bin")
os.removedirs("./M00/200000/")
os.removedirs("./M01/200000/")
def test_frame_loading(self):
n_modules = 2
n_slots = 5
detector_folder = "."
ram_buffer = RamBuffer(n_modules=n_modules, n_slots=n_slots)
module_0_reader = ModuleReader(ram_buffer=ram_buffer, detector_folder=detector_folder, module_id=0)
module_1_reader = ModuleReader(ram_buffer=ram_buffer, detector_folder=detector_folder, module_id=1)
module_0_reader.load_frame_to_ram_buffer(202002)
module_1_reader.load_frame_to_ram_buffer(202002)
test_ram_buffer(self, ram_buffer, 202002)
def test_detector_reading(self):
n_modules = 2
n_slots = 5
detector_folder = "."
pulse_id = 202002
context = zmq.Context()
receivers = []
ram_buffer = RamBuffer(n_modules=n_modules, n_slots=n_slots)
detector_reader = DetectorReader(ram_buffer=ram_buffer,
detector_folder=detector_folder,
n_modules=n_modules,
zmq_context=context)
for i_module in range(n_modules):
receiver = get_pull_receiver(context, zmq_rcv_hwm=1)
receiver.connect("inproc://%d" % i_module)
receivers.append(receiver)
detector_reader.start_reading(start_pulse_id=pulse_id, stop_pulse_id=pulse_id, pulse_id_step=1)
for receiver in receivers:
received_pulse_id = struct.unpack("Q", receiver.recv())[0]
self.assertEqual(pulse_id, received_pulse_id)
detector_reader.close()
test_ram_buffer(self, ram_buffer=ram_buffer, pulse_id=pulse_id)