merge with eiger for gpfs writing

This commit is contained in:
Data Backend account
2022-04-29 18:18:35 +02:00
parent 6a321b2bac
commit 6ecc79257b
40 changed files with 905 additions and 357 deletions
+2 -2
View File
@@ -23,10 +23,10 @@ namespace BufferUtils
return os << det_config.detector_name << ' '
<< det_config.detector_type << ' '
<< det_config.n_modules << ' '
<< det_config.start_udp_port << ' '
<< det_config.bit_depth << ' '
<< det_config.image_height << ' '
<< det_config.image_width << ' ';
<< det_config.image_width << ' '
<< det_config.start_udp_port << ' ';
}
};
+11
View File
@@ -98,6 +98,11 @@ void* BufferUtils::connect_socket(
detector_name + "-" +
stream_name;
#ifdef DEBUG_OUTPUT
cout << "[BufferUtils::connect_socket]";
cout << " IPC address: " << ipc_address << endl;
#endif
void* socket = zmq_socket(ctx, ZMQ_SUB);
if (socket == nullptr) {
throw runtime_error(zmq_strerror(errno));
@@ -166,6 +171,12 @@ void* BufferUtils::bind_socket(
detector_name + "-" +
stream_name;
#ifdef DEBUG_OUTPUT
cout << "[BufferUtils::bind_socket]";
cout << " IPC address: " << ipc_address << endl;
#endif
void* socket = zmq_socket(ctx, ZMQ_PUB);
const int sndhwm = BUFFER_ZMQ_SNDHWM;
+23
View File
@@ -0,0 +1,23 @@
# arm centos
FROM arm64v8/centos:7
# setting env variables
ENV container docker
ENV CPLUS_INCLUDE_PATH /hdf5-1.12.0/hdf5/include/
RUN rm /bin/sh && ln -s /bin/bash /bin/sh
SHELL ["/bin/bash", "-c"]
RUN yum -y update; yum clean all; yum -y groupinstall "Development Tools";
RUN yum -y install centos-release-scl-rh
RUN yum -y install dnf wget epel-release which
RUN yum -y install cmake3 zeromq-devel hdf5-devel mpich-3.2
RUN yum -y install devtoolset-9 openmpi-devel
RUN scl enable devtoolset-9 bash && source /etc/profile.d/modules.sh && module load mpi && \
wget https://support.hdfgroup.org/ftp/HDF5/releases/hdf5-1.12/hdf5-1.12.0/src/hdf5-1.12.0.tar.gz && \
tar -xzf hdf5-1.12.0.tar.gz && cd hdf5-1.12.0 && ./configure --enable-parallel; make install
RUN scl enable devtoolset-9 bash && source /etc/profile.d/modules.sh && module load mpi && \
rm -rf /sf_daq_buffer && git clone https://github.com/paulscherrerinstitute/sf_daq_buffer.git && \
cd sf_daq_buffer && git checkout eiger && mkdir build
RUN echo "source /opt/rh/devtoolset-9/enable && source /etc/profile.d/modules.sh && module load mpi && cd /sf_daq_buffer/build/ && cmake3 .. -DUSE_EIGER=ON -DBUILD_JF_LIVE_WRITER=ON -DCMAKE_CXX_STANDARD_LIBRARIES=\"-L/hdf5-1.12.0/hdf5/lib/ -lhdf5\" && make" >> /tmp/setup
RUN /bin/bash -C "/tmp/setup"
RUN rm -f /tmp/setup
ENTRYPOINT [ "/bin/bash" ]
+1
View File
@@ -4,5 +4,6 @@
"n_modules": 4,
"image_height": 514,
"image_width": 1030,
"bit_depth": 16,
"start_udp_port": 50200
}
+10 -28
View File
@@ -21,7 +21,6 @@ CONFIG_FILE='/home/hax_l/sf_daq_buffer/eiger/sf-daq-4/config/eiger.json'
HELP_FLAG=0
# CONFIGURATION
BIT_DEPTH=16
N_MPI_EXEC=3
STREAM_NAME='streamvis'
while getopts h:c:b:m: flag
@@ -29,7 +28,6 @@ do
case "${flag}" in
h ) HELP_FLAG=${OPTARG};;
c ) CONFIG_FILE=${OPTARG};;
b ) BIT_DEPTH=${OPTARG};;
m ) N_MPIT_EXEC=${OPTARG};;
s ) STREAMVIS=${OPTARG};;
esac
@@ -37,10 +35,9 @@ done
# prints help and exits
if (( ${HELP_FLAG} == 1 )); then
echo "Usage : $0 -h <help_flag> -c <config_file> -b <bit_depth> -s <stream_name>"
echo "Usage : $0 -h <help_flag> -c <config_file> -s <stream_name>"
echo " help_flag : show this help and exits."
echo " config_file : detector configuration file."
echo " bit_depth : detector bit depth."
echo " stream name : live stream name."
exit
fi
@@ -74,16 +71,11 @@ echo "Starting ${N_UDP_RECVS} udp receivers..."
COUNTER=0
if [ -f "${BUILD_PATH}${UDP_RECV}" ]; then
if [ -f "${CONFIG_FILE}" ]; then
if [ ${BIT_DEPTH} -ne 0 ]; then
while [ $COUNTER -lt ${N_UDP_RECVS} ]; do
${BUILD_PATH}${UDP_RECV} ${CONFIG_FILE} ${COUNTER} ${BIT_DEPTH} &
let COUNTER=COUNTER+1
sleep 0.5
done
else
echo "Error: ${BIT_DEPTH} can't be zero..."
exit
fi
while [ $COUNTER -lt ${N_UDP_RECVS} ]; do
${BUILD_PATH}${UDP_RECV} ${CONFIG_FILE} ${COUNTER}&
let COUNTER=COUNTER+1
sleep 0.5
done
else
echo "Something went wrong while starting the ${UDP_RECV}..."
exit
@@ -115,13 +107,8 @@ fi
echo "Starting the ${EIGER_ASSEMBLER}..."
if [ -f "${BUILD_PATH}${EIGER_ASSEMBLER}" ]; then
if [ -f "${CONFIG_FILE}" ]; then
if [ ${BIT_DEPTH} -ne 0 ]; then
${BUILD_PATH}${EIGER_ASSEMBLER} ${CONFIG_FILE} ${BIT_DEPTH} &
sleep 0.5
else
echo "Error: ${BIT_DEPTH} can't be zero..."
exit
fi
${BUILD_PATH}${EIGER_ASSEMBLER} ${CONFIG_FILE} &
sleep 0.5
else
echo "Something went wrong while starting the ${EIGER_ASSEMBLER}..."
exit
@@ -135,12 +122,7 @@ fi
echo "Starting the ${STD_STREAM_SEND}..."
if [ -f "${BUILD_PATH}${STD_STREAM_SEND}" ]; then
if [ -f "${CONFIG_FILE}" ]; then
if [ ${BIT_DEPTH} -ne 0 ]; then
${BUILD_PATH}${STD_STREAM_SEND} ${CONFIG_FILE} ${BIT_DEPTH} ${STREAM_NAME} &
else
echo "Error: ${BIT_DEPTH} can't be zero..."
exit
fi
${BUILD_PATH}${STD_STREAM_SEND} ${CONFIG_FILE} ${STREAM_NAME} &
else
echo "Something went wrong while starting the ${STD_STREAM_SEND}..."
exit
@@ -157,7 +139,7 @@ export LD_LIBRARY_PATH="/usr/lib64/mpich-3.2/lib:${LD_LIBRARY_PATH}";
if [ -f "${BUILD_PATH}${STD_DET_WRITER}" ]; then
if [ -f "${CONFIG_FILE}" ]; then
mpiexec -n ${N_MPI_EXEC} ${BUILD_PATH}${STD_DET_WRITER} ${CONFIG_FILE} ${BIT_DEPTH} &
mpiexec -n ${N_MPI_EXEC} ${BUILD_PATH}${STD_DET_WRITER} ${CONFIG_FILE} &
sleep 0.5
else
echo "Something went wrong while starting the ${STD_DET_WRITER}..."
-6
View File
@@ -1,6 +0,0 @@
#!/bin/bash
export PATH=/home/dbe/miniconda3/bin:$PATH:/home/hax_l/sf_daq_buffer/slsDetectorPackage/build/bin/
source /home/dbe/miniconda3/etc/profile.d/conda.sh
conda deactivate
conda activate sf-daq
-12
View File
@@ -1,12 +0,0 @@
#!/bin/bash
if [ -z ${SLS_DET_PACKAGE_PATH} ]; then SLS_DET_PACKAGE_PATH="/home/hax_l/software/sf_daq_buffer/slsDetectorPackage/build/bin/"; fi
${SLS_DET_PACKAGE_PATH}sls_detector_put triggers 1
${SLS_DET_PACKAGE_PATH}sls_detector_put timing trigger
${SLS_DET_PACKAGE_PATH}sls_detector_put exptime 0.000005
${SLS_DET_PACKAGE_PATH}sls_detector_put frames 1
${SLS_DET_PACKAGE_PATH}sls_detector_put dr 16
${SLS_DET_PACKAGE_PATH}sls_detector_put start
echo "Acquisition started..."
-39
View File
@@ -1,39 +0,0 @@
#!/bin/bash
# default port 2050 and 2052
if [ $# -eq 0 ]
then
PORT=2050
PORT_SLAVE=2052
elif [ $# -eq 1 ]
then
PORT=$1
PORT_SLAVE=$(( $1 + 2 ))
else
echo "Usage : $0 <port>"
echo " tcp port : optional, default 2050"
exit
fi
SLS_DET_PACKAGE_PATH='/home/hax_l/software/sf_daq_buffer/slsDetectorPackage/build/bin/'
echo "Starting the master eiger virtual detector at port ${PORT}..."
if [ -f "${SLS_DET_PACKAGE_PATH}eigerDetectorServerMaster_virtual" ]; then
${SLS_DET_PACKAGE_PATH}eigerDetectorServerMaster_virtual -p ${PORT} &
else
echo "Eiger master was not found at ${SLS_DET_PACKAGE_PATH}."
exit
fi
echo "Starting the slave bottom eiger virtual detector at port ${PORT_SLAVE}..."
if [ -f "${SLS_DET_PACKAGE_PATH}eigerDetectorServerSlaveBottom_virtual" ]; then
${SLS_DET_PACKAGE_PATH}eigerDetectorServerSlaveBottom_virtual -p ${PORT} &
else
echo "Eiger slave bottom was not found at ${SLS_DET_PACKAGE_PATH}."
exit
fi
+35
View File
@@ -0,0 +1,35 @@
import zmq
from _ctypes import Structure
from ctypes import c_uint64, c_uint16
class ImageMetadata(Structure):
_pack_ = 1
_fields_ = [
("version", c_uint64),
("id", c_uint64),
("height", c_uint64),
("width", c_uint64),
("dtype", c_uint16),
("encoding", c_uint16),
("source_id", c_uint16),
("status", c_uint16),
("user_1", c_uint64),
("user_2", c_uint64)]
def as_dict(self):
return dict((f, getattr(self, f)) for f, _ in self._fields_)
zmq_context = zmq.Context()
backend_socket = zmq_context.socket(zmq.SUB)
#backend_socket.setsockopt(zmq.RCVTIMEO, 100)
backend_socket.setsockopt_string(zmq.SUBSCRIBE, "")
backend_socket.connect("ipc:///tmp/std-daq-cSAXS.EG01V01-assembler")
while True:
image_bytes = backend_socket.recv()
image_meta = ImageMetadata.from_buffer_copy(image_bytes)
print(f'Image received: {image_meta.id}')
+8 -6
View File
@@ -1,5 +1,6 @@
import socket
import numpy as np
import sys
frame_header_dt = np.dtype(
[
@@ -19,19 +20,20 @@ frame_header_dt = np.dtype(
]
)
#ip = "10.30.40.211"
ip = sys.argv[1]
ip = "10.30.20.6"
ports = list(range(50200, 50204, 1))
ports = list(range(50000, 50004, 1))
sockets = [socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for i in range(len(ports))]
for s, p in zip(sockets, ports):
print("IP:Port: ", ip, p)
s.bind((ip, p))
print(ip,p)
s.bind((ip,p))
while True:
for s in sockets:
data, address = s.recvfrom(4096)
h = np.frombuffer(data, count=1, dtype=frame_header_dt)[0]
print(
f'pkt:{h["Packet Number"]}, frame: {h["Frame Number"]}, row: {h["Row"]}, column: {h["Column"]}'
)
f'[{h["Timestamp"]}] frame: {h["Frame Number"]}, pkt:{h["Packet Number"]}, explength:{h["SubFrame Number/ExpLength"]}, module id: {h["Module Id"]} ,row: {h["Row"]}, column: {h["Column"]}'
)
+17
View File
@@ -0,0 +1,17 @@
#!/usr/bin/env python
from datetime import datetime
from slsdet import Eiger
n_test = 10
n_frames = 1000
d = Eiger()
#d.nextframenumber = 1
#d.frames = n_frames
#d.period = 0.001
#d.exptime = 0.0005
for i in range(0,n_test):
d.acquire()
next_fn = d.nextframenumber
print(datetime.now(), ' ', next_fn)
#if not isinstance(next_fn, int):
# raise ValueError(f'Frame numbers differ: {next_fn}')
+12
View File
@@ -0,0 +1,12 @@
{
"size": "medium",
"price": 15.67,
"toppings": ["mushrooms", "pepperoni", "basil"],
"extra_cheese": false,
"delivery": true,
"client": {
"name": "Jane Doe",
"phone": null,
"email": "janedoe@email.com"
}
}
+6
View File
@@ -0,0 +1,6 @@
frames 10
exptime 0.01
period 0.1
dr 16
timing auto #in case this is trigger it waits forever
+28
View File
@@ -0,0 +1,28 @@
import os
import json
import sys
config_file = '/home/dbe/sls/hosts/xbl-daq-28/cSAXS.EG01V01.json'
if len(sys.argv) != 2:
print("error... bit depth argument missing")
bit_d = sys.argv[1]
with open(config_file) as f:
content = json.load(f)
content['bit_depth'] = int(bit_d)
#f.truncate(0)
#json.dump(content, f, indent='\t', separators=(',', ': '))
with open(config_file, 'w') as f:
f.seek(0)
json.dump(content, f, indent='\t')
#os.chdir('/home/dbe/sls/hosts/xbl-daq-28/')
#rc = os.system(f'git pull && git add {config_file} && git commit -m "[LOG] config test change push" && git push')
#print('result value: ', rc)
+22
View File
@@ -0,0 +1,22 @@
detsize 512 1024
#T65 test detector for SLS2
hostname beb003+beb043+
#top
0:rx_tcpport 1954
0:udp_dstport 50000
0:udp_dstport2 50001
0:udp_srcip 10.254.0.33
#bottom
1:rx_tcpport 1955
1:udp_dstport 50002
1:udp_dstport2 50003
1:udp_srcip 10.254.0.34
udp_dstip 10.254.0.1
udp_dstmac 24:be:05:bd:6c:52
tengiga 1
+26
View File
@@ -0,0 +1,26 @@
#!/bin/bash
GREP="std|streamvis|rabbit|journalbeat"
RESTART="OFF"
STATUS_VERBOSE="OFF"
while getopts g:s: flag
do
case "${flag}" in
g) GREP=${OPTARG};;
# r) RESTART='ON';;
s) STATUS_VERBOSE=${OPTARG};;
esac
done
if [ ${STATUS_VERBOSE} = "ON" ]; then
systemctl list-units --type service --all | grep -E ${GREP} | awk '{print $1}' | xargs -I{} systemctl status {}
fi
#if [ RESTART = "ON" ]; then
# systemctl list-units --type service --all | grep -E ${GREP} | awk '{print $1}' | xargs -I{} systemctl restart {}
#fi
systemctl list-units --type service --all | grep -E ${GREP} | awk 'BEGIN{print "Unit State Status"};$4 ~ /^running$/{print $1,$2,$4}' | column -t
+30
View File
@@ -0,0 +1,30 @@
#!/bin/bash
# usage ./start_eiger_detector.sh Eiger 1
if [ $# -lt 1 ]
then
echo "Usage : $0 DETECTOR_NAME <number_of_cycles>"
echo " DETECTOR_NAME: Eiger..."
echo " number_of_cycles : optional, default 100"
exit
fi
SLS_DET_PACKAGE_PATH=''
#SLS_DET_PACKAGE_PATH='/home/dbe/git/sf_daq_buffer_eiger/slsDetectorPackage/build/bin/'
# SLS_DET_PACKAGE_PATH='/home/hax_l/sf_daq_buffer/slsDetectorPackage/build/bin/'
# DETECTOR=$1
n_cycles=1
if [ $# == 2 ]
then
n_cycles=$2
fi
${SLS_DET_PACKAGE_PATH}sls_detector_put timing auto
${SLS_DET_PACKAGE_PATH}sls_detector_put triggers ${n_cycles}
${SLS_DET_PACKAGE_PATH}sls_detector_put exptime 0.000005
${SLS_DET_PACKAGE_PATH}sls_detector_put frames 60
${SLS_DET_PACKAGE_PATH}sls_detector_put dr 16
#sls_detector_put ${D}-clearbit to 0x5d 0 # normal mode, not highG0
${SLS_DET_PACKAGE_PATH}sls_detector_put acquire
echo "Now start trigger"
+35
View File
@@ -0,0 +1,35 @@
from datetime import datetime
import requests
import time
import json
# writer agent endpoint
URL = "http://127.0.0.1:5000"
# details of request
n_images = 10
n_acquisitions = 5
headers = {'Content-type': 'application/json'}
print("Configuring the detector...")
data_config = {"det_name":"eiger","config":{"frames":100,"dr":16, "triggers":1, "exptime":0.000005, "timing":"auto", "tengiga":1}}
r = requests.post(url="http://127.0.0.1:5000/detector", headers=headers, json=data_config)
time.sleep(0.5)
print("Starting the detector...")
start_data = {'cmd':"START"}
r = requests.post(url = "http://127.0.0.1:5000/detector/eiger", headers=headers, json=start_data)
time.sleep(0.5)
print("Performing sync aquisitions...")
for i in range(0,n_acquisitions):
output_file ='/home/dbe/git/sf_daq_buffer/eiger/xbl-daq-24/output_folder/eiger_sync_%s_%s.h5' % (datetime.now().strftime("%H%M%S"), i)
#output_file='/tmp/output.h5'
data = {'sources':'BEC.EG01V01', 'n_images':n_images, 'output_file':output_file}
print("REQUEST: ", i)
print("DATA: ", data)
r = requests.post(url = "http://127.0.0.1:5000/write_async", json=data, headers=headers)
print("RESPONSE FROM REQUEST: ", r.text)
time.sleep(0.2)
+19
View File
@@ -0,0 +1,19 @@
detsize 512 1024
hostname beb058+beb059+
udp_dstmac 9c:dc:71:47:e5:dc
udp_dstip 10.30.30.211
#top
0:udp_dstport 50000
0:udp_dstport2 50001
0:udp_srcip 10.30.30.50
0:rx_tcpport 1980
#bottom:
1:udp_dstport 50002
1:udp_dstport2 50003
1:udp_srcip 10.30.30.51
1:rx_tcpport 1981
tengiga 1
+19
View File
@@ -0,0 +1,19 @@
detsize 512 1024
hostname BEB111+BEB070
udp_dstmac 9c:dc:71:47:e5:dc
udp_dstip 10.30.30.211
#top
0:udp_dstport 50200
0:udp_dstport2 50201
0:udp_srcip 10.30.30.50
0:rx_tcpport 1980
#bottom:
1:udp_dstport 50202
1:udp_dstport2 50203
1:udp_srcip 10.30.30.51
1:rx_tcpport 1981
tengiga 1
+88 -28
View File
@@ -1,42 +1,102 @@
detsize 2048 2560
hostname BEB111+BEB070+BEB060+BEB042+BEB004+BEB120+BEB105+BEB121+BEB092+BEB091+BEB058+BEB059+BEB030+BEB038+BEB110+BEB115+BEB095+BEB094+BEB119+BEB055+
#
hostname beb058+beb059+
#hostname BEB111+BEB070+
#BEB060+BEB042+BEB004+BEB120+BEB105+BEB121+BEB092+BEB091+BEB058+BEB059+BEB030+BEB038+BEB110+BEB115+BEB095+BEB094+BEB119+BEB055+
udp_dstmac 9c:dc:71:47:e5:dc
udp_dstip 10.30.20.6
udp_dstip 10.30.30.211
#udp_dstmac 9c:dc:71:47:e5:d0
#udp_dstip 10.30.40.211
0:rx_tcpport 1980
0:udp_dstport 50000
0:udp_dstport2 50001
0:udp_srcip 10.30.30.55
1:rx_tcpport 1981
1:udp_dstport 50002
1:udp_dstport2 50003
1:udp_srcip 10.30.30.56
2:rx_tcpport 1982
2:udp_dstport 50004
2:udp_dstport2 50005
3:rx_tcpport 1983
3:udp_dstport 50006
3:udp_dstport2 50007
#2:rx_tcpport 1982
#2:udp_dstport 50004
#2:udp_dstport2 50005
#2:udp_srcip 10.30.30.52
#3:rx_tcpport 1983
#3:udp_dstport 50006
#3:udp_dstport2 50007
#3:udp_srcip 10.30.30.53
4:rx_tcpport 1984
4:udp_dstport 50008
4:udp_dstport2 50009
5:rx_tcpport 1985
5:udp_dstport 50010
5:udp_dstport2 50011
#4:rx_tcpport 1984
#4:udp_dstport 50008
#4:udp_dstport2 50009
#4:udp_srcip 10.30.30.54
#5:rx_tcpport 1985
#5:udp_dstport 50010
#5:udp_dstport2 50011
#5:udp_srcip 10.30.30.55
6:rx_tcpport 1986
6:udp_dstport 50012
6:udp_dstport2 50013
7:rx_tcpport 1987
7:udp_dstport 50014
7:udp_dstport2 50015
#6:rx_tcpport 1986
#6:udp_dstport 50012
#6:udp_dstport2 50013
#6:udp_srcip 10.30.30.56
#7:rx_tcpport 1987
#7:udp_dstport 50014
#7:udp_dstport2 50015
#7:udp_srcip 10.30.30.57
8:rx_tcpport 1988
8:udp_dstport 50016
8:udp_dstport2 50017
9:rx_tcpport 1989
9:udp_dstport 50018
9:udp_dstport2 50019
#8:rx_tcpport 1988
#8:udp_dstport 50016
#8:udp_dstport2 50017
#8:udp_srcip 10.30.30.58
#9:rx_tcpport 1989
#9:udp_dstport 50018
#9:udp_dstport2 50019
#9:udp_srcip 10.30.30.59
#10:rx_tcpport 1990
#10:udp_dstport 50020
#10:udp_dstport2 50021
#10:udp_srcip 10.30.30.60
#11:rx_tcpport 1991
#11:udp_dstport 50022
#11:udp_dstport 50023
#11:udp_srcip 10.30.30.61
#12:rx_tcpport 1992
#12:udp_dstport 50024
#12:udp_dstport2 50025
#12:udp_srcip 10.30.30.62
#13:rx_tcpport 1993
#13:udp_dstport 50026
#13:udp_dstport2 50027
#13:udp_srcip 10.30.30.63
#14:rx_tcpport 1994
#14:udp_dstport 50028
#14:udp_dstport2 50029
#14:udp_srcip 10.30.30.64
#15:rx_tcpport 1995
#15:udp_dstport 50030
#15:udp_dstport 50031
#15:udp_srcip 10.30.30.65
#16:rx_tcpport 1996
#16:udp_dstport 50032
#16:udp_dstport2 50033
#16:udp_srcip 10.30.30.66
#17:rx_tcpport 1997
#17:udp_dstport 50034
#17:udp_dstport2 50035
#17:udp_srcip 10.30.30.67
#18:rx_tcpport 1998
#18:udp_dstport 50036
#18:udp_dstport 50037
#18:udp_srcip 10.30.30.68
#19:rx_tcpport 1999
#19:udp_dstport 50038
#19:udp_dstport 50039
#19:udp_srcip 10.30.30.69
tengiga 1
+26
View File
@@ -0,0 +1,26 @@
#!/bin/bash
GREP="std|streamvis|rabbit|journalbeat"
RESTART="OFF"
STATUS_VERBOSE="OFF"
while getopts g:s: flag
do
case "${flag}" in
g) GREP=${OPTARG};;
# r) RESTART='ON';;
s) STATUS_VERBOSE=${OPTARG};;
esac
done
if [ ${STATUS_VERBOSE} = "ON" ]; then
systemctl list-units --type service --all | grep -E ${GREP} | awk '{print $1}' | xargs -I{} systemctl status {}
fi
#if [ RESTART = "ON" ]; then
# systemctl list-units --type service --all | grep -E ${GREP} | awk '{print $1}' | xargs -I{} systemctl restart {}
#fi
systemctl list-units --type service --all | grep -E ${GREP} | awk 'BEGIN{print "Unit State Status"};$4 ~ /^running$/{print $1,$2,$4}' | column -t
-42
View File
@@ -1,42 +0,0 @@
import socket
import numpy as np
frame_header_dt = np.dtype(
[
("Frame Number", "u8"),
("SubFrame Number/ExpLength", "u4"),
("Packet Number", "u4"),
("Bunch ID", "u8"),
("Timestamp", "u8"),
("Module Id", "u2"),
("Row", "u2"),
("Column", "u2"),
("Reserved", "u2"),
("Debug", "u4"),
("Round Robin Number", "u2"),
("Detector Type", "u1"),
("Header Version", "u1"),
]
)
hostnames = ['BEB111','BEB070','BEB060','BEB042','BEB004','BEB120','BEB105','BEB121','BEB092','BEB091','BEB058','BEB059','BEB030', 'BEB038','BEB110', 'BEB115', 'BEB095', 'BEB094', 'BEB119', 'BEB055']
# ip = "10.30.20.6"
ips = []
for hostname in hostnames:
ips.append(socket.gethostbyname('BEB111'))
ports = list(range(50000, 50204, 1))
sockets = [socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for i in range(len(ports))]
for s, p in zip(sockets, ports):
print("IP:Port: ", ip, p)
s.bind((ip, p))
while True:
for s in sockets:
data, address = s.recvfrom(4096)
h = np.frombuffer(data, count=1, dtype=frame_header_dt)[0]
print(
f'[{h["Timestamp"]}] frame: {h["Frame Number"]}, pkt:{h["Packet Number"]}, explength:{h["SubFrame Number/ExpLength"]}, module id: {h["Module Id"]} ,row: {h["Row"]}, column: {h["Column"]}'
)
+54
View File
@@ -0,0 +1,54 @@
#!/bin/bash
UDP=0
SYNC=0
ASSEMBLER=0
WRITER=0
HELP_FLAG=0
while getopts h:u:s:a:w: flag
do
case "${flag}" in
h ) HELP_FLAG=${OPTARG};;
u ) UDP=${OPTARG};;
s ) SYNC=${OPTARG};;
w ) WRITER=${OPTARG};;
a ) ASSEMBLER=${OPTARG};;
esac
done
if (( ${UDP} == 0 )) && (( ${SYNC} == 0 )) && (( ${ASSEMBLER} == 0 )) && (( ${WRITER} == 0 )); then
echo "Nothing to do..."
exit
fi
# prints help and exits
if (( ${HELP_FLAG} == 1 )); then
echo "Usage : $0 -u <udp_recvs> -s <stream> -a <assembler> -h <help_flag>"
echo " udp : kill udp receivers."
echo " sync : kill sync."
echo " assembler : kill assembler."
echo " writer : kill writer."
echo " help_flag : show this help and exits."
exit
fi
if (( ${UDP} == 1 )); then
echo "Killing upd recvs..."
ps aux | grep std_udp_recv | awk 'NR > 1 { print prev } { prev = $2 }' | xargs -I{} kill {}
fi
if (( ${SYNC} == 1 )); then
echo "Killing sync..."
ps aux | grep std_udp_sync | awk 'NR > 1 { print prev } { prev = $2 }' | xargs -I{} kill {}
fi
if (( ${ASSEMBLER} == 1 )); then
echo "Killing assembler..."
ps aux | grep eiger_assembler | awk 'NR > 1 { print prev } { prev = $2 }' | xargs -I{} kill {}
fi
if (( ${WRITER} == 1 )); then
echo "Killing writer..."
ps aux | grep mpiexec | awk 'NR > 1 { print prev } { prev = $2 }' | xargs -I{} kill {}
fi
Regular → Executable
View File
+28
View File
@@ -0,0 +1,28 @@
#!/bin/bash
GREP="std|streamvis|journalbeat"
RESTART="OFF"
STATUS_VERBOSE="OFF"
while getopts g:r:s: flag
do
case "${flag}" in
g) GREP=${OPTARG};;
r) RESTART=${OPTARG};;
s) STATUS_VERBOSE=${OPTARG};;
esac
done
if [ ${STATUS_VERBOSE} = "ON" ]; then
systemctl list-units --type service --all | grep -E ${GREP} | awk '{print $1}' | xargs -I{} systemctl status {}
else
systemctl list-units --type service --all | grep -E ${GREP} | awk 'BEGIN{print "Unit State Status"};$4 ~ /^running$/{print $1,$2,$4}'| column -t
fi
if [ ${RESTART} = "ON" ]; then
systemctl list-units --type service --all | grep -E ${GREP} | awk '{print $1}' | xargs -I{} systemctl restart {}
fi
#systemctl list-units --type service --all | grep -E ${GREP} | awk 'BEGIN{print "Unit State Status"};$4 ~ /^running$/{print $1,$2,$4}' | column -t
+7
View File
@@ -0,0 +1,7 @@
for i in {1..10}
do
echo "Acquire $i"
sls_detector_acquire
stat=`sls_detector_get status`
echo "$stat"
done
+30
View File
@@ -0,0 +1,30 @@
#!/bin/bash
# usage ./start_eiger_detector.sh Eiger 1
if [ $# -lt 1 ]
then
echo "Usage : $0 DETECTOR_NAME <number_of_cycles>"
echo " DETECTOR_NAME: Eiger..."
echo " number_of_cycles : optional, default 100"
exit
fi
SLS_DET_PACKAGE_PATH=''
#SLS_DET_PACKAGE_PATH='/home/dbe/git/sf_daq_buffer_eiger/slsDetectorPackage/build/bin/'
# SLS_DET_PACKAGE_PATH='/home/hax_l/sf_daq_buffer/slsDetectorPackage/build/bin/'
# DETECTOR=$1
n_cycles=1
if [ $# == 2 ]
then
n_cycles=$2
fi
${SLS_DET_PACKAGE_PATH}sls_detector_put timing auto
${SLS_DET_PACKAGE_PATH}sls_detector_put triggers ${n_cycles}
${SLS_DET_PACKAGE_PATH}sls_detector_put exptime 0.000005
${SLS_DET_PACKAGE_PATH}sls_detector_put frames 3600
${SLS_DET_PACKAGE_PATH}sls_detector_put dr 16
#sls_detector_put ${D}-clearbit to 0x5d 0 # normal mode, not highG0
${SLS_DET_PACKAGE_PATH}sls_detector_put acquire
echo "Now start trigger"
+157
View File
@@ -0,0 +1,157 @@
#!/bin/bash
# conda activate
export PATH=/home/dbe/miniconda2/bin:$PATH
source /home/dbe/miniconda2/etc/profile.d/conda.sh
conda deactivate
conda activate sf-daq
# path to build dir
BUILD_PATH='/home/dbe/git/sf_daq_buffer/build/'
# executables
UDP_RECV='std_udp_recv'
UDP_SYNC='std_udp_sync'
EIGER_ASSEMBLER='eiger_assembler'
STD_DET_WRITER='std_det_writer'
# default config file
CONFIG_FILE='/home/dbe/git/sf_daq_buffer/eiger/xbl-daq-28/eiger.json'
DET_CONFIG_FILE='/home/dbe/git/sf_daq_buffer/eiger/xbl-daq-28/5m.config'
DET_PARAM_FILE='/home/dbe/git/sf_daq_buffer/eiger/xbl-daq-28/test.par'
HELP_FLAG=0
# CONFIGURATION
BIT_DEPTH=16
N_MPI_EXEC=3
while getopts h:c:b:m: flag
do
case "${flag}" in
h ) HELP_FLAG=${OPTARG};;
c ) CONFIG_FILE=${OPTARG};;
b ) BIT_DEPTH=${OPTARG};;
m ) N_MPIT_EXEC=${OPTARG};;
esac
done
# prints help and exits
if (( ${HELP_FLAG} == 1 )); then
echo "Usage : $0 -h <help_flag> -c <config_file> -b <bit_depth>"
echo " help_flag : show this help and exits."
echo " config_file : detector configuration file."
echo " bit_depth : detector bit depth."
exit
fi
if [ -f "${CONFIG_FILE}" ]; then
N_UDP_RECVS="`cat ${CONFIG_FILE} | grep -P '"n_modules":' | grep -o '[0-9]\+'`"
else
echo "Something went wrong with the config file... ${CONFIG_FILE}..."
exit
fi
if [ ${N_MPI_EXEC} -gt 0 ]
then
echo "Number of mpi writers: ${N_MPI_EXEC}."
else
echo "Something went wrong with the number of mpi writer processes..."
exit
fi
if [ -f "${CONFIG_FILE}" ]; then
N_UDP_RECVS="`cat ${CONFIG_FILE} | grep -P '"n_modules":' | grep -o '[0-9]\+'`"
else
echo "Something went wrong with the config file... ${CONFIG_FILE}..."
exit
fi
# Start the receivers
echo "Starting ${N_UDP_RECVS} udp receivers..."
COUNTER=0
if [ -f "${BUILD_PATH}${UDP_RECV}" ]; then
if [ -f "${CONFIG_FILE}" ]; then
if [ ${BIT_DEPTH} -ne 0 ]; then
while [ $COUNTER -lt ${N_UDP_RECVS} ]; do
${BUILD_PATH}${UDP_RECV} ${CONFIG_FILE} ${COUNTER} ${BIT_DEPTH} &
let COUNTER=COUNTER+1
sleep 0.5
done
else
echo "Error: ${BIT_DEPTH} can't be zero..."
exit
fi
else
echo "Something went wrong while starting the ${UDP_RECV}..."
exit
fi
else
echo "Error: ${UDP_RECV} wasn't found..."
exit
fi
# Start the std-udp-sync
echo "Starting the ${UDP_SYNC}..."
COUNTER=0
if [ -f "${BUILD_PATH}${UDP_SYNC}" ]; then
if [ -f "${CONFIG_FILE}" ]; then
${BUILD_PATH}${UDP_SYNC} ${CONFIG_FILE} &
sleep 0.5
else
echo "Something went wrong while starting the ${UDP_SYNC}..."
exit
fi
else
echo "Error: ${UDP_SYNC} wasn't found..."
exit
fi
# Start the eiger assembler
echo "Starting the ${EIGER_ASSEMBLER}..."
if [ -f "${BUILD_PATH}${EIGER_ASSEMBLER}" ]; then
if [ -f "${CONFIG_FILE}" ]; then
if [ ${BIT_DEPTH} -ne 0 ]; then
${BUILD_PATH}${EIGER_ASSEMBLER} ${CONFIG_FILE} ${BIT_DEPTH} &
sleep 0.5
else
echo "Error: ${BIT_DEPTH} can't be zero..."
exit
fi
else
echo "Something went wrong while starting the ${EIGER_ASSEMBLER}..."
exit
fi
else
echo "Error: ${EIGER_ASSEMBLER} wasn't found..."
exit
fi
# Start the eiger writer
echo "Starting the ${STD_DET_WRITER}..."
export PATH="/usr/lib64/mpich-3.2/bin:${PATH}";
export LD_LIBRARY_PATH="/usr/lib64/mpich-3.2/lib:${LD_LIBRARY_PATH}";
if [ -f "${BUILD_PATH}${STD_DET_WRITER}" ]; then
if [ -f "${CONFIG_FILE}" ]; then
mpiexec -n ${N_MPI_EXEC} ${BUILD_PATH}${STD_DET_WRITER} ${CONFIG_FILE} ${BIT_DEPTH} &
sleep 0.5
else
echo "Something went wrong while starting the ${STD_DET_WRITER}..."
exit
fi
else
echo "Error: ${STD_DET_WRITER} wasn't found..."
exit
fi
# prepares the detector
echo "Preparing the detector configuration for a default acquisition...";
export PATH="/home/dbe/git/sf_daq_buffer/slsDetectorPackage/build/bin/:${PATH}";
sls_detector_put config ${DET_CONFIG_FILE}
sls_detector_put parameters ${DET_PARAM_FILE}
sls_detector_put trimbits /home/dbe/git/sf_daq_buffer/eiger/test/eiger.sn000
sls_detector_put dac vthreshold 2050
+56
View File
@@ -0,0 +1,56 @@
#!/usr/bin/python
from datetime import datetime
import sys
import getopt
import requests
import time
import json
from urllib.parse import urljoin
URL = "http://127.0.0.1:5000"
SYNC = "/write_sync"
ASYNC = "/write_async"
DET = "/detector/eiger"
headers = {'Content-type':'application/json'}
def prepare_det(dr):
print("Stoping the detector...")
stop_data = {'cmd': "STOP"}
r = requests.post(url=urljoin(URL,DET), headers=headers, json=stop_data)
time.sleep(0.1)
print("Configuring the detector...")
data_config = {"det_name":"eiger","config":{"frames":5000, "triggers":1, "exptime":0.0005, "period": 0.001, "timing":"auto", "tengiga":1, "dr":dr}}
r = requests.post(url=urljoin(URL,DET), headers=headers, json=data_config)
time.sleep(0.5)
print("Starting the detector...")
start_data = {'cmd':"START"}
r = requests.post(url = urljoin(URL,DET), headers=headers, json=start_data)
time.sleep(0.5)
def main(argv):
# details of request
n_images = 5
n_acquisitions = 5
for dr in [8, 16, 32]:
prepare_det(dr)
for j in [SYNC, ASYNC]:
print(f'Performing { j } aquisitions (bit depth { dr })...')
for i in range(0,n_acquisitions):
output_file ='/home/dbe/git/sf_daq_buffer/eiger/xbl-daq-28/output_folder%s_%s_req%s_dr%s.h5' % (j, datetime.now().strftime("%H%M%S"), i, dr)
data = {'sources':'BEC.EG01V01', 'n_images':n_images, 'output_file':output_file, 'user_id': 503}
r = requests.post(url = urljoin(URL,j), json=data, headers=headers)
print("RESPONSE FROM REQUEST: ", r.text)
if __name__ == "__main__":
main(sys.argv[1:])
-96
View File
@@ -1,96 +0,0 @@
#!/bin/bash
# path to build dir
BUILD_PATH='/home/dbe/git/sf_daq_buffer/build/'
# executables
UDP_RECV='std_udp_recv'
UDP_SYNC='std_udp_sync'
EIGER_ASSEMBLER='eiger_assembler'
# default config file
# CONFIG_FILE='/home/dbe/git/sf_daq_buffer/eiger/xbl-daq-28/eiger-5M.json'
CONFIG_FILE='/home/dbe/git/sf_daq_buffer/eiger/xbl-daq-28/eiger-1M.json'
HELP_FLAG=0
BIT_DEPTH=16
while getopts n:h:u:c:b: flag
do
case "${flag}" in
h ) HELP_FLAG=${OPTARG};;
c ) CONFIG_FILE=${OPTARG};;
b ) BIT_DEPTH=${OPTARG};;
esac
done
N_UDP_RECVS="`cat ${CONFIG_FILE} | grep -P '"n_modules":' | grep -o '[0-9]\+'`"
# prints help and exits
if (( ${HELP_FLAG} == 1 )); then
echo "Usage : $0 -n <n_udp_recvs> -c <compile_flag> -h <help_flag> -u <udp_executable> -c <config_file> -b <bit_depth>"
echo " n_udp_recvs : number of receivers."
echo " help_flag : show this help and exits."
echo " config_file : detector configuration file."
echo " bit_depth : detector bit depth."
exit
fi
# Start the receivers
echo "Starting ${N_UDP_RECVS} udp receivers..."
COUNTER=0
if [ -f "${UDP_RECV}" ]; then
if [ -f "${CONFIG_FILE}" ]; then
while [ $COUNTER -lt ${N_UDP_RECVS} ]; do
"${BUILD_PATH}${UDP_RECV} ${CONFIG_FILE} ${COUNTER} ${BIT_DEPTH}" &
let COUNTER=COUNTER+1
sleep 0.5
done
else
echo "Something went wrong while starting the ${UDP_RECV}..."
exit
fi
else
echo "Error: ${UDP_RECV} wasn't found..."
exit
fi
# Start the std-udp-sync
echo "Starting the ${UDP_SYNC}..."
COUNTER=0
if [ -f "${UDP_SYNC}" ]; then
if [ -f "${CONFIG_FILE}" ]; then
if [ ${BIT_DEPTH} -ne 0 ]; then
"${BUILD_PATH}${UDP_SYNC} ${CONFIG_FILE} ${BIT_DEPTH}" &
sleep 0.5
else
echo "Error: ${BIT_DEPTH} can't be zero..."
exit
fi
else
echo "Something went wrong while starting the ${UDP_SYNC}..."
exit
fi
else
echo "Error: ${UDP_SYNC} wasn't found..."
exit
fi
# Start the eiger assembler
echo "Starting the ${EIGER_ASSEMBLER}..."
COUNTER=0
if [ -f "${EIGER_ASSEMBLER}" ]; then
if [ -f "${CONFIG_FILE}" ]; then
"${BUILD_PATH}${EIGER_ASSEMBLER} ${CONFIG_FILE} ${BIT_DEPTH}" &
sleep 0.5
else
echo "Something went wrong while starting the ${EIGER_ASSEMBLER}..."
exit
fi
else
echo "Error: ${EIGER_ASSEMBLER} wasn't found..."
exit
fi
+12
View File
@@ -0,0 +1,12 @@
#! /bin/bash
while : ; do sleep 1 && echo $(date) && top -b -p 12790,12775,12774,12778 -n 1 | tail -n4;
#while true; do:
# sleep 1
# echo $(date)
# top -b -p 12790,12775,12774, 12778 -n1 | tail -4
# if [[ $input = "q" ]] || [[ $input = "Q" ]]
# then
# echo "Q pressed, quitting..."
# break
# fi;
done
+7 -9
View File
@@ -24,28 +24,25 @@ using namespace assembler_config;
int main (int argc, char *argv[])
{
if (argc != 3) {
if (argc != 2) {
cout << endl;
#ifndef USE_EIGER
cout << "Usage: jf_assembler [detector_json_filename] "
" [bit_depth]" << endl;
<< endl;
#else
cout << "Usage: eiger_assembler [detector_json_filename] "
" [bit_depth]" << endl;
<< endl;
#endif
cout << "\tdetector_json_filename: detector config file path.";
cout << endl;
cout << "\tbit_depth: bit depth of the image.";
cout << endl;
exit(-1);
}
auto config = BufferUtils::read_json_config(string(argv[1]));
const int bit_depth = atoi(argv[2]);
auto const stream_name = "assembler";
const size_t IMAGE_N_BYTES = config.image_height * config.image_width * bit_depth / 8;
const size_t IMAGE_N_BYTES = config.image_height * config.image_width * config.bit_depth / 8;
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, ASSEMBLER_ZMQ_IO_THREADS);
auto sender = BufferUtils::bind_socket(
@@ -62,13 +59,14 @@ int main (int argc, char *argv[])
cout << " || n_modules: " << config.n_modules;
cout << " || img width: " << config.image_width;
cout << " || img height: " << config.image_height;
cout << " || bit_depth: " << config.bit_depth;
cout << endl;
#endif
const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8;
const size_t FRAME_N_BYTES = MODULE_N_PIXELS * config.bit_depth / 8;
const size_t N_PACKETS_PER_FRAME = FRAME_N_BYTES / DATA_BYTES_PER_PACKET;
EigerAssembler assembler(config.n_modules, bit_depth,
EigerAssembler assembler(config.n_modules, config.bit_depth,
config.image_width, config.image_height);
#ifdef DEBUG_OUTPUT
+2 -2
View File
@@ -17,16 +17,16 @@ struct DetWriterConfig {
return {
config_parameters["detector_name"].GetString(),
config_parameters["bit_depth"].GetInt(),
config_parameters["image_height"].GetInt(),
config_parameters["image_width"].GetInt(),
config_parameters["bit_depth"].GetInt(),
};
}
const std::string detector_name;
const int bit_depth;
const int image_height;
const int image_width;
const int bit_depth;
};
+101 -68
View File
@@ -2,6 +2,9 @@
#include <string>
#include <zmq.h>
#include <mpi.h>
#include <unistd.h>
#include <sstream>
#include <chrono>
#include "RamBuffer.hpp"
#include "BufferUtils.hpp"
@@ -12,9 +15,10 @@
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "date.h"
using namespace std;
using namespace date;
using namespace buffer_config;
using namespace live_writer_config;
@@ -42,11 +46,10 @@ int main (int argc, char *argv[])
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, LIVE_ZMQ_IO_THREADS);
auto receiver = BufferUtils::connect_socket_gf(
ctx, config.detector_name, "tcp://localhost:9911");
ctx, config.detector_name, "tcp://localhost:9667");
const size_t IMAGE_N_BYTES = config.image_width * config.image_height * config.bit_depth / 8;
JFH5Writer writer(config.detector_name);
WriterStats stats(config.detector_name, IMAGE_N_BYTES);
@@ -56,79 +59,109 @@ int main (int argc, char *argv[])
bool header_in = false;
int last_run_id = -1;
while (true) {
auto nbytes = zmq_recv(receiver, &recv_buffer_meta, sizeof(recv_buffer_meta), 0);
rapidjson::Document document;
if (document.Parse(recv_buffer_meta, nbytes).HasParseError()) {
std::string error_str(recv_buffer_meta, nbytes);
throw runtime_error(error_str);
}
const string output_file = document["output_file"].GetString();
const uint64_t image_id = document["image_id"].GetUint64();
const int run_id = document["run_id"].GetInt();
const int i_image = document["i_image"].GetInt();
const int n_images = document["n_images"].GetInt();
const int user_id = document["user_id"].GetInt();
//meta
auto header_nbytes = zmq_recv(receiver, &recv_buffer_meta, sizeof(recv_buffer_meta), 0);
if (header_nbytes != -1 && header_in == false){
header_in = true;
// cout << "Header nbytes " << header_nbytes << endl;
rapidjson::Document document;
if (document.Parse(recv_buffer_meta, header_nbytes).HasParseError()) {
std::string error_str(recv_buffer_meta, header_nbytes);
throw runtime_error(error_str);
const int status = document["status"].GetInt();
const rapidjson::Value& a = document["shape"];
const int width = a[0].GetInt();
const int heigth = a[1].GetInt();
const int dtype = 2;
// i_image == n_images -> end of run.
if (i_image == n_images && open_run == true) {
writer.close_run();
stats.end_run();
open_run = false;
#ifdef DEBUG_OUTPUT
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[std_daq_det_writer] Setting real group/user id..." << endl;
#endif
if (setresgid(0,0,0)){
stringstream error_message;
cout << " Problem setting real group id..." << endl;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[std_daq_det_writer] Cannot set real group id..." << endl;
throw runtime_error(error_message.str());
}
if (setresuid(0,0,0)){
stringstream error_message;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[std_daq_det_writer] Cannot set real user id..." << endl;
throw runtime_error(error_message.str());
}
continue;
}
// i_image == 0 -> we have a new run.
if (i_image == 0 && open_run == false) {
// TODO Improve changing GID and UID of the writer processes
// to be part of the deployment via the ansible deployment.
#ifdef DEBUG_OUTPUT
cout << "[" << std::chrono::system_clock::now() << "]";
cout << "[std_daq_det_writer] Setting process uid to " << user_id << endl;
#endif
if (setegid(user_id)) {
stringstream error_message;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[std_daq_det_writer] Cannot set group_id to " << user_id << endl;
throw runtime_error(error_message.str());
}
// #ifdef DEBUG_OUTPUT
// // using namespace date;
// // cout << " [" << std::chrono::system_clock::now() << "]";
// cout << " [std_det_writer::json metadata] :";
// rapidjson::StringBuffer strbuf;
// strbuf.Clear();
// rapidjson::Writer<rapidjson::StringBuffer> writer_json_dsa(strbuf);
// document.Accept(writer_json_dsa);
// cout << strbuf.GetString() << endl;
// cout << endl;
// #endif
const string output_file = document["output_file"].GetString();
const int run_id = document["run_id"].GetInt();
const int i_image = document["i_image"].GetInt();
const int n_images = document["n_images"].GetInt();
const int status = document["status"].GetInt();
const rapidjson::Value& a = document["shape"];
const int width = a[0].GetInt();
const int heigth = a[1].GetInt();
const int dtype = 2;
// i_image == n_images -> end of run.
if (i_image == n_images && open_run == true) {
writer.close_run();
stats.end_run();
open_run = false;
continue;
if (seteuid(user_id)) {
stringstream error_message;
error_message << "[" << std::chrono::system_clock::now() << "]";
error_message << "[std_daq_det_writer] Cannot set user_id to " << user_id << endl;
throw runtime_error(error_message.str());
}
// i_image == 0 -> we have a new run.
if (i_image == 0 && open_run == false) {
writer.open_run(output_file,
run_id,
n_images,
heigth,
width,
dtype);
open_run = true;
}
// data
auto img_nbytes = zmq_recv(receiver, &recv_buffer_data, sizeof(recv_buffer_data), 0);
if (img_nbytes != -1 && header_in == true && open_run == true){
// Fair distribution of images among writers.
if (i_image % n_writers == i_writer) {
stats.start_image_write();
writer.write_data(run_id, i_image, recv_buffer_data);
stats.end_image_write();
}
header_in = false;
}
writer.open_run(output_file,
run_id,
n_images,
heigth,
width,
dtype);
open_run = true;
}
// Only the first instance writes metadata.
if (i_writer == 0 && header_in == true && open_run == true) {
writer.write_meta_gf(run_id,
i_image,
// data
auto img_nbytes = zmq_recv(receiver, &recv_buffer_data, sizeof(recv_buffer_data), 0);
if (img_nbytes != -1 && header_in == true && open_run == true){
// Fair distribution of images among writers.
if (i_image % n_writers == i_writer) {
stats.start_image_write();
writer.write_data(run_id, i_image, recv_buffer_data);
stats.end_image_write();
}
header_in = false;
}
// Only the first instance writes metadata.
if (i_writer == 0 && header_in == true && open_run == true) {
writer.write_meta_gf(run_id, i_image,
(uint16_t)run_id,
(uint64_t)status);
}
}
}
}
}
+14 -4
View File
@@ -50,11 +50,21 @@ void ZmqLiveSender::send(const ImageMetadata& meta, const char *data,
// TODO: Here we need to send to streamvis and live analysis metadata(probably need to operate still on them) and data(not every frame)
header.AddMember("frame", meta.id, header_alloc);
header.AddMember("is_good_frame", meta.status, header_alloc);
header.AddMember("pulse_id", meta.id, header_alloc);
// image status convention 0 == good, 1 == frames missing, 2 == different ids
header.AddMember("is_good_frame", (meta.status == 0) ? 1 : 0, header_alloc);
header.AddMember("flip_data", true, header_alloc);
rapidjson::Value detector_name;
detector_name.SetString(det_name_.c_str(), header_alloc);
header.AddMember("detector_name", detector_name, header_alloc);
#ifdef USE_EIGER
header.AddMember("detector_name", "Eiger", header_alloc);
#else
rapidjson::Value detector_name;
detector_name.SetString(det_name_.c_str(), header_alloc);
header.AddMember("detector_name", detector_name, header_alloc);
#endif
header.AddMember("htype", "array-1.0", header_alloc);
+6 -7
View File
@@ -7,6 +7,7 @@
#include "stream_config.hpp"
#include "ZmqLiveSender.hpp"
#include <algorithm>
using namespace std;
using namespace stream_config;
@@ -14,12 +15,11 @@ using namespace buffer_config;
int main (int argc, char *argv[])
{
if (argc != 4) {
if (argc != 3) {
cout << endl;
cout << "Usage: std_stream_send [detector_json_filename]"
" [bit_depth] [stream_address]" << endl;
" [stream_address]" << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << "\tbit_depth: bit depth of the incoming udp packets." << endl;
cout << "\tstream_address: address to bind the output stream." << endl;
cout << endl;
@@ -27,8 +27,7 @@ int main (int argc, char *argv[])
}
auto config = BufferUtils::read_json_config(string(argv[1]));
const int bit_depth = atoi(argv[2]);
const auto stream_address = string(argv[3]);
const auto stream_address = string(argv[2]);
auto ctx = zmq_ctx_new();
zmq_ctx_set(ctx, ZMQ_IO_THREADS, STREAM_ZMQ_IO_THREADS);
@@ -37,7 +36,7 @@ int main (int argc, char *argv[])
auto receiver_assembler = BufferUtils::connect_socket(
ctx, config.detector_name, "assembler");
const size_t IMAGE_N_BYTES = config.image_height * config.image_width * bit_depth / 8;
const size_t IMAGE_N_BYTES = config.image_height * config.image_width * config.bit_depth / 8;
RamBuffer image_buffer(config.detector_name + "_assembler",
sizeof(ImageMetadata), IMAGE_N_BYTES,
@@ -52,7 +51,7 @@ int main (int argc, char *argv[])
// gets the image data
char* dst_data = image_buffer.get_slot_data(meta.id);
// sends the json metadata with the data
sender.send(meta, dst_data, IMAGE_N_BYTES);
sender.send(meta, dst_data , IMAGE_N_BYTES);
}
}
+6
View File
@@ -19,6 +19,9 @@ struct UdpRecvConfig {
config_parameters["detector_name"].GetString(),
config_parameters["detector_type"].GetString(),
config_parameters["n_modules"].GetInt(),
config_parameters["bit_depth"].GetInt(),
config_parameters["image_height"].GetInt(),
config_parameters["image_width"].GetInt(),
config_parameters["start_udp_port"].GetInt(),
};
}
@@ -26,6 +29,9 @@ struct UdpRecvConfig {
const std::string detector_name;
const std::string detector_type;
const int n_modules;
const int bit_depth;
const int image_height;
const int image_width;
const int start_udp_port;
};
+4 -7
View File
@@ -18,21 +18,18 @@ using namespace BufferUtils;
int main (int argc, char *argv[]) {
if (argc != 4) {
if (argc != 3) {
cout << endl;
cout << "Usage: std_udp_recv [detector_json_filename] [module_id] "
"[bit_depth]";
cout << "Usage: std_udp_recv [detector_json_filename] [module_id] ";
cout << endl;
cout << "\tdetector_json_filename: detector config file path." << endl;
cout << "\tmodule_id: id of the module for this process." << endl;
cout << "\tbit_depth: bit depth of the incoming udp packets." << endl;
cout << endl;
exit(-1);
}
const auto config = UdpRecvConfig::from_json_file(string(argv[1]));
const int module_id = atoi(argv[2]);
const int bit_depth = atoi(argv[3]);
if (DETECTOR_TYPE != config.detector_type) {
throw runtime_error("UDP recv version for " + DETECTOR_TYPE +
@@ -40,7 +37,7 @@ int main (int argc, char *argv[]) {
}
const auto udp_port = config.start_udp_port + module_id;
const size_t FRAME_N_BYTES = MODULE_N_PIXELS * bit_depth / 8;
const size_t FRAME_N_BYTES = MODULE_N_PIXELS * config.bit_depth / 8;
const size_t N_PACKETS_PER_FRAME = FRAME_N_BYTES / DATA_BYTES_PER_PACKET;
FrameUdpReceiver receiver(udp_port, N_PACKETS_PER_FRAME);
@@ -54,7 +51,7 @@ int main (int argc, char *argv[]) {
ModuleFrame meta;
meta.module_id = module_id;
meta.bit_depth = bit_depth;
meta.bit_depth = config.bit_depth;
char* data = new char[FRAME_N_BYTES];
+3 -1
View File
@@ -17,12 +17,14 @@ struct UdpSyncConfig {
return {
config_parameters["detector_name"].GetString(),
config_parameters["n_modules"].GetInt()
config_parameters["n_modules"].GetInt(),
config_parameters["bit_depth"].GetInt()
};
}
const std::string detector_name;
const int n_modules;
const int bit_depth;
};