WIP Consider also waveform channels for pulse mapping

This commit is contained in:
Dominik Werder
2023-01-10 16:30:47 +01:00
parent 6b974e572f
commit 4a75793281
11 changed files with 237 additions and 76 deletions

View File

@@ -1,6 +1,6 @@
[package]
name = "daqbuffer"
version = "0.3.0"
version = "0.3.6"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -54,11 +54,13 @@ async fn go() -> Result<(), Error> {
let mut config_file = File::open(&subcmd.config).await?;
let mut buf = Vec::new();
config_file.read_to_end(&mut buf).await?;
if let Ok(cfg) = serde_json::from_slice::<NodeConfig>(&buf) {
if let Ok(cfg) = serde_json::from_slice::<NodeConfig>(b"nothing") {
info!("Parsed json config from {}", subcmd.config);
let cfg: Result<NodeConfigCached, Error> = cfg.into();
let cfg = cfg?;
daqbufp2::run_node(cfg).await?;
} else if let Ok(cfg) = serde_yaml::from_slice::<NodeConfig>(&buf) {
info!("Parsed yaml config from {}", subcmd.config);
let cfg: Result<NodeConfigCached, Error> = cfg.into();
let cfg = cfg?;
daqbufp2::run_node(cfg).await?;
@@ -71,10 +73,12 @@ async fn go() -> Result<(), Error> {
}
SubCmd::Proxy(subcmd) => {
info!("daqbuffer proxy {}", clap::crate_version!());
let mut config_file = File::open(subcmd.config).await?;
let mut config_file = File::open(&subcmd.config).await?;
let mut buf = Vec::new();
config_file.read_to_end(&mut buf).await?;
let proxy_config: ProxyConfig = serde_json::from_slice(&buf)?;
let proxy_config: ProxyConfig =
serde_yaml::from_slice(&buf).map_err(|e| Error::with_msg_no_trace(e.to_string()))?;
info!("Parsed yaml config from {}", subcmd.config);
daqbufp2::run_proxy(proxy_config.clone()).await?;
}
SubCmd::Client(client) => match client.client_type {

View File

@@ -15,3 +15,4 @@ impl<T, E: Convable> ErrConv<T> for Result<T, E> {
impl Convable for http::Error {}
impl Convable for hyper::Error {}
impl Convable for serde_yaml::Error {}

View File

@@ -1,6 +1,6 @@
[package]
name = "httpret"
version = "0.3.5"
version = "0.3.6"
authors = ["Dominik Werder <dominik.werder@gmail.com>"]
edition = "2021"

View File

@@ -13,6 +13,7 @@ use netpod::TableSizes;
use std::collections::VecDeque;
use std::time::Duration;
#[allow(unused)]
async fn table_sizes(node_config: &NodeConfigCached) -> Result<TableSizes, Error> {
let ret = dbconn::table_sizes(node_config).await?;
Ok(ret)
@@ -100,7 +101,7 @@ impl StatusNodesRecursive {
is_archiver_engine: node_config.node.channel_archiver.is_some(),
is_archiver_appliance: node_config.node.archiver_appliance.is_some(),
database_size: Some(database_size),
table_sizes: Some(table_sizes(node_config).await.map_err(Into::into)),
//table_sizes: Some(table_sizes(node_config).await.map_err(Into::into)),
archiver_appliance_status,
subs: VecDeque::new(),
};

View File

@@ -186,14 +186,14 @@ impl StatusNodesRecursive {
let mut bodies = Vec::new();
let mut urls = Vec::new();
let mut tags = Vec::new();
for backend in &proxy_config.backends {
match Url::parse(&format!("{}{}", backend.url, path)) {
for sub in &proxy_config.status_subs {
match Url::parse(&format!("{}{}", sub.url, path)) {
Ok(url) => {
bodies.push(None);
tags.push(url.to_string());
tags.push(sub.url.to_string());
urls.push(url);
}
Err(e) => return Err(Error::with_msg_no_trace(format!("parse error for: {backend:?} {e:?}"))),
Err(e) => return Err(Error::with_msg_no_trace(format!("parse error for: {sub:?} {e:?}"))),
}
}
let nt = |tag, res| {
@@ -246,7 +246,6 @@ impl StatusNodesRecursive {
is_archiver_engine: false,
is_archiver_appliance: false,
database_size: None,
table_sizes: None,
archiver_appliance_status: None,
subs,
};

View File

@@ -55,6 +55,8 @@ async fn make_tables(node_config: &NodeConfigCached) -> Result<(), Error> {
conn.execute(sql, &[]).await?;
let sql = "alter table map_pulse_files add if not exists hostname text not null default ''";
conn.execute(sql, &[]).await?;
let sql = "alter table map_pulse_files add if not exists ks int not null default 2";
conn.execute(sql, &[]).await?;
let sql = "create index if not exists map_pulse_files_ix2 on map_pulse_files (hostname)";
conn.execute(sql, &[]).await?;
let sql = "set client_min_messages = 'notice'";
@@ -75,31 +77,77 @@ fn timer_channel_names() -> Vec<String> {
.flatten()
.collect();
all.push("SIN-CVME-TIFGUN-EVR0:RX-PULSEID".into());
all.push("SAR-CVME-TIFALL4:EvtSet".into());
all.push("SAR-CVME-TIFALL5:EvtSet".into());
all.push("SAR-CVME-TIFALL6:EvtSet".into());
all.push("SAT-CVME-TIFALL5:EvtSet".into());
all.push("SAT-CVME-TIFALL6:EvtSet".into());
all
}
async fn datafiles_for_channel(name: String, node_config: &NodeConfigCached) -> Result<Vec<PathBuf>, Error> {
let mut a = vec![];
#[derive(Debug)]
enum MapfilePath {
Scalar(PathBuf),
Index(PathBuf, PathBuf),
}
async fn datafiles_for_channel(name: String, node_config: &NodeConfigCached) -> Result<Vec<MapfilePath>, Error> {
let mut a = Vec::new();
let sfc = node_config.node.sf_databuffer.as_ref().unwrap();
let channel_path = sfc
.data_base_path
.join(format!("{}_2", sfc.ksprefix))
.join("byTime")
.join(&name);
let mut rd = tokio::fs::read_dir(&channel_path).await?;
while let Ok(Some(entry)) = rd.next_entry().await {
let mut rd2 = tokio::fs::read_dir(entry.path()).await?;
while let Ok(Some(e2)) = rd2.next_entry().await {
let mut rd3 = tokio::fs::read_dir(e2.path()).await?;
while let Ok(Some(e3)) = rd3.next_entry().await {
if e3.file_name().to_string_lossy().ends_with("_00000_Data") {
//info!("path: {:?}", e3.path());
a.push(e3.path());
match tokio::fs::read_dir(&channel_path).await {
Ok(mut rd) => {
while let Ok(Some(entry)) = rd.next_entry().await {
let mut rd2 = tokio::fs::read_dir(entry.path()).await?;
while let Ok(Some(e2)) = rd2.next_entry().await {
let mut rd3 = tokio::fs::read_dir(e2.path()).await?;
while let Ok(Some(e3)) = rd3.next_entry().await {
if e3.file_name().to_string_lossy().ends_with("_00000_Data") {
let x = MapfilePath::Scalar(e3.path());
a.push(x);
}
}
}
}
Ok(a)
}
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => {
let channel_path = sfc
.data_base_path
.join(format!("{}_3", sfc.ksprefix))
.join("byTime")
.join(&name);
match tokio::fs::read_dir(&channel_path).await {
Ok(mut rd) => {
while let Ok(Some(entry)) = rd.next_entry().await {
let mut rd2 = tokio::fs::read_dir(entry.path()).await?;
while let Ok(Some(e2)) = rd2.next_entry().await {
let mut rd3 = tokio::fs::read_dir(e2.path()).await?;
while let Ok(Some(e3)) = rd3.next_entry().await {
if e3.file_name().to_string_lossy().ends_with("_00000_Data_Index") {
let fns = e3.file_name().to_string_lossy().to_string();
let path_data = e3.path().parent().unwrap().join(&fns[..fns.len() - 6]);
let x = MapfilePath::Index(e3.path(), path_data);
a.push(x);
}
}
}
}
Ok(a)
}
Err(e) => match e.kind() {
_ => return Err(e)?,
},
}
}
_ => return Err(e)?,
},
}
Ok(a)
}
#[derive(Debug)]
@@ -127,6 +175,57 @@ async fn read_buf_or_eof(file: &mut File, buf: &mut BytesMut) -> Result<usize, E
Ok(m)
}
async fn read_first_index_chunk(
mut file_index: File,
file_data: File,
) -> Result<(Option<ChunkInfo>, File, File), Error> {
file_index.seek(SeekFrom::Start(0)).await?;
let mut buf = BytesMut::with_capacity(1024);
let n1 = read_buf_or_eof(&mut file_index, &mut buf).await?;
if n1 < 18 {
let msg = format!("can not even read 18 bytes from index file n1 {}", n1);
warn!("{msg}");
return Ok((None, file_index, file_data));
}
let ver = buf.get_i16();
if ver != 0 {
return Err(Error::with_msg_no_trace(format!("unknown file version ver {}", ver)));
}
let ts = buf.get_u64();
let pos = buf.get_u64();
trace!("read_first_index_chunk ts {ts} pos {pos}");
let (chunk, file_data) = read_chunk_at(file_data, pos, None).await?;
trace!("read_first_index_chunk successful: {chunk:?}");
Ok((Some(chunk), file_index, file_data))
}
async fn read_last_index_chunk(
mut file_index: File,
file_data: File,
) -> Result<(Option<ChunkInfo>, File, File), Error> {
let flen = file_index.seek(SeekFrom::End(0)).await?;
let entry_len = 16;
let c1 = (flen - 2) / entry_len;
if c1 == 0 {
return Ok((None, file_index, file_data));
}
let p2 = 2 + (c1 - 1) * entry_len;
file_index.seek(SeekFrom::Start(p2)).await?;
let mut buf = BytesMut::with_capacity(1024);
let n1 = read_buf_or_eof(&mut file_index, &mut buf).await?;
if n1 < 16 {
let msg = format!("can not even read 16 bytes from index file n1 {}", n1);
warn!("{msg}");
return Ok((None, file_index, file_data));
}
let ts = buf.get_u64();
let pos = buf.get_u64();
trace!("read_last_index_chunk ts {ts} pos {pos}");
let (chunk, file_data) = read_chunk_at(file_data, pos, None).await?;
trace!("read_last_index_chunk successful: {chunk:?}");
Ok((Some(chunk), file_index, file_data))
}
async fn read_first_chunk(mut file: File) -> Result<(Option<ChunkInfo>, File), Error> {
file.seek(SeekFrom::Start(0)).await?;
let mut buf = BytesMut::with_capacity(1024);
@@ -195,27 +294,29 @@ async fn read_last_chunk(mut file: File, pos_first: u64, chunk_len: u64) -> Resu
Ok((Some(ret), file))
}
async fn read_chunk_at(mut file: File, pos: u64, chunk_len: u64) -> Result<(ChunkInfo, File), Error> {
async fn read_chunk_at(mut file: File, pos: u64, chunk_len: Option<u64>) -> Result<(ChunkInfo, File), Error> {
file.seek(SeekFrom::Start(pos)).await?;
let mut buf = BytesMut::with_capacity(1024);
let n1 = file.read_buf(&mut buf).await?;
if n1 < 4 + 3 * 8 {
return Err(Error::with_msg_no_trace(format!(
"can not read enough from datafile n1 {}",
"read_chunk_at can not read enough from datafile n1 {}",
n1
)));
}
let clen = buf.get_u32() as u64;
if clen != chunk_len {
return Err(Error::with_msg_no_trace(format!(
"read_chunk_at mismatch: pos {} clen {} chunk_len {}",
pos, clen, chunk_len
)));
if let Some(chunk_len) = chunk_len {
if clen != chunk_len {
return Err(Error::with_msg_no_trace(format!(
"read_chunk_at mismatch: pos {} clen {} chunk_len {}",
pos, clen, chunk_len
)));
}
}
let _ttl = buf.get_u64();
let ts = buf.get_u64();
let pulse = buf.get_u64();
//info!("data chunk len {} ts {} pulse {}", clen, ts, pulse);
info!("data chunk len {} ts {} pulse {}", clen, ts, pulse);
let ret = ChunkInfo {
pos,
len: clen,
@@ -256,43 +357,88 @@ impl IndexFullHttpFunction {
let files = datafiles_for_channel(channel_name.clone(), node_config).await?;
msg = format!("{}\n{:?}", msg, files);
let mut latest_pair = (0, 0);
for path in files {
let splitted: Vec<_> = path.to_str().unwrap().split("/").collect();
let timebin: u64 = splitted[splitted.len() - 3].parse()?;
let split: u64 = splitted[splitted.len() - 2].parse()?;
if false {
info!(
"hostname {} timebin {} split {}",
node_config.node.host, timebin, split
);
}
let file = tokio::fs::OpenOptions::new().read(true).open(&path).await?;
let (r2, file) = read_first_chunk(file).await?;
msg = format!("{}\n{:?}", msg, r2);
if let Some(r2) = r2 {
let (r3, _file) = read_last_chunk(file, r2.pos, r2.len).await?;
msg = format!("{}\n{:?}", msg, r3);
if let Some(r3) = r3 {
if r3.pulse > latest_pair.0 {
latest_pair = (r3.pulse, r3.ts);
for mp in files {
match mp {
MapfilePath::Scalar(path) => {
let splitted: Vec<_> = path.to_str().unwrap().split("/").collect();
let timebin: u64 = splitted[splitted.len() - 3].parse()?;
let split: u64 = splitted[splitted.len() - 2].parse()?;
if false {
info!(
"hostname {} timebin {} split {}",
node_config.node.host, timebin, split
);
}
let file = tokio::fs::OpenOptions::new().read(true).open(&path).await?;
let (r2, file) = read_first_chunk(file).await?;
msg = format!("{}\n{:?}", msg, r2);
if let Some(r2) = r2 {
let (r3, _file) = read_last_chunk(file, r2.pos, r2.len).await?;
msg = format!("{}\n{:?}", msg, r3);
if let Some(r3) = r3 {
if r3.pulse > latest_pair.0 {
latest_pair = (r3.pulse, r3.ts);
}
// TODO remove update of static columns when older clients are removed.
let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname) values ($1, $2, $3, $4, $5, $6) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6";
conn.execute(
sql,
&[
&channel_name,
&(split as i32),
&(timebin as i32),
&(r2.pulse as i64),
&(r3.pulse as i64),
&node_config.node.host,
],
)
.await?;
}
} else {
warn!("could not find first event chunk in {path:?}");
}
}
MapfilePath::Index(path_index, path_data) => {
trace!("Index {path_index:?}");
let splitted: Vec<_> = path_index.to_str().unwrap().split("/").collect();
let timebin: u64 = splitted[splitted.len() - 3].parse()?;
let split: u64 = splitted[splitted.len() - 2].parse()?;
if false {
info!(
"hostname {} timebin {} split {}",
node_config.node.host, timebin, split
);
}
let file_index = tokio::fs::OpenOptions::new().read(true).open(&path_index).await?;
let file_data = tokio::fs::OpenOptions::new().read(true).open(&path_data).await?;
let (r2, file_index, file_data) = read_first_index_chunk(file_index, file_data).await?;
msg = format!("{}\n{:?}", msg, r2);
if let Some(r2) = r2 {
let (r3, _file_index, _file_data) = read_last_index_chunk(file_index, file_data).await?;
msg = format!("{}\n{:?}", msg, r3);
if let Some(r3) = r3 {
if r3.pulse > latest_pair.0 {
latest_pair = (r3.pulse, r3.ts);
}
// TODO remove update of static columns when older clients are removed.
let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname, ks) values ($1, $2, $3, $4, $5, $6, 3) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6";
conn.execute(
sql,
&[
&channel_name,
&(split as i32),
&(timebin as i32),
&(r2.pulse as i64),
&(r3.pulse as i64),
&node_config.node.host,
],
)
.await?;
}
} else {
warn!("could not find first event chunk in {path_index:?}");
}
// TODO remove update of static columns when older clients are removed.
let sql = "insert into map_pulse_files (channel, split, timebin, pulse_min, pulse_max, hostname) values ($1, $2, $3, $4, $5, $6) on conflict (channel, split, timebin) do update set pulse_min = $4, pulse_max = $5, upc1 = map_pulse_files.upc1 + 1, hostname = $6";
conn.execute(
sql,
&[
&channel_name,
&(split as i32),
&(timebin as i32),
&(r2.pulse as i64),
&(r3.pulse as i64),
&node_config.node.host,
],
)
.await?;
}
} else {
warn!("could not find first event chunk in {path:?}");
}
}
info!("latest for {channel_name} {latest_pair:?}");
@@ -449,7 +595,7 @@ async fn search_pulse(pulse: u64, path: &Path) -> Result<Option<u64>, Error> {
return Ok(None);
}
let m = p1 + d / chunk_len / 2 * chunk_len;
let (z, f2) = read_chunk_at(f1, m, chunk_len).await?;
let (z, f2) = read_chunk_at(f1, m, Some(chunk_len)).await?;
f1 = f2;
if z.pulse == pulse {
return Ok(Some(z.ts));
@@ -621,7 +767,7 @@ impl MapPulseLocalHttpFunction {
.parse()
.map_err(|_| Error::with_public_msg_no_trace(format!("can not understand pulse map url: {}", req.uri())))?;
let conn = dbconn::create_connection(&node_config.node_config.cluster.database).await?;
let sql = "select channel, hostname, timebin, split from map_pulse_files where hostname = $1 and pulse_min <= $2 and (pulse_max >= $2 or closed = 0)";
let sql = "select channel, hostname, timebin, split, ks from map_pulse_files where hostname = $1 and pulse_min <= $2 and (pulse_max >= $2 or closed = 0)";
let rows = conn.query(sql, &[&node_config.node.host, &(pulse as i64)]).await?;
let cands: Vec<_> = rows
.iter()
@@ -630,7 +776,8 @@ impl MapPulseLocalHttpFunction {
let hostname: String = r.try_get(1).unwrap_or("nohost".into());
let timebin: i32 = r.try_get(2).unwrap_or(0);
let split: i32 = r.try_get(3).unwrap_or(0);
(channel, hostname, timebin as u32, split as u32)
let ks: i32 = r.try_get(4).unwrap_or(0);
(channel, hostname, timebin as u32, split as u32, ks as u32)
})
.collect();
//let mut msg = String::new();
@@ -638,8 +785,7 @@ impl MapPulseLocalHttpFunction {
//write!(&mut msg, "cands: {:?}\n", cands)?;
let mut tss = Vec::new();
let mut channels = Vec::new();
for (ch, _, tb, sp) in cands {
let ks = 2;
for (ch, _, tb, sp, ks) in cands {
match disk::paths::data_path_tb(ks, &ch, tb, 86400000, sp, &node_config.node) {
Ok(path) => {
//write!(&mut msg, "data_path_tb: {:?}\n", path)?;

View File

@@ -568,8 +568,8 @@ pub struct NodeStatus {
pub is_archiver_appliance: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub database_size: Option<Result<u64, String>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub table_sizes: Option<Result<TableSizes, Error>>,
//#[serde(default, skip_serializing_if = "Option::is_none")]
//pub table_sizes: Option<Result<TableSizes, Error>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub archiver_appliance_status: Option<NodeStatusArchiverAppliance>,
#[serde(default, skip_serializing_if = "VecDeque::is_empty")]
@@ -1998,12 +1998,18 @@ pub struct ProxyBackend {
pub url: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct StatusSub {
pub url: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProxyConfig {
pub name: String,
pub listen: String,
pub port: u16,
pub backends: Vec<ProxyBackend>,
pub status_subs: Vec<StatusSub>,
}
pub trait HasBackend {

View File

@@ -195,7 +195,7 @@ async fn events_conn_handler_inner_try(
})
.then(|(item, n, d)| async move {
if let Some(d) = d {
debug!("sleep {} times {:?}", n, d);
warn!("sleep {} times {:?}", n, d);
tokio::time::sleep(d).await;
}
item

View File

@@ -71,7 +71,7 @@ where
netout.flush().await?;
netout.forget();
// TODO for images, we need larger buffer capacity
let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 256);
let frames = InMemoryFrameAsyncReadStream::new(netin, 1024 * 1024 * 2);
let stream = EventsFromFrames::<_, T>::new(frames);
streams.push(Box::pin(stream) as _);
}

View File

@@ -100,11 +100,15 @@ fn tracing_init_inner() -> Result<(), Error> {
time::format_description::parse(fmtstr).map_err(|e| format!("{e}"))?,
);
if true {
let filter = tracing_subscriber::EnvFilter::builder()
.with_default_directive(tracing::metadata::LevelFilter::INFO.into())
.from_env()
.map_err(|e| Error::with_msg_no_trace(format!("can not build tracing env filter {e}")))?;
let fmt_layer = tracing_subscriber::fmt::Layer::new()
.with_timer(timer)
.with_target(true)
.with_thread_names(true)
.with_filter(tracing_subscriber::EnvFilter::from_default_env());
.with_filter(filter);
let z = tracing_subscriber::registry().with(fmt_layer);
#[cfg(CONSOLE)]
{