// Copyright (2019-2023) Paul Scherrer Institute #include "HLSSimulatedDevice.h" #include #include #include "../fpga/hls/datamover_model.h" #include "../fpga/hls/hls_jfjoch.h" uint16_t checksum(const uint16_t *addr, size_t count) { /* Compute Internet Checksum for "count" bytes * beginning at location "addr". */ long sum = 0; for (int i = 0; i < count / 2; i++) sum += addr[i]; /* Add left-over byte, if any */ if (count % 2 == 1) sum += ((uint8_t *) addr)[count / 2]; /* Fold 32-bit sum to 16 bits */ while (sum>>16) sum = (sum & 0xffff) + (sum >> 16); return ~sum; } HLSSimulatedDevice::HLSSimulatedDevice(uint16_t data_stream, size_t in_frame_buffer_size_modules, int16_t numa_node) : FPGAAcquisitionDevice(data_stream), datamover_in(Direction::Input, nullptr), datamover_out(Direction::Output, nullptr), datamover_out_hbm_0(Direction::Output, (char *) hbm.data()), datamover_out_hbm_1(Direction::Output, (char *) hbm.data()), datamover_in_hbm_0(Direction::Input, (char *) hbm.data()), datamover_in_hbm_1(Direction::Input, (char *) hbm.data()), idle(true), hbm(hbm_if_size / 32 * hbm_if_count), dma_address_table(65536) { mac_addr = 0xCCAA11223344; ipv4_addr = 0x0132010A; max_modules = MAX_MODULES_FPGA; MapBuffersStandard(in_frame_buffer_size_modules, numa_node); auto in_mem_location32 = (uint32_t *) dma_address_table.data(); for (int i = 0; i < buffer_device.size(); i++) { in_mem_location32[2 * i ] = ((uint64_t) buffer_device[i]) & UINT32_MAX; in_mem_location32[2 * i + 1] = ((uint64_t) buffer_device[i]) >> 32; } } void HLSSimulatedDevice::CreateFinalPacket(const DiffractionExperiment& experiment) { CreatePacketJF(experiment, UINT64_MAX, 0, 0, nullptr, false); } void HLSSimulatedDevice::SendPacket(char *buffer, int len, uint8_t user) { auto obuff = (ap_uint<512> *)buffer; for (int i = 0; i < (len + 63) / 64; i++) { packet_512_t packet_in; if (i == (len + 63) / 64 - 1) packet_in.last = 1; else packet_in.last = 0; packet_in.keep = 0xFFFFFFFFFFFFFFFF; packet_in.user = user; packet_in.data = obuff[i]; din_eth.write(packet_in); } } void HLSSimulatedDevice::CreatePacketJF(const DiffractionExperiment& experiment, uint64_t frame_number, uint32_t eth_packet, uint32_t module_number, const uint16_t *data, bool trigger, int8_t adjust_axis, uint8_t user) { char buff[256*64]; memset(buff, 0, 256*64); auto packet = (jf_raw_packet *)buff; packet->ether_type = htons(0x0800); packet->sour_mac[0] = 0x00; // module 0 uint64_t tmp_mac = mac_addr; for (int i = 0; i < 6; i++) packet->dest_mac[i] = (tmp_mac >> (8*i)) % 256; uint32_t half_module = 2 * module_number | ((eth_packet >= 64) ? 1 : 0); packet->ipv4_header_h = htons(0x4500); // Big endian in IP header! packet->ipv4_header_total_length = htons(8268); // Big endian in IP header! packet->ipv4_header_dest_ip = ipv4_addr; packet->ipv4_header_sour_ip = experiment.GetSrcIPv4Address(data_stream, half_module); packet->ipv4_header_ttl_protocol = htons(0x0011); packet->ipv4_header_checksum = checksum( (uint16_t *) &packet->ipv4_header_h, 20); // checksum is already in network order packet->udp_dest_port = htons(GetUDPPort()); // module number packet->udp_sour_port = htons(0xDFAC); packet->udp_length = htons(8248); // JF headers are little endian packet->jf.timestamp = 0xABCDEF0000FEDCBAL; packet->jf.bunchid = 0x1234567898765431L; packet->jf.xCoord = half_module; packet->jf.framenum = frame_number; packet->jf.packetnum = eth_packet % 64; if (trigger) packet->jf.debug = 1<<31; if (data != nullptr) { for (int i = 0; i < 4096; i++) packet->jf.data[i] = data[i]; } packet->udp_checksum = htons(checksum( (uint16_t *) (buff+42), 8192+48)); SendPacket(buff, (130+adjust_axis)*64, user); } void HLSSimulatedDevice::CreatePackets(const DiffractionExperiment& experiment, uint64_t frame_number_0, uint64_t frames, uint32_t module_number, const uint16_t *data, bool trigger, int8_t adjust_axis, uint8_t user) { for (uint64_t i = 0; i < frames; i++) { for (int j = 0; j < 128; j++) CreatePacketJF(experiment, frame_number_0 + i, j, module_number, data + (i * 128 + j) * 4096, trigger, adjust_axis, user); } } AXI_STREAM & HLSSimulatedDevice::OutputStream() { return dout_eth; } void HLSSimulatedDevice::HW_ReadActionRegister(DataCollectionConfig *job) { memcpy(job, &cfg, sizeof(DataCollectionConfig)); } void HLSSimulatedDevice::HW_WriteActionRegister(const DataCollectionConfig *job) { memcpy(&cfg, job, sizeof(DataCollectionConfig)); } void HLSSimulatedDevice::FPGA_StartAction(const DiffractionExperiment &experiment) { if (action_thread.joinable()) action_thread.join(); run_counter += 1; run_data_collection = 1; cancel_data_collection = 0; idle = false; while (!din_frame_generator.empty()) din_frame_generator.read(); datamover_out.ClearCompletedDescriptors(); action_thread = std::thread(&HLSSimulatedDevice::HLSMainThread, this ); } void HLSSimulatedDevice::FrameGeneratorFuture(FrameGeneratorConfig config) { frame_generator(din_frame_generator, hbm.data(), hbm.data(), hbm_if_size, config.frames, config.modules, mac_addr, config.dest_mac_addr, ipv4_addr, config.dest_ipv4_addr, config.bunchid, config.exptime, config.debug, cancel_data_collection); } void HLSSimulatedDevice::HW_RunInternalGenerator(const FrameGeneratorConfig &config) { frame_generator_future = std::async(std::launch::async, &HLSSimulatedDevice::FrameGeneratorFuture, this, config); } void HLSSimulatedDevice::FPGA_EndAction() { if (action_thread.joinable()) action_thread.join(); } HLSSimulatedDevice::~HLSSimulatedDevice() { if (action_thread.joinable()) action_thread.join(); } bool HLSSimulatedDevice::HW_ReadMailbox(uint32_t *values) { std::unique_lock ul(completion_mutex); ap_uint<32> tmp; bool ret = completion_stream.read_nb(tmp); values[0] = tmp; // equivalent to driver functionality if (ret) { uint32_t data_collection_id = (values[0] >> 16) & 0xFFFF; uint32_t handle = values[0] & 0xFFFF; if (handle == HANDLE_START) completion_count = 0; else if ((handle != HANDLE_END) && (data_collection_id != DATA_COLLECTION_ID_PURGE)) { completion_count++; while (completion_count * DMA_DESCRIPTORS_PER_MODULE > datamover_out.GetCompletedDescriptors()) std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } return ret; } void HLSSimulatedDevice::Cancel() { cancel_data_collection = 1; } bool HLSSimulatedDevice::HW_IsIdle() const { return idle && datamover_out.IsIdle(); } bool HLSSimulatedDevice::HW_SendWorkRequest(uint32_t handle) { work_request_stream.write(handle); return true; } inline uint32_t float2uint(float f) { float_uint32 fu; fu.f = f; return fu.u; } void HLSSimulatedDevice::HLSMainThread() { uint64_t counter_hbm; uint64_t counter_host; uint64_t eth_packets; uint64_t icmp_packets; uint64_t udp_packets; uint64_t sls_packets; uint32_t udp_len_err; uint32_t udp_eth_err; ap_uint<1> clear_counters = 0; uint64_t packets_processed; std::vector hls_cores; STREAM_512 ip1, udp1, udp2, icmp1, arp1; STREAM_512 network0; STREAM_512 raw0; STREAM_512 raw1; STREAM_512 raw2; STREAM_512 raw3; STREAM_768 stream_768_0; STREAM_768 stream_768_1; STREAM_768 stream_768_2; STREAM_768 stream_768_3; STREAM_512 converted_0; STREAM_512 converted_1; STREAM_512 converted_2; STREAM_512 converted_3; STREAM_512 converted_3a; STREAM_512 converted_4; STREAM_512 converted_5; STREAM_512 converted_5a; STREAM_576 converted_6; STREAM_512 converted_7; STREAM_512 converted_8; hls::stream addr0; hls::stream addr1; hls::stream addr2; hls::stream addr3; hls::stream compl0, compl1, compl2, compl2a, compl3, compl4, compl5, compl6, compl7, compl8; hls::stream> hbm_handles; hls::stream> adu_histo_result; hls::stream> integration_result_0; hls::stream> integration_result_1; hls::stream> spot_finder_result_0; hls::stream> spot_finder_result_1; hls::stream> spot_finder_conn_0; hls::stream> spot_finder_result_2; hls::stream> spot_finder_mask_0; hls::stream > udp_metadata; volatile ap_uint<1> idle_data_collection = 1; ap_uint<1> load_to_hbm_idle; ap_uint<1> save_to_hbm_idle; ap_uint<1> integration_idle; ap_uint<1> stream_conv_idle; ap_uint<1> frame_summation_idle; volatile bool done = false; volatile bool udp_done = false; // done AND udp_idle volatile bool sls_done = false; // done AND sls_idle // Sent gratuitous ARP message arp(arp1, dout_eth, mac_addr, ipv4_addr, 1, 1); Logger logger_hls("HLS"); volatile rcv_state_t state = RCV_INIT; // Start data collection data_collection_fsm(raw0, raw1, addr0, addr1, run_data_collection, cancel_data_collection, idle_data_collection, cfg.mode, float2uint(cfg.energy_kev), cfg.nframes, cfg.nmodules, cfg.nstorage_cells, cfg.nsummation , state); run_data_collection = 0; data_collection_fsm(raw0, raw1, addr0, addr1, run_data_collection, cancel_data_collection, idle_data_collection, cfg.mode, float2uint(cfg.energy_kev), cfg.nframes, cfg.nmodules, cfg.nstorage_cells, cfg.nsummation, state); hls_cores.emplace_back([&] { while (!udp_done) { if (din_eth.empty() && din_eth_4x10G.empty() && din_frame_generator.empty()) std::this_thread::sleep_for(std::chrono::microseconds(10)); else stream_merge(din_eth, din_eth_4x10G, din_frame_generator, network0, data_source); } logger_hls.Info("Stream_merge done"); }); hls_cores.emplace_back([&] { while (!udp_done) { if (network0.empty()) std::this_thread::sleep_for(std::chrono::microseconds(10)); else ethernet(network0, ip1, arp1, mac_addr, eth_packets, clear_counters); } logger_hls.Info("ethernet done"); }); hls_cores.emplace_back([&] { while (!udp_done) { if (ip1.empty()) std::this_thread::sleep_for(std::chrono::microseconds(10)); else ipv4(ip1, udp1, icmp1, ipv4_addr); } logger_hls.Info("ipv4 done"); }); hls_cores.emplace_back([&] { ap_uint<1> udp_idle = 1; while (!done || !udp_idle) { if (udp1.empty()) std::this_thread::sleep_for(std::chrono::microseconds(10)); else udp(udp1, udp2, udp_metadata, udp_packets, clear_counters, udp_idle); } udp_done = true; logger_hls.Info("udp done"); }); hls_cores.emplace_back([&] { ap_uint<1> sls_idle = 1; while (!done || !sls_idle) { if (udp2.empty()) std::this_thread::sleep_for(std::chrono::microseconds (10)); else sls_detector(udp2, udp_metadata, raw0, addr0, sls_packets, udp_eth_err, udp_len_err, clear_counters, sls_idle); } sls_done = true; logger_hls.Info("sls_detector done"); }); // 1. Parse incoming UDP packets hls_cores.emplace_back([&] { while ((state != RCV_WAIT_FOR_START) || (idle_data_collection.read() == 0) || (!raw0.empty())) { data_collection_fsm(raw0, raw1, addr0, addr1, run_data_collection, cancel_data_collection, idle_data_collection, cfg.mode, float2uint(cfg.energy_kev), cfg.nframes, cfg.nmodules, cfg.nstorage_cells, cfg.nsummation, state); } done = true; logger_hls.Info("data collection done"); }); // 2. Cache images in HBM hls_cores.emplace_back([&] { save_to_hbm(raw1, converted_1, addr1, compl0, hbm_handles, datamover_out_hbm_0.GetDataStream(), datamover_out_hbm_1.GetDataStream(), datamover_out_hbm_0.GetCtrlStream(), datamover_out_hbm_1.GetCtrlStream(), save_to_hbm_idle, hbm_if_size);}); hls_cores.emplace_back([&] {frame_summation_reorder_compl(converted_1, converted_2, compl0, compl1);}); hls_cores.emplace_back([&] { load_from_hbm(converted_2, converted_3, compl1, compl2, hbm_handles, datamover_in_hbm_0.GetDataStream(), datamover_in_hbm_1.GetDataStream(), datamover_in_hbm_0.GetCtrlStream(), datamover_in_hbm_1.GetCtrlStream(), load_to_hbm_idle, hbm_if_size);}); hls_cores.emplace_back([&] { pedestal(converted_3, converted_3a, compl2, compl2a, hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm_if_size);}); // 3. Calculate histogram of ADU values hls_cores.emplace_back([&] { adu_histo(converted_3a, converted_4, adu_histo_result, compl2a, compl3);}); // 4. Mask missing pixels hls_cores.emplace_back([&] { mask_missing(converted_4, converted_5, compl3, compl4);}); // 5. Apply pedestal & gain corrections hls_cores.emplace_back([&] { jf_conversion(converted_5, converted_6, compl4, compl5, hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm.data(), hbm_if_size); }); // 6. Frame summation hls_cores.emplace_back([&] { frame_summation(converted_6, stream_768_0, compl5, compl6, frame_summation_idle);}); // 7. Integration of pixels hls_cores.emplace_back([&] { integration(stream_768_0, stream_768_1, integration_result_0, compl6, compl7, hbm.data(), hbm.data(), hbm.data(), hbm.data(), integration_idle, hbm_if_size);}); hls_cores.emplace_back([&] { axis_64_to_512(integration_result_0, integration_result_1);}); // 8. Spot finding ap_uint<32> tmp_snr_threshold = float2uint(spot_finder_parameters.snr_threshold); ap_uint<32> min_d = float2uint(spot_finder_parameters.min_d); ap_uint<32> max_d = float2uint(spot_finder_parameters.max_d); ap_int<32> tmp_count_threshold = spot_finder_parameters.count_threshold; ap_uint<32> min_pix_per_spot = spot_finder_parameters.min_pix_per_spot; hls_cores.emplace_back([&] { spot_finder_mask(stream_768_1, stream_768_2, spot_finder_mask_0, compl7, compl8, hbm.data(), hbm.data(), min_d, max_d, hbm_if_size); logger_hls.Info("spot_finder_mask done"); }); hls_cores.emplace_back([&] { spot_finder(stream_768_2, spot_finder_mask_0, stream_768_3, spot_finder_result_0, tmp_count_threshold, tmp_snr_threshold); logger_hls.Info("spot_finder done"); }); hls_cores.emplace_back([&] { spot_finder_connectivity(spot_finder_result_0, spot_finder_result_1, spot_finder_conn_0); logger_hls.Info("spot_finder_connectivity done"); }); hls_cores.emplace_back([&] { spot_finder_merge(spot_finder_result_1, spot_finder_conn_0, spot_finder_result_2, min_pix_per_spot); logger_hls.Info("spot_finder_merge done"); }); // 9. Reduce/extend 24-bit stream hls_cores.emplace_back([&] { stream_24bit_conv(stream_768_3, converted_7, stream_conv_idle);}); // 10. Prepare data to write to host memory hls_cores.emplace_back([&] { ap_uint<3> state; host_writer(converted_7, adu_histo_result, integration_result_1, spot_finder_result_2, compl8, datamover_out.GetDataStream(), datamover_out.GetCtrlStream(), work_request_stream, completion_stream, dma_address_table.data(), packets_processed, host_writer_idle, cancel_data_collection, state);}); for (auto &i : hls_cores) i.join(); if (frame_generator_future.valid()) frame_generator_future.get(); // reset static counter arp(arp1, dout_eth, mac_addr, ipv4_addr, 0, 1); try { while (!din_eth.empty()) din_eth.read(); if (!addr1.empty()) throw std::runtime_error("Addr1 queue not empty"); if (!addr2.empty()) throw std::runtime_error("Addr2 queue not empty"); if (!addr3.empty()) throw std::runtime_error("Addr3 queue not empty"); if (!raw1.empty()) throw std::runtime_error("Raw1 queue not empty"); if (!raw2.empty()) throw std::runtime_error("Raw2 queue not empty"); if (!raw3.empty()) throw std::runtime_error("Raw3 queue not empty"); if (!converted_0.empty()) throw std::runtime_error("Converted_0 queue not empty"); if (!converted_1.empty()) throw std::runtime_error("Converted_1 queue not empty"); if (!converted_2.empty()) throw std::runtime_error("Converted_2 queue not empty"); if (!converted_3.empty()) throw std::runtime_error("Converted_3 queue not empty"); if (!converted_4.empty()) throw std::runtime_error("Converted_4 queue not empty"); if (!converted_5.empty()) throw std::runtime_error("Converted_5 queue not empty"); if (!converted_6.empty()) throw std::runtime_error("Converted_6 queue not empty"); if (!converted_7.empty()) throw std::runtime_error("Converted_7 queue not empty"); if (!converted_8.empty()) throw std::runtime_error("Converted_8 queue not empty"); if (!compl0.empty()) throw std::runtime_error("Compl0 queue not empty"); if (!compl1.empty()) throw std::runtime_error("Compl1 queue not empty"); if (!compl2.empty()) throw std::runtime_error("Compl2 queue not empty"); if (!compl3.empty()) throw std::runtime_error("Compl3 queue not empty"); if (!compl4.empty()) throw std::runtime_error("Compl4 queue not empty"); if (!compl5.empty()) throw std::runtime_error("Compl5 queue not empty"); if (!compl6.empty()) throw std::runtime_error("Compl6 queue not empty"); if (!compl7.empty()) throw std::runtime_error("Compl7 queue not empty"); if (!stream_768_0.empty()) throw std::runtime_error("Stream_768_0 queue not empty"); if (!stream_768_1.empty()) throw std::runtime_error("Stream_768_1 queue not empty"); if (!stream_768_2.empty()) throw std::runtime_error("Stream_768_2 queue not empty"); if (!hbm_handles.empty()) throw std::runtime_error("Handles queue not empty"); if (!integration_result_0.empty()) throw std::runtime_error("Integration result queue not empty"); if (!integration_result_1.empty()) throw std::runtime_error("Integration result queue not empty"); if (!datamover_in.GetDataStream().empty()) throw std::runtime_error("Datamover queue is not empty"); while (!datamover_out.IsIdle()) std::this_thread::sleep_for(std::chrono::milliseconds(100)); } catch (const std::runtime_error &e) { if (logger) logger->ErrorException(e); throw e; } if (logger) logger->Info("Packets Eth {} UDP {} SLS {} Proc {}", eth_packets, udp_packets, sls_packets, packets_processed); idle = true; } void HLSSimulatedDevice::HW_GetStatus(DataCollectionStatus *status) const { memset(status, 0, sizeof(DataCollectionStatus)); status->ctrl_reg = ap_uint<1>(host_writer_idle) ? (1 << 4) : 0; status->max_modules = max_modules; status->hbm_size_bytes = hbm_if_size; status->run_counter = run_counter; } void HLSSimulatedDevice::HW_LoadCalibration(const LoadCalibrationConfig &config) { int ret = load_calibration(hbm.data(), hbm.data(), config, hbm_if_size, datamover_in.GetCtrlStream(), datamover_in.GetDataStream(), dma_address_table.data()); if (ret) throw JFJochException(JFJochExceptionCategory::AcquisitionDeviceError, "Error in loading calibration " + std::to_string(ret)); if (!datamover_in.GetDataStream().empty()) throw std::runtime_error("Datamover queue is not empty"); } void HLSSimulatedDevice::HW_SetSpotFinderParameters(const SpotFinderParameters ¶ms) { spot_finder_parameters = params; } void HLSSimulatedDevice::HW_SetDataSource(uint32_t val) { data_source = val; } uint32_t HLSSimulatedDevice::HW_GetDataSource() { return data_source; }