From 6b974e572f6b26d333b64b56d5c981be01e9edf0 Mon Sep 17 00:00:00 2001 From: Dominik Werder Date: Fri, 6 Jan 2023 16:25:46 +0100 Subject: [PATCH] Support yaml config and run for swissfel-daqbuf-ca --- daqbuffer/Cargo.toml | 1 + daqbuffer/src/bin/daqbuffer.rs | 20 +++++++++++++++----- dbconn/src/channelconfig.rs | 4 ++-- dbconn/src/dbconn.rs | 2 +- httpret/src/proxy/api4.rs | 2 +- streams/src/collect.rs | 4 ++-- streams/src/frames/inmem.rs | 4 ++-- 7 files changed, 24 insertions(+), 13 deletions(-) diff --git a/daqbuffer/Cargo.toml b/daqbuffer/Cargo.toml index cc37b26..a718657 100644 --- a/daqbuffer/Cargo.toml +++ b/daqbuffer/Cargo.toml @@ -18,6 +18,7 @@ bytes = "1.0.1" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" +serde_yaml = "0.9.16" chrono = "0.4" url = "2.2.2" clap = { version = "4.0.22", features = ["derive", "cargo"] } diff --git a/daqbuffer/src/bin/daqbuffer.rs b/daqbuffer/src/bin/daqbuffer.rs index e44be98..9cf4afe 100644 --- a/daqbuffer/src/bin/daqbuffer.rs +++ b/daqbuffer/src/bin/daqbuffer.rs @@ -51,13 +51,23 @@ async fn go() -> Result<(), Error> { match opts.subcmd { SubCmd::Retrieval(subcmd) => { info!("daqbuffer {}", clap::crate_version!()); - let mut config_file = File::open(subcmd.config).await?; + let mut config_file = File::open(&subcmd.config).await?; let mut buf = Vec::new(); config_file.read_to_end(&mut buf).await?; - let node_config: NodeConfig = serde_json::from_slice(&buf)?; - let node_config: Result = node_config.into(); - let node_config = node_config?; - daqbufp2::run_node(node_config.clone()).await?; + if let Ok(cfg) = serde_json::from_slice::(&buf) { + let cfg: Result = cfg.into(); + let cfg = cfg?; + daqbufp2::run_node(cfg).await?; + } else if let Ok(cfg) = serde_yaml::from_slice::(&buf) { + let cfg: Result = cfg.into(); + let cfg = cfg?; + daqbufp2::run_node(cfg).await?; + } else { + return Err(Error::with_msg_no_trace(format!( + "can not parse config at {}", + subcmd.config + ))); + } } SubCmd::Proxy(subcmd) => { info!("daqbuffer proxy {}", clap::crate_version!()); diff --git a/dbconn/src/channelconfig.rs b/dbconn/src/channelconfig.rs index a12371c..0cf13f4 100644 --- a/dbconn/src/channelconfig.rs +++ b/dbconn/src/channelconfig.rs @@ -81,8 +81,8 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) -> if let Some(series) = channel.series() { let res = pgclient .query( - "select channel, scalar_type, shape_dims from series_by_channel where series = $1", - &[&(series as i64)], + "select channel, scalar_type, shape_dims from series_by_channel where facility = $1 and series = $2", + &[&channel.backend(), &(series as i64)], ) .await .err_conv()?; diff --git a/dbconn/src/dbconn.rs b/dbconn/src/dbconn.rs index 8ce9686..08d7add 100644 --- a/dbconn/src/dbconn.rs +++ b/dbconn/src/dbconn.rs @@ -52,7 +52,7 @@ pub async fn delay_io_medium() { pub async fn create_connection(db_config: &Database) -> Result { // TODO use a common already running worker pool for these queries: let d = db_config; - let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name); + let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name); let (cl, conn) = tokio_postgres::connect(&uri, NoTls) .await .map_err(|e| format!("Can not connect to database: {e:?}")) diff --git a/httpret/src/proxy/api4.rs b/httpret/src/proxy/api4.rs index 373a20d..29dffb8 100644 --- a/httpret/src/proxy/api4.rs +++ b/httpret/src/proxy/api4.rs @@ -59,7 +59,7 @@ pub async fn channel_search(req: Request, proxy_config: &ProxyConfig) -> R let nt = |tag, res| { let fut = async { let body = hyper::body::to_bytes(res).await?; - info!("got a result {:?}", body); + //info!("got a result {:?}", body); let res: ChannelSearchResult = match serde_json::from_slice(&body) { Ok(k) => k, Err(_) => { diff --git a/streams/src/collect.rs b/streams/src/collect.rs index cced8a2..bf196a3 100644 --- a/streams/src/collect.rs +++ b/streams/src/collect.rs @@ -57,7 +57,7 @@ where break; } }; - debug!("collect_in_span see item"); + trace!("collect_in_span see item"); match item { Ok(item) => match item { StreamItem::DataItem(item) => match item { @@ -70,7 +70,7 @@ where } } RangeCompletableItem::Data(mut item) => { - debug!("collect_in_span sees {}", item.len()); + debug!("collect_in_span sees len {}", item.len()); if collector.is_none() { let c = item.new_collector(); collector = Some(c); diff --git a/streams/src/frames/inmem.rs b/streams/src/frames/inmem.rs index 1c7ea87..54ef891 100644 --- a/streams/src/frames/inmem.rs +++ b/streams/src/frames/inmem.rs @@ -62,7 +62,7 @@ where Ready(Ok(())) => { let n = buf.filled().len(); self.buf.wadv(n)?; - debug!("recv bytes {}", n); + trace!("recv bytes {}", n); Ready(Ok(n)) } Ready(Err(e)) => Ready(Err(e.into())), @@ -131,7 +131,7 @@ where return Err(e); } self.inp_bytes_consumed += lentot as u64; - debug!("parsed frame well len {}", len); + trace!("parsed frame well len {}", len); let ret = InMemoryFrame { len, tyid,