Support yaml config and run for swissfel-daqbuf-ca

This commit is contained in:
Dominik Werder
2023-01-06 16:25:46 +01:00
parent f781166053
commit 6b974e572f
7 changed files with 24 additions and 13 deletions

View File

@@ -18,6 +18,7 @@ bytes = "1.0.1"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
serde_yaml = "0.9.16"
chrono = "0.4"
url = "2.2.2"
clap = { version = "4.0.22", features = ["derive", "cargo"] }

View File

@@ -51,13 +51,23 @@ async fn go() -> Result<(), Error> {
match opts.subcmd {
SubCmd::Retrieval(subcmd) => {
info!("daqbuffer {}", clap::crate_version!());
let mut config_file = File::open(subcmd.config).await?;
let mut config_file = File::open(&subcmd.config).await?;
let mut buf = Vec::new();
config_file.read_to_end(&mut buf).await?;
let node_config: NodeConfig = serde_json::from_slice(&buf)?;
let node_config: Result<NodeConfigCached, Error> = node_config.into();
let node_config = node_config?;
daqbufp2::run_node(node_config.clone()).await?;
if let Ok(cfg) = serde_json::from_slice::<NodeConfig>(&buf) {
let cfg: Result<NodeConfigCached, Error> = cfg.into();
let cfg = cfg?;
daqbufp2::run_node(cfg).await?;
} else if let Ok(cfg) = serde_yaml::from_slice::<NodeConfig>(&buf) {
let cfg: Result<NodeConfigCached, Error> = cfg.into();
let cfg = cfg?;
daqbufp2::run_node(cfg).await?;
} else {
return Err(Error::with_msg_no_trace(format!(
"can not parse config at {}",
subcmd.config
)));
}
}
SubCmd::Proxy(subcmd) => {
info!("daqbuffer proxy {}", clap::crate_version!());

View File

@@ -81,8 +81,8 @@ pub async fn chconf_from_database(channel: &Channel, ncc: &NodeConfigCached) ->
if let Some(series) = channel.series() {
let res = pgclient
.query(
"select channel, scalar_type, shape_dims from series_by_channel where series = $1",
&[&(series as i64)],
"select channel, scalar_type, shape_dims from series_by_channel where facility = $1 and series = $2",
&[&channel.backend(), &(series as i64)],
)
.await
.err_conv()?;

View File

@@ -52,7 +52,7 @@ pub async fn delay_io_medium() {
pub async fn create_connection(db_config: &Database) -> Result<Client, Error> {
// TODO use a common already running worker pool for these queries:
let d = db_config;
let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, 5432, d.name);
let uri = format!("postgresql://{}:{}@{}:{}/{}", d.user, d.pass, d.host, d.port, d.name);
let (cl, conn) = tokio_postgres::connect(&uri, NoTls)
.await
.map_err(|e| format!("Can not connect to database: {e:?}"))

View File

@@ -59,7 +59,7 @@ pub async fn channel_search(req: Request<Body>, proxy_config: &ProxyConfig) -> R
let nt = |tag, res| {
let fut = async {
let body = hyper::body::to_bytes(res).await?;
info!("got a result {:?}", body);
//info!("got a result {:?}", body);
let res: ChannelSearchResult = match serde_json::from_slice(&body) {
Ok(k) => k,
Err(_) => {

View File

@@ -57,7 +57,7 @@ where
break;
}
};
debug!("collect_in_span see item");
trace!("collect_in_span see item");
match item {
Ok(item) => match item {
StreamItem::DataItem(item) => match item {
@@ -70,7 +70,7 @@ where
}
}
RangeCompletableItem::Data(mut item) => {
debug!("collect_in_span sees {}", item.len());
debug!("collect_in_span sees len {}", item.len());
if collector.is_none() {
let c = item.new_collector();
collector = Some(c);

View File

@@ -62,7 +62,7 @@ where
Ready(Ok(())) => {
let n = buf.filled().len();
self.buf.wadv(n)?;
debug!("recv bytes {}", n);
trace!("recv bytes {}", n);
Ready(Ok(n))
}
Ready(Err(e)) => Ready(Err(e.into())),
@@ -131,7 +131,7 @@ where
return Err(e);
}
self.inp_bytes_consumed += lentot as u64;
debug!("parsed frame well len {}", len);
trace!("parsed frame well len {}", len);
let ret = InMemoryFrame {
len,
tyid,